import time import pickle import re try: import zmq except ImportError: zmq = None from .pubsub_manager import PubSubManager class ZmqManager(PubSubManager): # pragma: no cover """zmq based client manager. This class implements a zmq backend for event sharing across multiple processes. To use a zmq backend, initialize the :class:`Server` instance as follows:: url = 'zmq+tcp://hostname:port1+port2' server = socketio.Server(client_manager=socketio.ZmqManager(url)) :param url: The connection URL for the zmq message broker, which will need to be provided and running. :param channel: The channel name on which the server sends and receives notifications. Must be the same in all the servers. :param write_only: If set to ``True``, only initialize to emit events. The default of ``False`` initializes the class for emitting and receiving. N.B. a zmq message broker must be running for the zmq_manager to work. you can write your own or adapt one from the following simple broker below. port numbers in the broker must match port numbers in connection string. `` import zmq receiver = zmq.Context().socket(zmq.PULL) receiver.bind("tcp://*:5555") publisher = zmq.Context().socket(zmq.PUB) publisher.bind("tcp://*:5556") while True: publisher.send(receiver.recv()) `` """ name = 'zmq' def __init__(self, url='zmq+tcp://localhost:5555+5556', channel='socketio', write_only=False): if zmq is None: raise RuntimeError('zmq package is not installed ' '(Run "pip install pyzmq" in your ' 'virtualenv).') r = re.compile(':\d+\+\d+$') if not (url.startswith('zmq+tcp://') and r.search(url)): raise RuntimeError('unexpected connection string: ' + url) url = url.replace('zmq+', '') (sink_url, sub_port) = url.split('+') sink_port = sink_url.split(':')[-1] sub_url = sink_url.replace(sink_port, sub_port) sink = zmq.Context().socket(zmq.PUSH) sink.connect(sink_url) sub = zmq.Context().socket(zmq.SUB) sub.setsockopt_string(zmq.SUBSCRIBE, u'') sub.connect(sub_url) self.sink = sink self.sub = sub self.channel = channel super(ZmqManager, self).__init__(channel=channel, write_only=write_only) def _publish(self, data): pickled_data = pickle.dumps( { 'type': 'message', 'channel': self.channel, 'data': data } ) return self.sink.send(pickled_data) def zmq_listen(self): while True: try: response = self.sub.recv(flags=zmq.NOBLOCK) if response is not None: yield response except zmq.Again: time.sleep(0.5) def _listen(self): for message in self.zmq_listen(): if isinstance(message, str): try: message = pickle.loads(message) except Exception: pass if isinstance(message, dict) and \ message['type'] == 'message' and \ message['channel'] == self.channel and \ 'data' in message: yield message['data'] return