diff --git a/socketio/kafka_manager.py b/socketio/kafka_manager.py index d32cbda..00a2e7f 100644 --- a/socketio/kafka_manager.py +++ b/socketio/kafka_manager.py @@ -25,8 +25,9 @@ class KafkaManager(PubSubManager): # pragma: no cover :param url: The connection URL for the Kafka server. For a default Kafka store running on the same host, use ``kafka://``. - :param channel: The channel name (topic) on which the server sends and receives - notifications. Must be the same in all the servers. + :param channel: The channel name (topic) on which the server sends and + receives notifications. Must be the same in all the + servers. :param write_only: If set ot ``True``, only initialize to emit events. The default of ``False`` initializes the class for emitting and receiving. @@ -48,17 +49,14 @@ class KafkaManager(PubSubManager): # pragma: no cover self.consumer = kafka.KafkaConsumer(self.channel, bootstrap_servers=self.kafka_url) - def _publish(self, data): self.producer.send(self.channel, value=pickle.dumps(data)) self.producer.flush() - def _kafka_listen(self): for message in self.consumer: yield message - def _listen(self): for message in self._kafka_listen(): if message.topic == self.channel: