Browse Source

Support multiple Kafka servers

pull/844/head
sparkingdark 3 years ago
committed by Miguel Grinberg
parent
commit
4ee3649514
No known key found for this signature in database GPG Key ID: 36848B262DF5F06C
  1. 12
      src/socketio/kafka_manager.py

12
src/socketio/kafka_manager.py

@ -24,7 +24,9 @@ class KafkaManager(PubSubManager): # pragma: no cover
server = socketio.Server(client_manager=socketio.KafkaManager(url))
:param url: The connection URL for the Kafka server. For a default Kafka
store running on the same host, use ``kafka://``.
store running on the same host, use ``kafka://``. For a highly
available deployment of Kafka, pass a list with all the
connection URLs available in your cluster.
:param channel: The channel name (topic) on which the server sends and
receives notifications. Must be the same in all the
servers.
@ -44,10 +46,12 @@ class KafkaManager(PubSubManager): # pragma: no cover
super(KafkaManager, self).__init__(channel=channel,
write_only=write_only)
self.kafka_url = url[8:] if url != 'kafka://' else 'localhost:9092'
self.producer = kafka.KafkaProducer(bootstrap_servers=self.kafka_url)
urls = [url] if isinstance(url, str) else url
self.kafka_urls = [url[8:] if url != 'kafka://' else 'localhost:9092'
for url in urls]
self.producer = kafka.KafkaProducer(bootstrap_servers=self.kafka_urls)
self.consumer = kafka.KafkaConsumer(self.channel,
bootstrap_servers=self.kafka_url)
bootstrap_servers=self.kafka_urls)
def _publish(self, data):
self.producer.send(self.channel, value=pickle.dumps(data))

Loading…
Cancel
Save