From 610c229fd93eca7c8f3433ea8bdc705bd62a84cc Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Vincent=20M=C3=A9zino?= Date: Sun, 29 Apr 2018 00:33:29 +0200 Subject: [PATCH] Write KafkaManager --- socketio/kafka_manager.py | 101 +++++++++++++------------------------- 1 file changed, 35 insertions(+), 66 deletions(-) diff --git a/socketio/kafka_manager.py b/socketio/kafka_manager.py index c47c23b..1ec896e 100644 --- a/socketio/kafka_manager.py +++ b/socketio/kafka_manager.py @@ -1,11 +1,12 @@ import logging import pickle import time +import json try: - import redis + import kafka except ImportError: - redis = None + kafka = None from .pubsub_manager import PubSubManager @@ -13,43 +14,41 @@ logger = logging.getLogger('socketio') class KafkaManager(PubSubManager): # pragma: no cover - """Redis based client manager. + """Kafka based client manager. - This class implements a Redis backend for event sharing across multiple - processes. Only kept here as one more example of how to build a custom - backend, since the kombu backend is perfectly adequate to support a Redis - message queue. + This class implements a Kafka backend for event sharing across multiple + processes. - To use a Redis backend, initialize the :class:`Server` instance as + To use a Kafka backend, initialize the :class:`Server` instance as follows:: - url = 'redis://hostname:port/0' - server = socketio.Server(client_manager=socketio.RedisManager(url)) + url = 'kafka://hostname:port/0' + server = socketio.Server(client_manager=socketio.KafkaManager(url)) - :param url: The connection URL for the Redis server. For a default Redis - store running on the same host, use ``redis://``. - :param channel: The channel name on which the server sends and receives + :param url: The connection URL for the Kafka server. For a default Kafka + store running on the same host, use ``kafka://``. + :param channel: The channel name (topic) on which the server sends and receives notifications. Must be the same in all the servers. :param write_only: If set ot ``True``, only initialize to emit events. The default of ``False`` initializes the class for emitting and receiving. """ - name = 'redis' + name = 'kafka' - def __init__(self, url='redis://localhost:6379/0', channel='socketio', + def __init__(self, url='kafka://localhost:9092/0', channel='socketio', write_only=False): - if redis is None: - raise RuntimeError('Redis package is not installed ' - '(Run "pip install redis" in your ' + if kafka is None: + raise RuntimeError('kafka-python package is not installed ' + '(Run "pip install kafka-python" in your ' 'virtualenv).') - self.redis_url = url - self._redis_connect() - super(RedisManager, self).__init__(channel=channel, + self.kafka_url = url + self.producer = kafka.KafkaProducer(bootstrap_servers='localhost:9092') + + super(KafkaManager, self).__init__(channel=channel, write_only=write_only) def initialize(self): - super(RedisManager, self).initialize() - + super(KafkaManager, self).initialize() monkey_patched = True if self.server.async_mode == 'eventlet': from eventlet.patcher import is_monkey_patched @@ -59,52 +58,22 @@ class KafkaManager(PubSubManager): # pragma: no cover monkey_patched = is_module_patched('socket') if not monkey_patched: raise RuntimeError( - 'Redis requires a monkey patched socket library to work ' + 'Kafka requires a monkey patched socket library to work ' 'with ' + self.server.async_mode) - def _redis_connect(self): - self.redis = redis.Redis.from_url(self.redis_url) - self.pubsub = self.redis.pubsub() def _publish(self, data): - retry = True - while True: - try: - if not retry: - self._redis_connect() - return self.redis.publish(self.channel, pickle.dumps(data)) - except redis.exceptions.ConnectionError: - if retry: - logger.error('Cannot publish to redis... retrying') - retry = False - else: - logger.error('Cannot publish to redis... giving up') - break - - def _redis_listen_with_retries(self): - retry_sleep = 1 - connect = False - while True: - try: - if connect: - self._redis_connect() - self.pubsub.subscribe(self.channel) - for message in self.pubsub.listen(): - yield message - except redis.exceptions.ConnectionError: - logger.error('Cannot receive from redis... ' - 'retrying in {} secs'.format(retry_sleep)) - connect = True - time.sleep(retry_sleep) - retry_sleep *= 2 - if retry_sleep > 60: - retry_sleep = 60 + self.producer.send(self.channel, value=pickle.dumps(data)) + self.producer.flush() + + + def _kafka_listen(self): + self.consumer = kafka.KafkaConsumer(self.channel, bootstrap_servers='localhost:9092') + for message in self.consumer: + yield message + def _listen(self): - channel = self.channel.encode('utf-8') - self.pubsub.subscribe(self.channel) - for message in self._redis_listen_with_retries(): - if message['channel'] == channel and \ - message['type'] == 'message' and 'data' in message: - yield message['data'] - self.pubsub.unsubscribe(self.channel) + for message in self._kafka_listen(): + if message.topic == self.channel: + yield pickle.loads(message.value)