Browse Source

Merge branch 'master' into middlewares

pull/45/head
Robert Schindler 9 years ago
parent
commit
a7811ebe4a
  1. 12
      docs/index.rst
  2. 2
      socketio/__init__.py
  3. 2
      socketio/base_manager.py
  4. 14
      socketio/kombu_manager.py
  5. 6
      socketio/pubsub_manager.py
  6. 14
      socketio/redis_manager.py
  7. 28
      socketio/server.py
  8. 8
      tests/test_pubsub_manager.py

12
docs/index.rst

@ -252,8 +252,14 @@ instance includes versions of several of the methods in the
:class:`socketio.Server` class that default to the proper namespace when the :class:`socketio.Server` class that default to the proper namespace when the
``namespace`` argument is not given. ``namespace`` argument is not given.
Note: if an event has a handler in a class-based namespace, and also a In the case that an event has a handler in a class-based namespace, and also a
decorator-based function handler, the standalone function handler is invoked. decorator-based function handler, only the standalone function handler is
invoked.
It is important to note that class-based namespaces are singletons. This means
that a single instance of a namespace class is used for all clients, and
consequently, a namespace instance cannot be used to store client specific
information.
Event handler interceptors Event handler interceptors
-------------------------- --------------------------
@ -445,7 +451,7 @@ pub/sub functionality::
mgr = socketio.RedisManager('redis://') mgr = socketio.RedisManager('redis://')
sio = socketio.Server(client_manager=mgr) sio = socketio.Server(client_manager=mgr)
If multiple Sokcet.IO servers are connected to a message queue, they If multiple Socket.IO servers are connected to a message queue, they
automatically communicate with each other and manage a combined client list, automatically communicate with each other and manage a combined client list,
without any need for additional configuration. To have a process other than without any need for additional configuration. To have a process other than
a server connect to the queue to emit a message, the same ``KombuManager`` a server connect to the queue to emit a message, the same ``KombuManager``

2
socketio/__init__.py

@ -8,7 +8,7 @@ from .redis_manager import RedisManager
from .server import Server from .server import Server
from .util import apply_interceptor, ignore_interceptor from .util import apply_interceptor, ignore_interceptor
__version__ = '1.6.0' __version__ = '1.6.1'
__all__ = [__version__, Interceptor, Middleware, Namespace, Server, __all__ = [__version__, Interceptor, Middleware, Namespace, Server,
BaseManager, PubSubManager, KombuManager, RedisManager, BaseManager, PubSubManager, KombuManager, RedisManager,

2
socketio/base_manager.py

@ -113,7 +113,7 @@ class BaseManager(object):
return r return r
def emit(self, event, data, namespace, room=None, skip_sid=None, def emit(self, event, data, namespace, room=None, skip_sid=None,
callback=None): callback=None, **kwargs):
"""Emit a message to a single client, a room, or all the clients """Emit a message to a single client, a room, or all the clients
connected to the namespace.""" connected to the namespace."""
if namespace not in self.rooms or room not in self.rooms[namespace]: if namespace not in self.rooms or room not in self.rooms[namespace]:

14
socketio/kombu_manager.py

@ -47,6 +47,20 @@ class KombuManager(PubSubManager): # pragma: no cover
self.url = url self.url = url
self.producer = self._producer() self.producer = self._producer()
def initialize(self, server):
super(KombuManager, self).initialize(server)
monkey_patched = True
if server.async_mode == 'eventlet':
from eventlet.patcher import is_monkey_patched
monkey_patched = is_monkey_patched('socket')
elif 'gevent' in server.async_mode:
from gevent.monkey import is_module_patched
monkey_patched = is_module_patched('socket')
if not monkey_patched:
raise RuntimeError('Redis requires a monkey patched socket '
'library to work with ' + server.async_mode)
def _connection(self): def _connection(self):
return kombu.Connection(self.url) return kombu.Connection(self.url)

6
socketio/pubsub_manager.py

@ -37,7 +37,7 @@ class PubSubManager(BaseManager):
self.server.logger.info(self.name + ' backend initialized.') self.server.logger.info(self.name + ' backend initialized.')
def emit(self, event, data, namespace=None, room=None, skip_sid=None, def emit(self, event, data, namespace=None, room=None, skip_sid=None,
callback=None): callback=None, **kwargs):
"""Emit a message to a single client, a room, or all the clients """Emit a message to a single client, a room, or all the clients
connected to the namespace. connected to the namespace.
@ -46,6 +46,10 @@ class PubSubManager(BaseManager):
The parameters are the same as in :meth:`.Server.emit`. The parameters are the same as in :meth:`.Server.emit`.
""" """
if kwargs.get('ignore_queue'):
return super(PubSubManager, self).emit(
event, data, namespace=namespace, room=room, skip_sid=skip_sid,
callback=callback)
namespace = namespace or '/' namespace = namespace or '/'
if callback is not None: if callback is not None:
if self.server is None: if self.server is None:

14
socketio/redis_manager.py

@ -43,6 +43,20 @@ class RedisManager(PubSubManager): # pragma: no cover
super(RedisManager, self).__init__(channel=channel, super(RedisManager, self).__init__(channel=channel,
write_only=write_only) write_only=write_only)
def initialize(self, server):
super(RedisManager, self).initialize(server)
monkey_patched = True
if server.async_mode == 'eventlet':
from eventlet.patcher import is_monkey_patched
monkey_patched = is_monkey_patched('socket')
elif 'gevent' in server.async_mode:
from gevent.monkey import is_module_patched
monkey_patched = is_module_patched('socket')
if not monkey_patched:
raise RuntimeError('Redis requires a monkey patched socket '
'library to work with ' + server.async_mode)
def _publish(self, data): def _publish(self, data):
return self.redis.publish(self.channel, pickle.dumps(data)) return self.redis.publish(self.channel, pickle.dumps(data))

28
socketio/server.py

@ -8,6 +8,8 @@ from . import namespace as sio_namespace
from . import packet from . import packet
from . import namespace from . import namespace
default_logger = logging.getLogger('socketio')
class Server(object): class Server(object):
"""A Socket.IO server. """A Socket.IO server.
@ -94,7 +96,7 @@ class Server(object):
if not isinstance(logger, bool): if not isinstance(logger, bool):
self.logger = logger self.logger = logger
else: else:
self.logger = logging.getLogger('socketio') self.logger = default_logger
if not logging.root.handlers and \ if not logging.root.handlers and \
self.logger.level == logging.NOTSET: self.logger.level == logging.NOTSET:
if logger: if logger:
@ -181,7 +183,7 @@ class Server(object):
namespace_handler namespace_handler
def emit(self, event, data=None, room=None, skip_sid=None, namespace=None, def emit(self, event, data=None, room=None, skip_sid=None, namespace=None,
callback=None): callback=None, **kwargs):
"""Emit a custom event to one or more connected clients. """Emit a custom event to one or more connected clients.
:param event: The event name. It can be any string. The event names :param event: The event name. It can be any string. The event names
@ -206,14 +208,22 @@ class Server(object):
that will be passed to the function are those provided that will be passed to the function are those provided
by the client. Callback functions can only be used by the client. Callback functions can only be used
when addressing an individual client. when addressing an individual client.
:param ignore_queue: Only used when a message queue is configured. If
set to ``True``, the event is emitted to the
clients directly, without going through the queue.
This is more efficient, but only works when a
single server process is used. It is recommended
to always leave this parameter with its default
value of ``False``.
""" """
namespace = namespace or '/' namespace = namespace or '/'
self.logger.info('emitting event "%s" to %s [%s]', event, self.logger.info('emitting event "%s" to %s [%s]', event,
room or 'all', namespace) room or 'all', namespace)
self.manager.emit(event, data, namespace, room, skip_sid, callback) self.manager.emit(event, data, namespace, room, skip_sid, callback,
**kwargs)
def send(self, data, room=None, skip_sid=None, namespace=None, def send(self, data, room=None, skip_sid=None, namespace=None,
callback=None): callback=None, **kwargs):
"""Send a message to one or more connected clients. """Send a message to one or more connected clients.
This function emits an event with the name ``'message'``. Use This function emits an event with the name ``'message'``. Use
@ -238,8 +248,16 @@ class Server(object):
that will be passed to the function are those provided that will be passed to the function are those provided
by the client. Callback functions can only be used by the client. Callback functions can only be used
when addressing an individual client. when addressing an individual client.
:param ignore_queue: Only used when a message queue is configured. If
set to ``True``, the event is emitted to the
clients directly, without going through the queue.
This is more efficient, but only works when a
single server process is used. It is recommended
to always leave this parameter with its default
value of ``False``.
""" """
self.emit('message', data, room, skip_sid, namespace, callback) self.emit('message', data, room, skip_sid, namespace, callback,
**kwargs)
def enter_room(self, sid, room, namespace=None): def enter_room(self, sid, room, namespace=None):
"""Enter a room. """Enter a room.

8
tests/test_pubsub_manager.py

@ -85,6 +85,14 @@ class TestBaseManager(unittest.TestCase):
self.assertRaises(ValueError, self.pm.emit, 'foo', 'bar', self.assertRaises(ValueError, self.pm.emit, 'foo', 'bar',
callback='cb') callback='cb')
def test_emit_with_ignore_queue(self):
self.pm.connect('123', '/')
self.pm.emit('foo', 'bar', room='123', namespace='/',
ignore_queue=True)
self.pm._publish.assert_not_called()
self.pm.server._emit_internal.assert_called_once_with('123', 'foo',
'bar', '/', None)
def test_close_room(self): def test_close_room(self):
self.pm.close_room('foo') self.pm.close_room('foo')
self.pm._publish.assert_called_once_with( self.pm._publish.assert_called_once_with(

Loading…
Cancel
Save