From d6f703f1bd36b1a68e0e555212ef128fef3aacad Mon Sep 17 00:00:00 2001 From: Eric Seidler Date: Tue, 3 Jan 2017 08:26:28 -0600 Subject: [PATCH 1/6] add ZmqManager to dunder init --- socketio/__init__.py | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/socketio/__init__.py b/socketio/__init__.py index 5e38128..c4e088e 100644 --- a/socketio/__init__.py +++ b/socketio/__init__.py @@ -3,10 +3,11 @@ from .base_manager import BaseManager from .pubsub_manager import PubSubManager from .kombu_manager import KombuManager from .redis_manager import RedisManager +from .zmq_manager import ZmqManager from .server import Server from .namespace import Namespace __version__ = '1.6.2' __all__ = [__version__, Middleware, Server, BaseManager, PubSubManager, - KombuManager, RedisManager, Namespace] + KombuManager, RedisManager, ZmqManager, Namespace] From 88f3b87efa9f91d1e6eb23a110962d8d664ff1c3 Mon Sep 17 00:00:00 2001 From: Eric Seidler Date: Tue, 3 Jan 2017 08:31:00 -0600 Subject: [PATCH 2/6] add zmq manager --- socketio/zmq_manager.py | 110 ++++++++++++++++++++++++++++++++++++++++ 1 file changed, 110 insertions(+) create mode 100644 socketio/zmq_manager.py diff --git a/socketio/zmq_manager.py b/socketio/zmq_manager.py new file mode 100644 index 0000000..6572e33 --- /dev/null +++ b/socketio/zmq_manager.py @@ -0,0 +1,110 @@ +import time +import pickle +import re + +try: + import zmq +except ImportError: + zmq = None + +from .pubsub_manager import PubSubManager + + +class ZmqManager(PubSubManager): # pragma: no cover + """zmq based client manager. + + This class implements a zmq backend for event sharing across multiple + processes. To use a zmq backend, initialize the :class:`Server` instance as + follows:: + + url = 'zmq+tcp://hostname:port1+port2' + server = socketio.Server(client_manager=socketio.ZmqManager(url)) + + :param url: The connection URL for the zmq message broker, + which will need to be provided and running. + :param channel: The channel name on which the server sends and receives + notifications. Must be the same in all the servers. + :param write_only: If set to ``True``, only initialize to emit events. The + default of ``False`` initializes the class for emitting + and receiving. + + + N.B. + a zmq message broker must be running for the zmq_manager to work. + you can write your own or adapt one from the following simple broker below. + port numbers in the broker must match port numbers in connection string. + `` + import zmq + + receiver = zmq.Context().socket(zmq.PULL) + receiver.bind("tcp://*:5555") + + publisher = zmq.Context().socket(zmq.PUB) + publisher.bind("tcp://*:5556") + + + while True: + publisher.send(receiver.recv()) + `` + + """ + name = 'zmq' + + def __init__(self, url='tcp://localhost:5555+5556', + channel='socketio', + write_only=False): + if zmq is None: + raise RuntimeError('zmq package is not installed ' + '(Run "pip install pyzmq" in your ' + 'virtualenv).') + + r = re.compile(':\d+\+\d+$') + if not (url.startswith('zmq+tcp://') and r.search(url)): + raise RuntimeError('unexpected connection string: ' + url) + + url = url.replace('zmq+', '') + (sink_url, sub_port) = url.split('+') + sink_port = sink_url.split(':')[-1] + sub_url = sink_url.replace(sink_port, sub_port) + + sink = zmq.Context().socket(zmq.PUSH) + sink.connect(sink_url) + + sub = zmq.Context().socket(zmq.SUB) + sub.setsockopt_string(zmq.SUBSCRIBE, u'') + sub.connect(sub_url) + + self.sink = sink + self.sub = sub + self.channel = channel + super(ZmqManager, self).__init__(channel=channel, + write_only=write_only) + + def _publish(self, data): + pickled_data = pickle.dumps( + { + 'type': 'message', + 'channel': self.channel, + 'data': data + } + ) + return self.sink.send(pickled_data) + + def zmq_listen(self): + while True: + try: + response = self.sub.recv(flags=zmq.NOBLOCK) + if response is not None: + yield response + except zmq.Again: + time.sleep(0.5) + + def _listen(self): + for message in self.zmq_listen(): + if isinstance(message, str): + message = pickle.loads(message) + if isinstance(message, dict) and \ + message['channel'] == self.channel and \ + 'data' in message: + yield message['data'] + return From fab0683bebbc550d9e9fef418baa07fc510f71d0 Mon Sep 17 00:00:00 2001 From: Eric Seidler Date: Thu, 5 Jan 2017 21:05:00 -0600 Subject: [PATCH 3/6] add zmq prefix to default value for url --- socketio/zmq_manager.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/socketio/zmq_manager.py b/socketio/zmq_manager.py index 6572e33..f5e6361 100644 --- a/socketio/zmq_manager.py +++ b/socketio/zmq_manager.py @@ -50,7 +50,7 @@ class ZmqManager(PubSubManager): # pragma: no cover """ name = 'zmq' - def __init__(self, url='tcp://localhost:5555+5556', + def __init__(self, url='zmq+tcp://localhost:5555+5556', channel='socketio', write_only=False): if zmq is None: From 09d8d5d0d40d71e246d5c1a1ba8d2375b591f833 Mon Sep 17 00:00:00 2001 From: Eric Seidler Date: Thu, 5 Jan 2017 21:12:07 -0600 Subject: [PATCH 4/6] check message type before yielding message data --- socketio/zmq_manager.py | 1 + 1 file changed, 1 insertion(+) diff --git a/socketio/zmq_manager.py b/socketio/zmq_manager.py index f5e6361..12dfe89 100644 --- a/socketio/zmq_manager.py +++ b/socketio/zmq_manager.py @@ -104,6 +104,7 @@ class ZmqManager(PubSubManager): # pragma: no cover if isinstance(message, str): message = pickle.loads(message) if isinstance(message, dict) and \ + message['type'] == 'message' and \ message['channel'] == self.channel and \ 'data' in message: yield message['data'] From 42ad98e750afd91847f65a7221de665da6585495 Mon Sep 17 00:00:00 2001 From: Eric Seidler Date: Thu, 5 Jan 2017 21:44:07 -0600 Subject: [PATCH 5/6] handle failed pickled.loads --- socketio/zmq_manager.py | 5 ++++- 1 file changed, 4 insertions(+), 1 deletion(-) diff --git a/socketio/zmq_manager.py b/socketio/zmq_manager.py index 12dfe89..2ad79ce 100644 --- a/socketio/zmq_manager.py +++ b/socketio/zmq_manager.py @@ -102,7 +102,10 @@ class ZmqManager(PubSubManager): # pragma: no cover def _listen(self): for message in self.zmq_listen(): if isinstance(message, str): - message = pickle.loads(message) + try: + message = pickle.loads(message) + except Exception: + pass if isinstance(message, dict) and \ message['type'] == 'message' and \ message['channel'] == self.channel and \ From 10d273b3feee216ea711689004454e87c3b237bc Mon Sep 17 00:00:00 2001 From: Eric Seidler Date: Mon, 30 Jan 2017 21:28:48 -0600 Subject: [PATCH 6/6] use non-blocking eventlet zmq wrapper in listen method --- socketio/zmq_manager.py | 12 ++++-------- 1 file changed, 4 insertions(+), 8 deletions(-) diff --git a/socketio/zmq_manager.py b/socketio/zmq_manager.py index 2ad79ce..3950655 100644 --- a/socketio/zmq_manager.py +++ b/socketio/zmq_manager.py @@ -1,9 +1,8 @@ -import time import pickle import re try: - import zmq + import eventlet.green.zmq as zmq except ImportError: zmq = None @@ -92,12 +91,9 @@ class ZmqManager(PubSubManager): # pragma: no cover def zmq_listen(self): while True: - try: - response = self.sub.recv(flags=zmq.NOBLOCK) - if response is not None: - yield response - except zmq.Again: - time.sleep(0.5) + response = self.sub.recv() + if response is not None: + yield response def _listen(self): for message in self.zmq_listen():