1 changed files with 110 additions and 0 deletions
@ -0,0 +1,110 @@ |
|||
import time |
|||
import pickle |
|||
import re |
|||
|
|||
try: |
|||
import zmq |
|||
except ImportError: |
|||
zmq = None |
|||
|
|||
from .pubsub_manager import PubSubManager |
|||
|
|||
|
|||
class ZmqManager(PubSubManager): # pragma: no cover |
|||
"""zmq based client manager. |
|||
|
|||
This class implements a zmq backend for event sharing across multiple |
|||
processes. To use a zmq backend, initialize the :class:`Server` instance as |
|||
follows:: |
|||
|
|||
url = 'zmq+tcp://hostname:port1+port2' |
|||
server = socketio.Server(client_manager=socketio.ZmqManager(url)) |
|||
|
|||
:param url: The connection URL for the zmq message broker, |
|||
which will need to be provided and running. |
|||
:param channel: The channel name on which the server sends and receives |
|||
notifications. Must be the same in all the servers. |
|||
:param write_only: If set to ``True``, only initialize to emit events. The |
|||
default of ``False`` initializes the class for emitting |
|||
and receiving. |
|||
|
|||
|
|||
N.B. |
|||
a zmq message broker must be running for the zmq_manager to work. |
|||
you can write your own or adapt one from the following simple broker below. |
|||
port numbers in the broker must match port numbers in connection string. |
|||
`` |
|||
import zmq |
|||
|
|||
receiver = zmq.Context().socket(zmq.PULL) |
|||
receiver.bind("tcp://*:5555") |
|||
|
|||
publisher = zmq.Context().socket(zmq.PUB) |
|||
publisher.bind("tcp://*:5556") |
|||
|
|||
|
|||
while True: |
|||
publisher.send(receiver.recv()) |
|||
`` |
|||
|
|||
""" |
|||
name = 'zmq' |
|||
|
|||
def __init__(self, url='tcp://localhost:5555+5556', |
|||
channel='socketio', |
|||
write_only=False): |
|||
if zmq is None: |
|||
raise RuntimeError('zmq package is not installed ' |
|||
'(Run "pip install pyzmq" in your ' |
|||
'virtualenv).') |
|||
|
|||
r = re.compile(':\d+\+\d+$') |
|||
if not (url.startswith('zmq+tcp://') and r.search(url)): |
|||
raise RuntimeError('unexpected connection string: ' + url) |
|||
|
|||
url = url.replace('zmq+', '') |
|||
(sink_url, sub_port) = url.split('+') |
|||
sink_port = sink_url.split(':')[-1] |
|||
sub_url = sink_url.replace(sink_port, sub_port) |
|||
|
|||
sink = zmq.Context().socket(zmq.PUSH) |
|||
sink.connect(sink_url) |
|||
|
|||
sub = zmq.Context().socket(zmq.SUB) |
|||
sub.setsockopt_string(zmq.SUBSCRIBE, u'') |
|||
sub.connect(sub_url) |
|||
|
|||
self.sink = sink |
|||
self.sub = sub |
|||
self.channel = channel |
|||
super(ZmqManager, self).__init__(channel=channel, |
|||
write_only=write_only) |
|||
|
|||
def _publish(self, data): |
|||
pickled_data = pickle.dumps( |
|||
{ |
|||
'type': 'message', |
|||
'channel': self.channel, |
|||
'data': data |
|||
} |
|||
) |
|||
return self.sink.send(pickled_data) |
|||
|
|||
def zmq_listen(self): |
|||
while True: |
|||
try: |
|||
response = self.sub.recv(flags=zmq.NOBLOCK) |
|||
if response is not None: |
|||
yield response |
|||
except zmq.Again: |
|||
time.sleep(0.5) |
|||
|
|||
def _listen(self): |
|||
for message in self.zmq_listen(): |
|||
if isinstance(message, str): |
|||
message = pickle.loads(message) |
|||
if isinstance(message, dict) and \ |
|||
message['channel'] == self.channel and \ |
|||
'data' in message: |
|||
yield message['data'] |
|||
return |
Loading…
Reference in new issue