diff --git a/src/socketio/kafka_manager.py b/src/socketio/kafka_manager.py index ad026f3..30ec3ca 100644 --- a/src/socketio/kafka_manager.py +++ b/src/socketio/kafka_manager.py @@ -1,6 +1,6 @@ import logging import pickle -from typing import List + try: import kafka @@ -9,7 +9,7 @@ except ImportError: from .pubsub_manager import PubSubManager -logger = logging.getLogger('socketio') +logger = logging.getLogger("socketio") class KafkaManager(PubSubManager): # pragma: no cover @@ -26,6 +26,8 @@ class KafkaManager(PubSubManager): # pragma: no cover :param url: The connection URL for the Kafka server. For a default Kafka store running on the same host, use ``kafka://``. + From now on you can pass a list of URLs to use + different Kafka servers. :param channel: The channel name (topic) on which the server sends and receives notifications. Must be the same in all the servers. @@ -33,23 +35,29 @@ class KafkaManager(PubSubManager): # pragma: no cover default of ``False`` initializes the class for emitting and receiving. """ - name = 'kafka' - def __init__(self, url='kafka://localhost:9092',channel='socketio', - write_only=False): + name = "kafka" + + def __init__( + self, url="kafka://localhost:9092", channel="socketio", write_only=False + ): if kafka is None: - raise RuntimeError('kafka-python package is not installed ' - '(Run "pip install kafka-python" in your ' - 'virtualenv).') + raise RuntimeError( + "kafka-python package is not installed " + '(Run "pip install kafka-python" in your ' + "virtualenv)." + ) + + super(KafkaManager, self).__init__(channel=channel, write_only=write_only) - super(KafkaManager, self).__init__(channel=channel, - write_only=write_only) - urls = [url] if isinstance(url, str) else url - self.kafka_urls = [url[8:] if url != 'kafka://' else 'localhost:9092' for url in urls] + self.kafka_urls = [ + url[8:] if url != "kafka://" else "localhost:9092" for url in urls + ] self.producer = kafka.KafkaProducer(bootstrap_servers=self.kafka_urls) - self.consumer = kafka.KafkaConsumer(self.channel, - bootstrap_servers=self.kafka_urls) + self.consumer = kafka.KafkaConsumer( + self.channel, bootstrap_servers=self.kafka_urls + ) def _publish(self, data): self.producer.send(self.channel, value=pickle.dumps(data))