Browse Source

added url_list extra parameter for multiple kafka server usage

pull/840/head
sparkingdark 3 years ago
parent
commit
dacdc1a13a
  1. 15
      src/socketio/kafka_manager.py

15
src/socketio/kafka_manager.py

@ -1,5 +1,6 @@
import logging
import pickle
from typing import List
try:
import kafka
@ -34,7 +35,7 @@ class KafkaManager(PubSubManager): # pragma: no cover
"""
name = 'kafka'
def __init__(self, url='kafka://localhost:9092', channel='socketio',
def __init__(self, url='kafka://localhost:9092', url_list=[], channel='socketio',
write_only=False):
if kafka is None:
raise RuntimeError('kafka-python package is not installed '
@ -45,9 +46,15 @@ class KafkaManager(PubSubManager): # pragma: no cover
write_only=write_only)
self.kafka_url = url[8:] if url != 'kafka://' else 'localhost:9092'
self.producer = kafka.KafkaProducer(bootstrap_servers=self.kafka_url)
self.consumer = kafka.KafkaConsumer(self.channel,
bootstrap_servers=self.kafka_url)
if url_list != []:
self.kafka_url_list = url_list[0]
self.producer = kafka.KafkaProducer(bootstrap_servers=self.kafka_url_list)
self.consumer = kafka.KafkaConsumer(self.channel,
bootstrap_servers=self.kafka_url_list)
else:
self.producer = kafka.KafkaProducer(bootstrap_servers=self.kafka_url)
self.consumer = kafka.KafkaConsumer(self.channel,
bootstrap_servers=self.kafka_url)
def _publish(self, data):
self.producer.send(self.channel, value=pickle.dumps(data))

Loading…
Cancel
Save