From dacdc1a13a794d073b220dfa3440e5556ace3e3e Mon Sep 17 00:00:00 2001 From: sparkingdark Date: Wed, 22 Dec 2021 22:35:04 +0530 Subject: [PATCH] added url_list extra parameter for multiple kafka server usage --- src/socketio/kafka_manager.py | 15 +++++++++++---- 1 file changed, 11 insertions(+), 4 deletions(-) diff --git a/src/socketio/kafka_manager.py b/src/socketio/kafka_manager.py index b5eb636..ccc06c4 100644 --- a/src/socketio/kafka_manager.py +++ b/src/socketio/kafka_manager.py @@ -1,5 +1,6 @@ import logging import pickle +from typing import List try: import kafka @@ -34,7 +35,7 @@ class KafkaManager(PubSubManager): # pragma: no cover """ name = 'kafka' - def __init__(self, url='kafka://localhost:9092', channel='socketio', + def __init__(self, url='kafka://localhost:9092', url_list=[], channel='socketio', write_only=False): if kafka is None: raise RuntimeError('kafka-python package is not installed ' @@ -45,9 +46,15 @@ class KafkaManager(PubSubManager): # pragma: no cover write_only=write_only) self.kafka_url = url[8:] if url != 'kafka://' else 'localhost:9092' - self.producer = kafka.KafkaProducer(bootstrap_servers=self.kafka_url) - self.consumer = kafka.KafkaConsumer(self.channel, - bootstrap_servers=self.kafka_url) + if url_list != []: + self.kafka_url_list = url_list[0] + self.producer = kafka.KafkaProducer(bootstrap_servers=self.kafka_url_list) + self.consumer = kafka.KafkaConsumer(self.channel, + bootstrap_servers=self.kafka_url_list) + else: + self.producer = kafka.KafkaProducer(bootstrap_servers=self.kafka_url) + self.consumer = kafka.KafkaConsumer(self.channel, + bootstrap_servers=self.kafka_url) def _publish(self, data): self.producer.send(self.channel, value=pickle.dumps(data))