diff --git a/socketio/kafka_manager.py b/socketio/kafka_manager.py index 1ec896e..7bb1bcb 100644 --- a/socketio/kafka_manager.py +++ b/socketio/kafka_manager.py @@ -22,7 +22,7 @@ class KafkaManager(PubSubManager): # pragma: no cover To use a Kafka backend, initialize the :class:`Server` instance as follows:: - url = 'kafka://hostname:port/0' + url = 'kafka://hostname:port' server = socketio.Server(client_manager=socketio.KafkaManager(url)) :param url: The connection URL for the Kafka server. For a default Kafka @@ -35,31 +35,20 @@ class KafkaManager(PubSubManager): # pragma: no cover """ name = 'kafka' - def __init__(self, url='kafka://localhost:9092/0', channel='socketio', + def __init__(self, url='kafka://localhost:9092', channel='socketio', write_only=False): if kafka is None: raise RuntimeError('kafka-python package is not installed ' '(Run "pip install kafka-python" in your ' 'virtualenv).') - self.kafka_url = url - self.producer = kafka.KafkaProducer(bootstrap_servers='localhost:9092') super(KafkaManager, self).__init__(channel=channel, write_only=write_only) - def initialize(self): - super(KafkaManager, self).initialize() - monkey_patched = True - if self.server.async_mode == 'eventlet': - from eventlet.patcher import is_monkey_patched - monkey_patched = is_monkey_patched('socket') - elif 'gevent' in self.server.async_mode: - from gevent.monkey import is_module_patched - monkey_patched = is_module_patched('socket') - if not monkey_patched: - raise RuntimeError( - 'Kafka requires a monkey patched socket library to work ' - 'with ' + self.server.async_mode) + self.kafka_url = url[8:] if url != 'kafka://' else 'localhost:9092' + self.producer = kafka.KafkaProducer(bootstrap_servers=self.kafka_url) + self.consumer = kafka.KafkaConsumer(self.channel, + bootstrap_servers=self.kafka_url) def _publish(self, data): @@ -68,7 +57,6 @@ class KafkaManager(PubSubManager): # pragma: no cover def _kafka_listen(self): - self.consumer = kafka.KafkaConsumer(self.channel, bootstrap_servers='localhost:9092') for message in self.consumer: yield message