diff --git a/src/socketio/kafka_manager.py b/src/socketio/kafka_manager.py index b5eb636..739871a 100644 --- a/src/socketio/kafka_manager.py +++ b/src/socketio/kafka_manager.py @@ -24,7 +24,9 @@ class KafkaManager(PubSubManager): # pragma: no cover server = socketio.Server(client_manager=socketio.KafkaManager(url)) :param url: The connection URL for the Kafka server. For a default Kafka - store running on the same host, use ``kafka://``. + store running on the same host, use ``kafka://``. For a highly + available deployment of Kafka, pass a list with all the + connection URLs available in your cluster. :param channel: The channel name (topic) on which the server sends and receives notifications. Must be the same in all the servers. @@ -44,10 +46,12 @@ class KafkaManager(PubSubManager): # pragma: no cover super(KafkaManager, self).__init__(channel=channel, write_only=write_only) - self.kafka_url = url[8:] if url != 'kafka://' else 'localhost:9092' - self.producer = kafka.KafkaProducer(bootstrap_servers=self.kafka_url) + urls = [url] if isinstance(url, str) else url + self.kafka_urls = [url[8:] if url != 'kafka://' else 'localhost:9092' + for url in urls] + self.producer = kafka.KafkaProducer(bootstrap_servers=self.kafka_urls) self.consumer = kafka.KafkaConsumer(self.channel, - bootstrap_servers=self.kafka_url) + bootstrap_servers=self.kafka_urls) def _publish(self, data): self.producer.send(self.channel, value=pickle.dumps(data))