|
|
@ -22,7 +22,7 @@ class KafkaManager(PubSubManager): # pragma: no cover |
|
|
|
To use a Kafka backend, initialize the :class:`Server` instance as |
|
|
|
follows:: |
|
|
|
|
|
|
|
url = 'kafka://hostname:port/0' |
|
|
|
url = 'kafka://hostname:port' |
|
|
|
server = socketio.Server(client_manager=socketio.KafkaManager(url)) |
|
|
|
|
|
|
|
:param url: The connection URL for the Kafka server. For a default Kafka |
|
|
@ -35,31 +35,20 @@ class KafkaManager(PubSubManager): # pragma: no cover |
|
|
|
""" |
|
|
|
name = 'kafka' |
|
|
|
|
|
|
|
def __init__(self, url='kafka://localhost:9092/0', channel='socketio', |
|
|
|
def __init__(self, url='kafka://localhost:9092', channel='socketio', |
|
|
|
write_only=False): |
|
|
|
if kafka is None: |
|
|
|
raise RuntimeError('kafka-python package is not installed ' |
|
|
|
'(Run "pip install kafka-python" in your ' |
|
|
|
'virtualenv).') |
|
|
|
self.kafka_url = url |
|
|
|
self.producer = kafka.KafkaProducer(bootstrap_servers='localhost:9092') |
|
|
|
|
|
|
|
super(KafkaManager, self).__init__(channel=channel, |
|
|
|
write_only=write_only) |
|
|
|
|
|
|
|
def initialize(self): |
|
|
|
super(KafkaManager, self).initialize() |
|
|
|
monkey_patched = True |
|
|
|
if self.server.async_mode == 'eventlet': |
|
|
|
from eventlet.patcher import is_monkey_patched |
|
|
|
monkey_patched = is_monkey_patched('socket') |
|
|
|
elif 'gevent' in self.server.async_mode: |
|
|
|
from gevent.monkey import is_module_patched |
|
|
|
monkey_patched = is_module_patched('socket') |
|
|
|
if not monkey_patched: |
|
|
|
raise RuntimeError( |
|
|
|
'Kafka requires a monkey patched socket library to work ' |
|
|
|
'with ' + self.server.async_mode) |
|
|
|
self.kafka_url = url[8:] if url != 'kafka://' else 'localhost:9092' |
|
|
|
self.producer = kafka.KafkaProducer(bootstrap_servers=self.kafka_url) |
|
|
|
self.consumer = kafka.KafkaConsumer(self.channel, |
|
|
|
bootstrap_servers=self.kafka_url) |
|
|
|
|
|
|
|
|
|
|
|
def _publish(self, data): |
|
|
@ -68,7 +57,6 @@ class KafkaManager(PubSubManager): # pragma: no cover |
|
|
|
|
|
|
|
|
|
|
|
def _kafka_listen(self): |
|
|
|
self.consumer = kafka.KafkaConsumer(self.channel, bootstrap_servers='localhost:9092') |
|
|
|
for message in self.consumer: |
|
|
|
yield message |
|
|
|
|
|
|
|