From 88f3b87efa9f91d1e6eb23a110962d8d664ff1c3 Mon Sep 17 00:00:00 2001 From: Eric Seidler Date: Tue, 3 Jan 2017 08:31:00 -0600 Subject: [PATCH] add zmq manager --- socketio/zmq_manager.py | 110 ++++++++++++++++++++++++++++++++++++++++ 1 file changed, 110 insertions(+) create mode 100644 socketio/zmq_manager.py diff --git a/socketio/zmq_manager.py b/socketio/zmq_manager.py new file mode 100644 index 0000000..6572e33 --- /dev/null +++ b/socketio/zmq_manager.py @@ -0,0 +1,110 @@ +import time +import pickle +import re + +try: + import zmq +except ImportError: + zmq = None + +from .pubsub_manager import PubSubManager + + +class ZmqManager(PubSubManager): # pragma: no cover + """zmq based client manager. + + This class implements a zmq backend for event sharing across multiple + processes. To use a zmq backend, initialize the :class:`Server` instance as + follows:: + + url = 'zmq+tcp://hostname:port1+port2' + server = socketio.Server(client_manager=socketio.ZmqManager(url)) + + :param url: The connection URL for the zmq message broker, + which will need to be provided and running. + :param channel: The channel name on which the server sends and receives + notifications. Must be the same in all the servers. + :param write_only: If set to ``True``, only initialize to emit events. The + default of ``False`` initializes the class for emitting + and receiving. + + + N.B. + a zmq message broker must be running for the zmq_manager to work. + you can write your own or adapt one from the following simple broker below. + port numbers in the broker must match port numbers in connection string. + `` + import zmq + + receiver = zmq.Context().socket(zmq.PULL) + receiver.bind("tcp://*:5555") + + publisher = zmq.Context().socket(zmq.PUB) + publisher.bind("tcp://*:5556") + + + while True: + publisher.send(receiver.recv()) + `` + + """ + name = 'zmq' + + def __init__(self, url='tcp://localhost:5555+5556', + channel='socketio', + write_only=False): + if zmq is None: + raise RuntimeError('zmq package is not installed ' + '(Run "pip install pyzmq" in your ' + 'virtualenv).') + + r = re.compile(':\d+\+\d+$') + if not (url.startswith('zmq+tcp://') and r.search(url)): + raise RuntimeError('unexpected connection string: ' + url) + + url = url.replace('zmq+', '') + (sink_url, sub_port) = url.split('+') + sink_port = sink_url.split(':')[-1] + sub_url = sink_url.replace(sink_port, sub_port) + + sink = zmq.Context().socket(zmq.PUSH) + sink.connect(sink_url) + + sub = zmq.Context().socket(zmq.SUB) + sub.setsockopt_string(zmq.SUBSCRIBE, u'') + sub.connect(sub_url) + + self.sink = sink + self.sub = sub + self.channel = channel + super(ZmqManager, self).__init__(channel=channel, + write_only=write_only) + + def _publish(self, data): + pickled_data = pickle.dumps( + { + 'type': 'message', + 'channel': self.channel, + 'data': data + } + ) + return self.sink.send(pickled_data) + + def zmq_listen(self): + while True: + try: + response = self.sub.recv(flags=zmq.NOBLOCK) + if response is not None: + yield response + except zmq.Again: + time.sleep(0.5) + + def _listen(self): + for message in self.zmq_listen(): + if isinstance(message, str): + message = pickle.loads(message) + if isinstance(message, dict) and \ + message['channel'] == self.channel and \ + 'data' in message: + yield message['data'] + return