From b5e40ce64482eba10334a0ca0c09e991328b5a22 Mon Sep 17 00:00:00 2001 From: johaven Date: Tue, 26 May 2020 18:45:05 +0200 Subject: [PATCH] implement uWSGI pubsub manager --- docs/api.rst | 6 +++ docs/server.rst | 25 +++++++++++++ socketio/__init__.py | 4 +- socketio/uwsgi_manager.py | 77 +++++++++++++++++++++++++++++++++++++++ 4 files changed, 111 insertions(+), 1 deletion(-) create mode 100644 socketio/uwsgi_manager.py diff --git a/docs/api.rst b/docs/api.rst index b23de1a..fa3293b 100644 --- a/docs/api.rst +++ b/docs/api.rst @@ -108,6 +108,12 @@ API Reference .. autoclass:: RedisManager :members: +``UWSGIManager`` class +---------------------- + +.. autoclass:: UWSGIManager + :members: + ``KafkaManager`` class ---------------------- diff --git a/docs/server.rst b/docs/server.rst index faf77e8..56f4800 100644 --- a/docs/server.rst +++ b/docs/server.rst @@ -461,6 +461,31 @@ to connect to a message queue such as `Redis `_ or `RabbitMQ `_, to communicate with other related Socket.IO servers or auxiliary workers. +uWSGI +~~~~~ + +To use a uWSGI message queue, you need to run uWSGI server with option --queue:: + + # One queue with 1 slot + uwsgi --http :5000 --gevent 1000 --queue 1 --http-websockets --master --wsgi-file app.py --callable app + +Only one slot is required because only the last message is dequeued. +The uWSGI queue is processed when a uWSGI signal is triggered on a specific channel (default 0). + +The uWSGI queue is configured through the :class:`socketio.UWSGIManager`:: + + # socketio.Server class + mgr = socketio.UWSGIManager() + sio = socketio.Server(client_manager=mgr) + +If you already use uWSGI signals you can specify on which channel the events will be triggered:: + + # channel must be a non-negative integer + socketio.UWSGIManager('uwsgi:8') + +Note that uWSGI currently was not tested with asyncio, so it cannot be used with +the :class:`socketio.AsyncServer` class. + Redis ~~~~~ diff --git a/socketio/__init__.py b/socketio/__init__.py index 5841b3d..e4b4f53 100644 --- a/socketio/__init__.py +++ b/socketio/__init__.py @@ -7,6 +7,7 @@ from .kombu_manager import KombuManager from .redis_manager import RedisManager from .kafka_manager import KafkaManager from .zmq_manager import ZmqManager +from .uwsgi_manager import UWSGIManager from .server import Server from .namespace import Namespace, ClientNamespace from .middleware import WSGIApp, Middleware @@ -31,7 +32,8 @@ __version__ = '4.6.1dev' __all__ = ['__version__', 'Client', 'Server', 'BaseManager', 'PubSubManager', 'KombuManager', 'RedisManager', 'ZmqManager', 'KafkaManager', - 'Namespace', 'ClientNamespace', 'WSGIApp', 'Middleware'] + 'UWSGIManager', 'Namespace', 'ClientNamespace', 'WSGIApp', + 'Middleware'] if AsyncServer is not None: # pragma: no cover __all__ += ['AsyncClient', 'AsyncServer', 'AsyncNamespace', 'AsyncClientNamespace', 'AsyncManager', 'AsyncRedisManager', diff --git a/socketio/uwsgi_manager.py b/socketio/uwsgi_manager.py new file mode 100644 index 0000000..3245cfe --- /dev/null +++ b/socketio/uwsgi_manager.py @@ -0,0 +1,77 @@ +import logging +import pickle +from queue import SimpleQueue + +try: + import uwsgi +except ImportError: + uwsgi = None + +from .pubsub_manager import PubSubManager + +logger = logging.getLogger('socketio') + + +class UWSGIManager(PubSubManager): # pragma: no cover + """Uwsgi based client manager. + + This class implements a UWSGI backend for event sharing across multiple + processes. + + To use a uWSGI backend, initialize the :class:`Server` instance as + follows:: + + server = socketio.Server(client_manager=socketio.UWSGIManager()) + + :param channel: The channel number on which the uWSGI Signal is propagated + accross processes. + :param write_only: If set ot ``True``, only initialize to emit events. The + default of ``False`` initializes the class for emitting + and receiving. + """ + name = 'uwsgi' + + def __init__(self, url='uwsgi:0', channel='socketio', write_only=False, logger=None): + self._check_configuration() + self.signum = self._sig_number(url) + self.queue = SimpleQueue() # uWSGI does not provide a a blocking queue + super(UWSGIManager, self).__init__(channel=channel, + write_only=write_only, + logger=logger) + + @staticmethod + def _check_configuration(): + if uwsgi is None: + raise RuntimeError('You are not running under uWSGI') + try: + uwsgi.queue_last() + except AttributeError: + raise RuntimeError('uWSGI queue must be enabled with option --queue 1') + + @staticmethod + def _sig_number(url): + if ':' in url: + try: + sig = int(url.split(':')[1]) + except ValueError: + logger.warning('Bad URL format %s, uWSGI signal is listening on default (1)' % url) + else: + return sig + return 0 + + def _publish(self, data): + uwsgi.queue_push(pickle.dumps(data)) + uwsgi.signal(self.signum) + + def _enqueue(self, signum): + self.queue.put(uwsgi.queue_last()) + + def _uwsgi_listen(self): + uwsgi.register_signal(self.signum, 'workers', self._enqueue) + for message in iter(self.queue.get, None): + if message is not None: + yield message + + def _listen(self): + for message in self._uwsgi_listen(): + yield pickle.loads(message)