From 725afcbc14cd854974db50b57023d94e2c7c6dfa Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Vincent=20M=C3=A9zino?= Date: Sat, 28 Apr 2018 16:47:30 +0200 Subject: [PATCH 1/9] Add a copy of redis manager and name it kafka manager --- socketio/kafka_manager.py | 110 ++++++++++++++++++++++++++++++++++++++ 1 file changed, 110 insertions(+) create mode 100644 socketio/kafka_manager.py diff --git a/socketio/kafka_manager.py b/socketio/kafka_manager.py new file mode 100644 index 0000000..c47c23b --- /dev/null +++ b/socketio/kafka_manager.py @@ -0,0 +1,110 @@ +import logging +import pickle +import time + +try: + import redis +except ImportError: + redis = None + +from .pubsub_manager import PubSubManager + +logger = logging.getLogger('socketio') + + +class KafkaManager(PubSubManager): # pragma: no cover + """Redis based client manager. + + This class implements a Redis backend for event sharing across multiple + processes. Only kept here as one more example of how to build a custom + backend, since the kombu backend is perfectly adequate to support a Redis + message queue. + + To use a Redis backend, initialize the :class:`Server` instance as + follows:: + + url = 'redis://hostname:port/0' + server = socketio.Server(client_manager=socketio.RedisManager(url)) + + :param url: The connection URL for the Redis server. For a default Redis + store running on the same host, use ``redis://``. + :param channel: The channel name on which the server sends and receives + notifications. Must be the same in all the servers. + :param write_only: If set ot ``True``, only initialize to emit events. The + default of ``False`` initializes the class for emitting + and receiving. + """ + name = 'redis' + + def __init__(self, url='redis://localhost:6379/0', channel='socketio', + write_only=False): + if redis is None: + raise RuntimeError('Redis package is not installed ' + '(Run "pip install redis" in your ' + 'virtualenv).') + self.redis_url = url + self._redis_connect() + super(RedisManager, self).__init__(channel=channel, + write_only=write_only) + + def initialize(self): + super(RedisManager, self).initialize() + + monkey_patched = True + if self.server.async_mode == 'eventlet': + from eventlet.patcher import is_monkey_patched + monkey_patched = is_monkey_patched('socket') + elif 'gevent' in self.server.async_mode: + from gevent.monkey import is_module_patched + monkey_patched = is_module_patched('socket') + if not monkey_patched: + raise RuntimeError( + 'Redis requires a monkey patched socket library to work ' + 'with ' + self.server.async_mode) + + def _redis_connect(self): + self.redis = redis.Redis.from_url(self.redis_url) + self.pubsub = self.redis.pubsub() + + def _publish(self, data): + retry = True + while True: + try: + if not retry: + self._redis_connect() + return self.redis.publish(self.channel, pickle.dumps(data)) + except redis.exceptions.ConnectionError: + if retry: + logger.error('Cannot publish to redis... retrying') + retry = False + else: + logger.error('Cannot publish to redis... giving up') + break + + def _redis_listen_with_retries(self): + retry_sleep = 1 + connect = False + while True: + try: + if connect: + self._redis_connect() + self.pubsub.subscribe(self.channel) + for message in self.pubsub.listen(): + yield message + except redis.exceptions.ConnectionError: + logger.error('Cannot receive from redis... ' + 'retrying in {} secs'.format(retry_sleep)) + connect = True + time.sleep(retry_sleep) + retry_sleep *= 2 + if retry_sleep > 60: + retry_sleep = 60 + + def _listen(self): + channel = self.channel.encode('utf-8') + self.pubsub.subscribe(self.channel) + for message in self._redis_listen_with_retries(): + if message['channel'] == channel and \ + message['type'] == 'message' and 'data' in message: + yield message['data'] + self.pubsub.unsubscribe(self.channel) From 2568c80a5afefdbf55a1a04fa6fccd9124d09596 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Vincent=20M=C3=A9zino?= Date: Sat, 28 Apr 2018 17:10:31 +0200 Subject: [PATCH 2/9] Add KafkaManager to __init__.py --- socketio/__init__.py | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) diff --git a/socketio/__init__.py b/socketio/__init__.py index 9f7f711..859b53b 100644 --- a/socketio/__init__.py +++ b/socketio/__init__.py @@ -5,6 +5,7 @@ from .base_manager import BaseManager from .pubsub_manager import PubSubManager from .kombu_manager import KombuManager from .redis_manager import RedisManager +from .kafka_manager import KafkaManager from .zmq_manager import ZmqManager from .server import Server from .namespace import Namespace @@ -22,8 +23,8 @@ else: # pragma: no cover __version__ = '1.9.0' __all__ = ['__version__', 'Middleware', 'Server', 'BaseManager', - 'PubSubManager', 'KombuManager', 'RedisManager', 'ZmqManager', - 'Namespace'] + 'PubSubManager', 'KombuManager', 'RedisManager', 'KafkaManager', + 'ZmqManager', 'Namespace'] if AsyncServer is not None: # pragma: no cover __all__ += ['AsyncServer', 'AsyncNamespace', 'AsyncManager', 'AsyncRedisManager'] From 610c229fd93eca7c8f3433ea8bdc705bd62a84cc Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Vincent=20M=C3=A9zino?= Date: Sun, 29 Apr 2018 00:33:29 +0200 Subject: [PATCH 3/9] Write KafkaManager --- socketio/kafka_manager.py | 101 +++++++++++++------------------------- 1 file changed, 35 insertions(+), 66 deletions(-) diff --git a/socketio/kafka_manager.py b/socketio/kafka_manager.py index c47c23b..1ec896e 100644 --- a/socketio/kafka_manager.py +++ b/socketio/kafka_manager.py @@ -1,11 +1,12 @@ import logging import pickle import time +import json try: - import redis + import kafka except ImportError: - redis = None + kafka = None from .pubsub_manager import PubSubManager @@ -13,43 +14,41 @@ logger = logging.getLogger('socketio') class KafkaManager(PubSubManager): # pragma: no cover - """Redis based client manager. + """Kafka based client manager. - This class implements a Redis backend for event sharing across multiple - processes. Only kept here as one more example of how to build a custom - backend, since the kombu backend is perfectly adequate to support a Redis - message queue. + This class implements a Kafka backend for event sharing across multiple + processes. - To use a Redis backend, initialize the :class:`Server` instance as + To use a Kafka backend, initialize the :class:`Server` instance as follows:: - url = 'redis://hostname:port/0' - server = socketio.Server(client_manager=socketio.RedisManager(url)) + url = 'kafka://hostname:port/0' + server = socketio.Server(client_manager=socketio.KafkaManager(url)) - :param url: The connection URL for the Redis server. For a default Redis - store running on the same host, use ``redis://``. - :param channel: The channel name on which the server sends and receives + :param url: The connection URL for the Kafka server. For a default Kafka + store running on the same host, use ``kafka://``. + :param channel: The channel name (topic) on which the server sends and receives notifications. Must be the same in all the servers. :param write_only: If set ot ``True``, only initialize to emit events. The default of ``False`` initializes the class for emitting and receiving. """ - name = 'redis' + name = 'kafka' - def __init__(self, url='redis://localhost:6379/0', channel='socketio', + def __init__(self, url='kafka://localhost:9092/0', channel='socketio', write_only=False): - if redis is None: - raise RuntimeError('Redis package is not installed ' - '(Run "pip install redis" in your ' + if kafka is None: + raise RuntimeError('kafka-python package is not installed ' + '(Run "pip install kafka-python" in your ' 'virtualenv).') - self.redis_url = url - self._redis_connect() - super(RedisManager, self).__init__(channel=channel, + self.kafka_url = url + self.producer = kafka.KafkaProducer(bootstrap_servers='localhost:9092') + + super(KafkaManager, self).__init__(channel=channel, write_only=write_only) def initialize(self): - super(RedisManager, self).initialize() - + super(KafkaManager, self).initialize() monkey_patched = True if self.server.async_mode == 'eventlet': from eventlet.patcher import is_monkey_patched @@ -59,52 +58,22 @@ class KafkaManager(PubSubManager): # pragma: no cover monkey_patched = is_module_patched('socket') if not monkey_patched: raise RuntimeError( - 'Redis requires a monkey patched socket library to work ' + 'Kafka requires a monkey patched socket library to work ' 'with ' + self.server.async_mode) - def _redis_connect(self): - self.redis = redis.Redis.from_url(self.redis_url) - self.pubsub = self.redis.pubsub() def _publish(self, data): - retry = True - while True: - try: - if not retry: - self._redis_connect() - return self.redis.publish(self.channel, pickle.dumps(data)) - except redis.exceptions.ConnectionError: - if retry: - logger.error('Cannot publish to redis... retrying') - retry = False - else: - logger.error('Cannot publish to redis... giving up') - break - - def _redis_listen_with_retries(self): - retry_sleep = 1 - connect = False - while True: - try: - if connect: - self._redis_connect() - self.pubsub.subscribe(self.channel) - for message in self.pubsub.listen(): - yield message - except redis.exceptions.ConnectionError: - logger.error('Cannot receive from redis... ' - 'retrying in {} secs'.format(retry_sleep)) - connect = True - time.sleep(retry_sleep) - retry_sleep *= 2 - if retry_sleep > 60: - retry_sleep = 60 + self.producer.send(self.channel, value=pickle.dumps(data)) + self.producer.flush() + + + def _kafka_listen(self): + self.consumer = kafka.KafkaConsumer(self.channel, bootstrap_servers='localhost:9092') + for message in self.consumer: + yield message + def _listen(self): - channel = self.channel.encode('utf-8') - self.pubsub.subscribe(self.channel) - for message in self._redis_listen_with_retries(): - if message['channel'] == channel and \ - message['type'] == 'message' and 'data' in message: - yield message['data'] - self.pubsub.unsubscribe(self.channel) + for message in self._kafka_listen(): + if message.topic == self.channel: + yield pickle.loads(message.value) From 4f1d8d932907cdd95a402aa3024d52cd5b873c11 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Vincent=20M=C3=A9zino?= Date: Sun, 29 Apr 2018 16:13:27 +0200 Subject: [PATCH 4/9] Use the provided url in KafkaManager and remove the method 'initialize' --- socketio/kafka_manager.py | 24 ++++++------------------ 1 file changed, 6 insertions(+), 18 deletions(-) diff --git a/socketio/kafka_manager.py b/socketio/kafka_manager.py index 1ec896e..7bb1bcb 100644 --- a/socketio/kafka_manager.py +++ b/socketio/kafka_manager.py @@ -22,7 +22,7 @@ class KafkaManager(PubSubManager): # pragma: no cover To use a Kafka backend, initialize the :class:`Server` instance as follows:: - url = 'kafka://hostname:port/0' + url = 'kafka://hostname:port' server = socketio.Server(client_manager=socketio.KafkaManager(url)) :param url: The connection URL for the Kafka server. For a default Kafka @@ -35,31 +35,20 @@ class KafkaManager(PubSubManager): # pragma: no cover """ name = 'kafka' - def __init__(self, url='kafka://localhost:9092/0', channel='socketio', + def __init__(self, url='kafka://localhost:9092', channel='socketio', write_only=False): if kafka is None: raise RuntimeError('kafka-python package is not installed ' '(Run "pip install kafka-python" in your ' 'virtualenv).') - self.kafka_url = url - self.producer = kafka.KafkaProducer(bootstrap_servers='localhost:9092') super(KafkaManager, self).__init__(channel=channel, write_only=write_only) - def initialize(self): - super(KafkaManager, self).initialize() - monkey_patched = True - if self.server.async_mode == 'eventlet': - from eventlet.patcher import is_monkey_patched - monkey_patched = is_monkey_patched('socket') - elif 'gevent' in self.server.async_mode: - from gevent.monkey import is_module_patched - monkey_patched = is_module_patched('socket') - if not monkey_patched: - raise RuntimeError( - 'Kafka requires a monkey patched socket library to work ' - 'with ' + self.server.async_mode) + self.kafka_url = url[8:] if url != 'kafka://' else 'localhost:9092' + self.producer = kafka.KafkaProducer(bootstrap_servers=self.kafka_url) + self.consumer = kafka.KafkaConsumer(self.channel, + bootstrap_servers=self.kafka_url) def _publish(self, data): @@ -68,7 +57,6 @@ class KafkaManager(PubSubManager): # pragma: no cover def _kafka_listen(self): - self.consumer = kafka.KafkaConsumer(self.channel, bootstrap_servers='localhost:9092') for message in self.consumer: yield message From 29e9c659d40dd8c405da38ca9295a9ac2f3ead46 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Vincent=20M=C3=A9zino?= Date: Mon, 30 Apr 2018 02:27:20 +0200 Subject: [PATCH 5/9] Remove unused imports --- socketio/kafka_manager.py | 2 -- 1 file changed, 2 deletions(-) diff --git a/socketio/kafka_manager.py b/socketio/kafka_manager.py index 7bb1bcb..d32cbda 100644 --- a/socketio/kafka_manager.py +++ b/socketio/kafka_manager.py @@ -1,7 +1,5 @@ import logging import pickle -import time -import json try: import kafka From 839dba341180b3b728b40a562e3c1b0f3d3cf550 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Vincent=20M=C3=A9zino?= Date: Mon, 30 Apr 2018 02:31:14 +0200 Subject: [PATCH 6/9] Fix flake8 --- socketio/kafka_manager.py | 8 +++----- 1 file changed, 3 insertions(+), 5 deletions(-) diff --git a/socketio/kafka_manager.py b/socketio/kafka_manager.py index d32cbda..00a2e7f 100644 --- a/socketio/kafka_manager.py +++ b/socketio/kafka_manager.py @@ -25,8 +25,9 @@ class KafkaManager(PubSubManager): # pragma: no cover :param url: The connection URL for the Kafka server. For a default Kafka store running on the same host, use ``kafka://``. - :param channel: The channel name (topic) on which the server sends and receives - notifications. Must be the same in all the servers. + :param channel: The channel name (topic) on which the server sends and + receives notifications. Must be the same in all the + servers. :param write_only: If set ot ``True``, only initialize to emit events. The default of ``False`` initializes the class for emitting and receiving. @@ -48,17 +49,14 @@ class KafkaManager(PubSubManager): # pragma: no cover self.consumer = kafka.KafkaConsumer(self.channel, bootstrap_servers=self.kafka_url) - def _publish(self, data): self.producer.send(self.channel, value=pickle.dumps(data)) self.producer.flush() - def _kafka_listen(self): for message in self.consumer: yield message - def _listen(self): for message in self._kafka_listen(): if message.topic == self.channel: From dc1e416dabf978343113a919647999bc5cb98928 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Vincent=20M=C3=A9zino?= Date: Mon, 30 Apr 2018 02:33:46 +0200 Subject: [PATCH 7/9] Remove trailing whitespaces --- socketio/__init__.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/socketio/__init__.py b/socketio/__init__.py index 859b53b..32cd971 100644 --- a/socketio/__init__.py +++ b/socketio/__init__.py @@ -23,7 +23,7 @@ else: # pragma: no cover __version__ = '1.9.0' __all__ = ['__version__', 'Middleware', 'Server', 'BaseManager', - 'PubSubManager', 'KombuManager', 'RedisManager', 'KafkaManager', + 'PubSubManager', 'KombuManager', 'RedisManager', 'KafkaManager', 'ZmqManager', 'Namespace'] if AsyncServer is not None: # pragma: no cover __all__ += ['AsyncServer', 'AsyncNamespace', 'AsyncManager', From 6dc74df2fbb779a832429345a7e54b1af7473425 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Vincent=20M=C3=A9zino?= Date: Mon, 30 Apr 2018 02:47:54 +0200 Subject: [PATCH 8/9] Group __all__ element by type in __init__.py --- socketio/__init__.py | 7 ++++--- 1 file changed, 4 insertions(+), 3 deletions(-) diff --git a/socketio/__init__.py b/socketio/__init__.py index 32cd971..11feaa3 100644 --- a/socketio/__init__.py +++ b/socketio/__init__.py @@ -22,9 +22,10 @@ else: # pragma: no cover __version__ = '1.9.0' -__all__ = ['__version__', 'Middleware', 'Server', 'BaseManager', - 'PubSubManager', 'KombuManager', 'RedisManager', 'KafkaManager', - 'ZmqManager', 'Namespace'] +__all__ = ['__version__', 'Middleware', 'Server', + 'BaseManager', 'PubSubManager', + 'KombuManager', 'RedisManager', 'KafkaManager', 'ZmqManager', + 'Namespace'] if AsyncServer is not None: # pragma: no cover __all__ += ['AsyncServer', 'AsyncNamespace', 'AsyncManager', 'AsyncRedisManager'] From a84338fd42c218fe55c547281a5cb24deac549f5 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Vincent=20M=C3=A9zino?= Date: Mon, 30 Apr 2018 02:49:46 +0200 Subject: [PATCH 9/9] Remove trailing whitespaces... --- socketio/__init__.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/socketio/__init__.py b/socketio/__init__.py index 11feaa3..e437754 100644 --- a/socketio/__init__.py +++ b/socketio/__init__.py @@ -24,7 +24,7 @@ __version__ = '1.9.0' __all__ = ['__version__', 'Middleware', 'Server', 'BaseManager', 'PubSubManager', - 'KombuManager', 'RedisManager', 'KafkaManager', 'ZmqManager', + 'KombuManager', 'RedisManager', 'KafkaManager', 'ZmqManager', 'Namespace'] if AsyncServer is not None: # pragma: no cover __all__ += ['AsyncServer', 'AsyncNamespace', 'AsyncManager',