Browse Source

implement uWSGI pubsub manager

pull/493/head
johaven 5 years ago
parent
commit
b5e40ce644
  1. 6
      docs/api.rst
  2. 25
      docs/server.rst
  3. 4
      socketio/__init__.py
  4. 77
      socketio/uwsgi_manager.py

6
docs/api.rst

@ -108,6 +108,12 @@ API Reference
.. autoclass:: RedisManager
:members:
``UWSGIManager`` class
----------------------
.. autoclass:: UWSGIManager
:members:
``KafkaManager`` class
----------------------

25
docs/server.rst

@ -461,6 +461,31 @@ to connect to a message queue such as `Redis <http://redis.io/>`_ or
`RabbitMQ <https://www.rabbitmq.com/>`_, to communicate with other related
Socket.IO servers or auxiliary workers.
uWSGI
~~~~~
To use a uWSGI message queue, you need to run uWSGI server with option --queue::
# One queue with 1 slot
uwsgi --http :5000 --gevent 1000 --queue 1 --http-websockets --master --wsgi-file app.py --callable app
Only one slot is required because only the last message is dequeued.
The uWSGI queue is processed when a uWSGI signal is triggered on a specific channel (default 0).
The uWSGI queue is configured through the :class:`socketio.UWSGIManager`::
# socketio.Server class
mgr = socketio.UWSGIManager()
sio = socketio.Server(client_manager=mgr)
If you already use uWSGI signals you can specify on which channel the events will be triggered::
# channel must be a non-negative integer
socketio.UWSGIManager('uwsgi:8')
Note that uWSGI currently was not tested with asyncio, so it cannot be used with
the :class:`socketio.AsyncServer` class.
Redis
~~~~~

4
socketio/__init__.py

@ -7,6 +7,7 @@ from .kombu_manager import KombuManager
from .redis_manager import RedisManager
from .kafka_manager import KafkaManager
from .zmq_manager import ZmqManager
from .uwsgi_manager import UWSGIManager
from .server import Server
from .namespace import Namespace, ClientNamespace
from .middleware import WSGIApp, Middleware
@ -31,7 +32,8 @@ __version__ = '4.6.1dev'
__all__ = ['__version__', 'Client', 'Server', 'BaseManager', 'PubSubManager',
'KombuManager', 'RedisManager', 'ZmqManager', 'KafkaManager',
'Namespace', 'ClientNamespace', 'WSGIApp', 'Middleware']
'UWSGIManager', 'Namespace', 'ClientNamespace', 'WSGIApp',
'Middleware']
if AsyncServer is not None: # pragma: no cover
__all__ += ['AsyncClient', 'AsyncServer', 'AsyncNamespace',
'AsyncClientNamespace', 'AsyncManager', 'AsyncRedisManager',

77
socketio/uwsgi_manager.py

@ -0,0 +1,77 @@
import logging
import pickle
from queue import SimpleQueue
try:
import uwsgi
except ImportError:
uwsgi = None
from .pubsub_manager import PubSubManager
logger = logging.getLogger('socketio')
class UWSGIManager(PubSubManager): # pragma: no cover
"""Uwsgi based client manager.
This class implements a UWSGI backend for event sharing across multiple
processes.
To use a uWSGI backend, initialize the :class:`Server` instance as
follows::
server = socketio.Server(client_manager=socketio.UWSGIManager())
:param channel: The channel number on which the uWSGI Signal is propagated
accross processes.
:param write_only: If set ot ``True``, only initialize to emit events. The
default of ``False`` initializes the class for emitting
and receiving.
"""
name = 'uwsgi'
def __init__(self, url='uwsgi:0', channel='socketio', write_only=False, logger=None):
self._check_configuration()
self.signum = self._sig_number(url)
self.queue = SimpleQueue() # uWSGI does not provide a a blocking queue
super(UWSGIManager, self).__init__(channel=channel,
write_only=write_only,
logger=logger)
@staticmethod
def _check_configuration():
if uwsgi is None:
raise RuntimeError('You are not running under uWSGI')
try:
uwsgi.queue_last()
except AttributeError:
raise RuntimeError('uWSGI queue must be enabled with option --queue 1')
@staticmethod
def _sig_number(url):
if ':' in url:
try:
sig = int(url.split(':')[1])
except ValueError:
logger.warning('Bad URL format %s, uWSGI signal is listening on default (1)' % url)
else:
return sig
return 0
def _publish(self, data):
uwsgi.queue_push(pickle.dumps(data))
uwsgi.signal(self.signum)
def _enqueue(self, signum):
self.queue.put(uwsgi.queue_last())
def _uwsgi_listen(self):
uwsgi.register_signal(self.signum, 'workers', self._enqueue)
for message in iter(self.queue.get, None):
if message is not None:
yield message
def _listen(self):
for message in self._uwsgi_listen():
yield pickle.loads(message)
Loading…
Cancel
Save