Browse Source

fix: remove listener part, use handlers to emit data

pull/493/head
johaven 5 years ago
parent
commit
6738b7507e
  1. 29
      socketio/uwsgi_manager.py

29
socketio/uwsgi_manager.py

@ -1,6 +1,5 @@
import logging import logging
import pickle import pickle
from queue import Queue
try: try:
import uwsgi import uwsgi
@ -23,12 +22,6 @@ class UWSGIManager(PubSubManager): # pragma: no cover
server = socketio.Server(client_manager=socketio.UWSGIManager()) server = socketio.Server(client_manager=socketio.UWSGIManager())
Notes:
- ``uwsgi.signal_wait()`` not working in async mode but
when fixed it will be a better approach for the listen part.
- uWSGI does not provide a blocking queue, the workaround is to use the
standard Python queue.
:param url: The connection URL (only for compatibility). :param url: The connection URL (only for compatibility).
To use the default signal number, use ``uwsgi:0``. To use the default signal number, use ``uwsgi:0``.
:param channel: The channel name on which the server sends and receives :param channel: The channel name on which the server sends and receives
@ -39,15 +32,13 @@ class UWSGIManager(PubSubManager): # pragma: no cover
""" """
name = 'uwsgi' name = 'uwsgi'
def __init__(self, url='uwsgi:0', channel='socketio', write_only=False, def __init__(self, url='uwsgi:0', channel='socketio', write_only=True,
logger=None): logger=None):
self._check_configuration() self._check_configuration()
self.signum = self._sig_number(url) self.signum = self._sig_number(url)
self.queue = Queue()
self.is_listener = False
uwsgi.register_signal(self.signum, 'workers', self._enqueue) uwsgi.register_signal(self.signum, 'workers', self._enqueue)
super(UWSGIManager, self).__init__(channel=channel, super(UWSGIManager, self).__init__(channel=channel,
write_only=write_only, write_only=True,
logger=logger) logger=logger)
@staticmethod @staticmethod
@ -73,14 +64,13 @@ class UWSGIManager(PubSubManager): # pragma: no cover
return 0 return 0
def _publish(self, data): def _publish(self, data):
logger.warning('pubish from worker %s' % uwsgi.worker_id())
uwsgi.queue_push(pickle.dumps(data)) uwsgi.queue_push(pickle.dumps(data))
uwsgi.signal(self.signum) uwsgi.signal(self.signum)
def _enqueue(self, signum): def _enqueue(self, signum):
if self.is_listener: logger.warning('emit from worker %s' % uwsgi.worker_id())
self.queue.put(uwsgi.queue_last()) self._internal_emit(uwsgi.queue_last())
else:
self._internal_emit(uwsgi.queue_last())
def _internal_emit(self, data): def _internal_emit(self, data):
data = pickle.loads(data) data = pickle.loads(data)
@ -94,12 +84,5 @@ class UWSGIManager(PubSubManager): # pragma: no cover
elif data['method'] == 'close_room': elif data['method'] == 'close_room':
self._handle_close_room(data) self._handle_close_room(data)
def _uwsgi_listen(self):
for message in iter(self.queue.get, None):
if message is not None:
yield message
def _listen(self): def _listen(self):
self.is_listener = True pass
for message in self._uwsgi_listen():
yield pickle.loads(message)

Loading…
Cancel
Save