diff --git a/src/socketio/asyncio_pubsub_manager.py b/src/socketio/asyncio_pubsub_manager.py index 1a06889..b6c1a15 100644 --- a/src/socketio/asyncio_pubsub_manager.py +++ b/src/socketio/asyncio_pubsub_manager.py @@ -64,10 +64,12 @@ class AsyncPubSubManager(AsyncManager): callback = (room, namespace, id) else: callback = None - await self._publish({'method': 'emit', 'event': event, 'data': data, - 'namespace': namespace, 'room': room, - 'skip_sid': skip_sid, 'callback': callback, - 'host_id': self.host_id}) + message = {'method': 'emit', 'event': event, 'data': data, + 'namespace': namespace, 'room': room, + 'skip_sid': skip_sid, 'callback': callback, + 'host_id': self.host_id} + await self._handle_emit(message) # handle in this host + await self._publish(message) # notify other hosts async def can_disconnect(self, sid, namespace): if self.is_connected(sid, namespace): @@ -76,18 +78,23 @@ class AsyncPubSubManager(AsyncManager): else: # client is in another server, so we post request to the queue await self._publish({'method': 'disconnect', 'sid': sid, - 'namespace': namespace or '/'}) + 'namespace': namespace or '/', + 'host_id': self.host_id}) async def disconnect(self, sid, namespace, **kwargs): if kwargs.get('ignore_queue'): return await super(AsyncPubSubManager, self).disconnect( sid, namespace=namespace) - await self._publish({'method': 'disconnect', 'sid': sid, - 'namespace': namespace or '/'}) + message = {'method': 'disconnect', 'sid': sid, + 'namespace': namespace or '/', 'host_id': self.host_id} + await self._handle_disconnect(message) # handle in this host + await self._publish(message) # notify other hosts async def close_room(self, room, namespace=None): - await self._publish({'method': 'close_room', 'room': room, - 'namespace': namespace or '/'}) + message = {'method': 'close_room', 'room': room, + 'namespace': namespace or '/', 'host_id': self.host_id} + await self._handle_close_room(message) # handle in this host + await self._publish(message) # notify other hosts async def _publish(self, data): """Publish a message on the Socket.IO channel. @@ -139,9 +146,12 @@ class AsyncPubSubManager(AsyncManager): *args): # When an event callback is received, the callback is returned back # the sender, which is identified by the host_id - await self._publish({'method': 'callback', 'host_id': host_id, - 'sid': sid, 'namespace': namespace, - 'id': callback_id, 'args': args}) + if host_id == self.host_id: + await self.trigger_callback(sid, callback_id, args) + else: + await self._publish({'method': 'callback', 'host_id': host_id, + 'sid': sid, 'namespace': namespace, + 'id': callback_id, 'args': args}) async def _handle_disconnect(self, message): await self.server.disconnect(sid=message.get('sid'), @@ -149,8 +159,8 @@ class AsyncPubSubManager(AsyncManager): ignore_queue=True) async def _handle_close_room(self, message): - await super().close_room( - room=message.get('room'), namespace=message.get('namespace')) + await super().close_room(room=message.get('room'), + namespace=message.get('namespace')) async def _thread(self): while True: @@ -171,17 +181,18 @@ class AsyncPubSubManager(AsyncManager): except: pass if data and 'method' in data: - self._get_logger().info('pubsub message: {}'.format( + self._get_logger().debug('pubsub message: {}'.format( data['method'])) try: - if data['method'] == 'emit': - await self._handle_emit(data) - elif data['method'] == 'callback': + if data['method'] == 'callback': await self._handle_callback(data) - elif data['method'] == 'disconnect': - await self._handle_disconnect(data) - elif data['method'] == 'close_room': - await self._handle_close_room(data) + elif data.get('host_id') != self.host_id: + if data['method'] == 'emit': + await self._handle_emit(data) + elif data['method'] == 'disconnect': + await self._handle_disconnect(data) + elif data['method'] == 'close_room': + await self._handle_close_room(data) except asyncio.CancelledError: raise # let the outer try/except handle it except: diff --git a/src/socketio/pubsub_manager.py b/src/socketio/pubsub_manager.py index 51079bf..788f0a4 100644 --- a/src/socketio/pubsub_manager.py +++ b/src/socketio/pubsub_manager.py @@ -61,10 +61,12 @@ class PubSubManager(BaseManager): callback = (room, namespace, id) else: callback = None - self._publish({'method': 'emit', 'event': event, 'data': data, - 'namespace': namespace, 'room': room, - 'skip_sid': skip_sid, 'callback': callback, - 'host_id': self.host_id}) + message = {'method': 'emit', 'event': event, 'data': data, + 'namespace': namespace, 'room': room, + 'skip_sid': skip_sid, 'callback': callback, + 'host_id': self.host_id} + self._handle_emit(message) # handle in this host + self._publish(message) # notify other hosts def can_disconnect(self, sid, namespace): if self.is_connected(sid, namespace): @@ -72,19 +74,25 @@ class PubSubManager(BaseManager): return super().can_disconnect(sid, namespace) else: # client is in another server, so we post request to the queue - self._publish({'method': 'disconnect', 'sid': sid, - 'namespace': namespace or '/'}) + message = {'method': 'disconnect', 'sid': sid, + 'namespace': namespace or '/', 'host_id': self.host_id} + self._handle_disconnect(message) # handle in this host + self._publish(message) # notify other hosts def disconnect(self, sid, namespace=None, **kwargs): if kwargs.get('ignore_queue'): return super(PubSubManager, self).disconnect( sid, namespace=namespace) - self._publish({'method': 'disconnect', 'sid': sid, - 'namespace': namespace or '/'}) + message = {'method': 'disconnect', 'sid': sid, + 'namespace': namespace or '/', 'host_id': self.host_id} + self._handle_disconnect(message) # handle in this host + self._publish(message) # notify other hosts def close_room(self, room, namespace=None): - self._publish({'method': 'close_room', 'room': room, - 'namespace': namespace or '/'}) + message = {'method': 'close_room', 'room': room, + 'namespace': namespace or '/', 'host_id': self.host_id} + self._handle_close_room(message) # handle in this host + self._publish(message) # notify other hosts def _publish(self, data): """Publish a message on the Socket.IO channel. @@ -116,11 +124,10 @@ class PubSubManager(BaseManager): *remote_callback) else: callback = None - super(PubSubManager, self).emit(message['event'], message['data'], - namespace=message.get('namespace'), - room=message.get('room'), - skip_sid=message.get('skip_sid'), - callback=callback) + super().emit(message['event'], message['data'], + namespace=message.get('namespace'), + room=message.get('room'), + skip_sid=message.get('skip_sid'), callback=callback) def _handle_callback(self, message): if self.host_id == message.get('host_id'): @@ -135,9 +142,12 @@ class PubSubManager(BaseManager): def _return_callback(self, host_id, sid, namespace, callback_id, *args): # When an event callback is received, the callback is returned back # to the sender, which is identified by the host_id - self._publish({'method': 'callback', 'host_id': host_id, - 'sid': sid, 'namespace': namespace, 'id': callback_id, - 'args': args}) + if host_id == self.host_id: + self.trigger_callback(sid, callback_id, args) + else: + self._publish({'method': 'callback', 'host_id': host_id, + 'sid': sid, 'namespace': namespace, + 'id': callback_id, 'args': args}) def _handle_disconnect(self, message): self.server.disconnect(sid=message.get('sid'), @@ -145,8 +155,8 @@ class PubSubManager(BaseManager): ignore_queue=True) def _handle_close_room(self, message): - super(PubSubManager, self).close_room( - room=message.get('room'), namespace=message.get('namespace')) + super().close_room(room=message.get('room'), + namespace=message.get('namespace')) def _thread(self): for message in self._listen(): @@ -165,17 +175,18 @@ class PubSubManager(BaseManager): except: pass if data and 'method' in data: - self._get_logger().info('pubsub message: {}'.format( + self._get_logger().debug('pubsub message: {}'.format( data['method'])) try: - if data['method'] == 'emit': - self._handle_emit(data) - elif data['method'] == 'callback': + if data['method'] == 'callback': self._handle_callback(data) - elif data['method'] == 'disconnect': - self._handle_disconnect(data) - elif data['method'] == 'close_room': - self._handle_close_room(data) + elif data.get('host_id') != self.host_id: + if data['method'] == 'emit': + self._handle_emit(data) + elif data['method'] == 'disconnect': + self._handle_disconnect(data) + elif data['method'] == 'close_room': + self._handle_close_room(data) except: self.server.logger.exception( 'Unknown error in pubsub listening thread') diff --git a/tests/asyncio/test_asyncio_pubsub_manager.py b/tests/asyncio/test_asyncio_pubsub_manager.py index f1a63eb..fa74f55 100644 --- a/tests/asyncio/test_asyncio_pubsub_manager.py +++ b/tests/asyncio/test_asyncio_pubsub_manager.py @@ -163,13 +163,15 @@ class TestAsyncPubSubManager(unittest.TestCase): assert _run(self.pm.can_disconnect(sid, '/')) is True _run(self.pm.can_disconnect(sid, '/foo')) self.pm._publish.mock.assert_called_once_with( - {'method': 'disconnect', 'sid': sid, 'namespace': '/foo'} + {'method': 'disconnect', 'sid': sid, 'namespace': '/foo', + 'host_id': '123456'} ) def test_disconnect(self): _run(self.pm.disconnect('foo', '/')) self.pm._publish.mock.assert_called_once_with( - {'method': 'disconnect', 'sid': 'foo', 'namespace': '/'} + {'method': 'disconnect', 'sid': 'foo', 'namespace': '/', + 'host_id': '123456'} ) def test_disconnect_ignore_queue(self): @@ -182,13 +184,15 @@ class TestAsyncPubSubManager(unittest.TestCase): def test_close_room(self): _run(self.pm.close_room('foo')) self.pm._publish.mock.assert_called_once_with( - {'method': 'close_room', 'room': 'foo', 'namespace': '/'} + {'method': 'close_room', 'room': 'foo', 'namespace': '/', + 'host_id': '123456'} ) def test_close_room_with_namespace(self): _run(self.pm.close_room('foo', '/bar')) self.pm._publish.mock.assert_called_once_with( - {'method': 'close_room', 'room': 'foo', 'namespace': '/bar'} + {'method': 'close_room', 'room': 'foo', 'namespace': '/bar', + 'host_id': '123456'} ) def test_handle_emit(self): @@ -263,8 +267,7 @@ class TestAsyncPubSubManager(unittest.TestCase): callback=None, ) - def test_handle_emit_with_callback(self): - host_id = self.pm.host_id + def test_handle_emit_with_remote_callback(self): with mock.patch.object( asyncio_manager.AsyncManager, 'emit', new=AsyncMock() ) as super_emit: @@ -275,7 +278,7 @@ class TestAsyncPubSubManager(unittest.TestCase): 'data': 'bar', 'namespace': '/baz', 'callback': ('sid', '/baz', 123), - 'host_id': '123456', + 'host_id': 'x', } ) ) @@ -291,7 +294,7 @@ class TestAsyncPubSubManager(unittest.TestCase): self.pm._publish.mock.assert_called_once_with( { 'method': 'callback', - 'host_id': host_id, + 'host_id': 'x', 'sid': 'sid', 'namespace': '/baz', 'id': 123, @@ -299,6 +302,32 @@ class TestAsyncPubSubManager(unittest.TestCase): } ) + def test_handle_emit_with_local_callback(self): + with mock.patch.object( + asyncio_manager.AsyncManager, 'emit', new=AsyncMock() + ) as super_emit: + _run( + self.pm._handle_emit( + { + 'event': 'foo', + 'data': 'bar', + 'namespace': '/baz', + 'callback': ('sid', '/baz', 123), + 'host_id': self.pm.host_id, + } + ) + ) + assert super_emit.mock.call_count == 1 + assert super_emit.mock.call_args[0] == (self.pm, 'foo', 'bar') + assert super_emit.mock.call_args[1]['namespace'] == '/baz' + assert super_emit.mock.call_args[1]['room'] is None + assert super_emit.mock.call_args[1]['skip_sid'] is None + assert isinstance( + super_emit.mock.call_args[1]['callback'], functools.partial + ) + _run(super_emit.mock.call_args[1]['callback']('one', 2, 'three')) + self.pm._publish.mock.assert_not_called() + def test_handle_callback(self): host_id = self.pm.host_id with mock.patch.object( @@ -419,34 +448,50 @@ class TestAsyncPubSubManager(unittest.TestCase): self.pm._handle_callback = AsyncMock() self.pm._handle_disconnect = AsyncMock() self.pm._handle_close_room = AsyncMock() + host_id = self.pm.host_id async def messages(): import pickle - yield {'method': 'emit', 'value': 'foo'} - yield {'missing': 'method'} - yield '{"method": "callback", "value": "bar"}' - yield {'method': 'disconnect', 'sid': '123', 'namespace': '/foo'} - yield {'method': 'bogus'} - yield pickle.dumps({'method': 'close_room', 'value': 'baz'}) + yield {'method': 'emit', 'value': 'foo', 'host_id': 'x'} + yield {'missing': 'method', 'host_id': 'x'} + yield '{"method": "callback", "value": "bar", "host_id": "x"}' + yield {'method': 'disconnect', 'sid': '123', 'namespace': '/foo', + 'host_id': 'x'} + yield {'method': 'bogus', 'host_id': 'x'} + yield pickle.dumps({'method': 'close_room', 'value': 'baz', + 'host_id': 'x'}) yield 'bad json' yield b'bad pickled' + + # these should not publish anything on the queue, as they come from + # the same host + yield {'method': 'emit', 'value': 'foo', 'host_id': host_id} + yield {'method': 'callback', 'value': 'bar', 'host_id': host_id} + yield {'method': 'disconnect', 'sid': '123', 'namespace': '/foo', + 'host_id': host_id} + yield pickle.dumps({'method': 'close_room', 'value': 'baz', + 'host_id': host_id}) raise asyncio.CancelledError() # force the thread to exit self.pm._listen = messages _run(self.pm._thread()) self.pm._handle_emit.mock.assert_called_once_with( - {'method': 'emit', 'value': 'foo'} + {'method': 'emit', 'value': 'foo', 'host_id': 'x'} + ) + self.pm._handle_callback.mock.assert_any_call( + {'method': 'callback', 'value': 'bar', 'host_id': 'x'} ) - self.pm._handle_callback.mock.assert_called_once_with( - {'method': 'callback', 'value': 'bar'} + self.pm._handle_callback.mock.assert_any_call( + {'method': 'callback', 'value': 'bar', 'host_id': host_id} ) self.pm._handle_disconnect.mock.assert_called_once_with( - {'method': 'disconnect', 'sid': '123', 'namespace': '/foo'} + {'method': 'disconnect', 'sid': '123', 'namespace': '/foo', + 'host_id': 'x'} ) self.pm._handle_close_room.mock.assert_called_once_with( - {'method': 'close_room', 'value': 'baz'} + {'method': 'close_room', 'value': 'baz', 'host_id': 'x'} ) def test_background_thread_exception(self): @@ -454,15 +499,15 @@ class TestAsyncPubSubManager(unittest.TestCase): asyncio.CancelledError]) async def messages(): - yield {'method': 'emit', 'value': 'foo'} - yield {'method': 'emit', 'value': 'bar'} + yield {'method': 'emit', 'value': 'foo', 'host_id': 'x'} + yield {'method': 'emit', 'value': 'bar', 'host_id': 'x'} self.pm._listen = messages _run(self.pm._thread()) self.pm._handle_emit.mock.assert_any_call( - {'method': 'emit', 'value': 'foo'} + {'method': 'emit', 'value': 'foo', 'host_id': 'x'} ) self.pm._handle_emit.mock.assert_called_with( - {'method': 'emit', 'value': 'bar'} + {'method': 'emit', 'value': 'bar', 'host_id': 'x'} ) diff --git a/tests/common/test_pubsub_manager.py b/tests/common/test_pubsub_manager.py index 51e1f92..269c7c1 100644 --- a/tests/common/test_pubsub_manager.py +++ b/tests/common/test_pubsub_manager.py @@ -169,13 +169,15 @@ class TestPubSubManager(unittest.TestCase): assert self.pm.can_disconnect(sid, '/') self.pm.can_disconnect(sid, '/foo') self.pm._publish.assert_called_once_with( - {'method': 'disconnect', 'sid': sid, 'namespace': '/foo'} + {'method': 'disconnect', 'sid': sid, 'namespace': '/foo', + 'host_id': '123456'} ) def test_disconnect(self): self.pm.disconnect('foo') self.pm._publish.assert_called_once_with( - {'method': 'disconnect', 'sid': 'foo', 'namespace': '/'} + {'method': 'disconnect', 'sid': 'foo', 'namespace': '/', + 'host_id': '123456'} ) def test_disconnect_ignore_queue(self): @@ -188,13 +190,15 @@ class TestPubSubManager(unittest.TestCase): def test_close_room(self): self.pm.close_room('foo') self.pm._publish.assert_called_once_with( - {'method': 'close_room', 'room': 'foo', 'namespace': '/'} + {'method': 'close_room', 'room': 'foo', 'namespace': '/', + 'host_id': '123456'} ) def test_close_room_with_namespace(self): self.pm.close_room('foo', '/bar') self.pm._publish.assert_called_once_with( - {'method': 'close_room', 'room': 'foo', 'namespace': '/bar'} + {'method': 'close_room', 'room': 'foo', 'namespace': '/bar', + 'host_id': '123456'} ) def test_handle_emit(self): @@ -251,8 +255,7 @@ class TestPubSubManager(unittest.TestCase): callback=None, ) - def test_handle_emit_with_callback(self): - host_id = self.pm.host_id + def test_handle_emit_with_remote_callback(self): with mock.patch.object(base_manager.BaseManager, 'emit') as super_emit: self.pm._handle_emit( { @@ -260,7 +263,7 @@ class TestPubSubManager(unittest.TestCase): 'data': 'bar', 'namespace': '/baz', 'callback': ('sid', '/baz', 123), - 'host_id': host_id, + 'host_id': 'x', } ) assert super_emit.call_count == 1 @@ -275,7 +278,7 @@ class TestPubSubManager(unittest.TestCase): self.pm._publish.assert_called_once_with( { 'method': 'callback', - 'host_id': host_id, + 'host_id': 'x', 'sid': 'sid', 'namespace': '/baz', 'id': 123, @@ -283,6 +286,28 @@ class TestPubSubManager(unittest.TestCase): } ) + def test_handle_emit_with_local_callback(self): + with mock.patch.object(base_manager.BaseManager, 'emit') as super_emit: + self.pm._handle_emit( + { + 'event': 'foo', + 'data': 'bar', + 'namespace': '/baz', + 'callback': ('sid', '/baz', 123), + 'host_id': self.pm.host_id, + } + ) + assert super_emit.call_count == 1 + assert super_emit.call_args[0] == ('foo', 'bar') + assert super_emit.call_args[1]['namespace'] == '/baz' + assert super_emit.call_args[1]['room'] is None + assert super_emit.call_args[1]['skip_sid'] is None + assert isinstance( + super_emit.call_args[1]['callback'], functools.partial + ) + super_emit.call_args[1]['callback']('one', 2, 'three') + self.pm._publish.assert_not_called() + def test_handle_callback(self): host_id = self.pm.host_id with mock.patch.object(self.pm, 'trigger_callback') as trigger: @@ -373,19 +398,31 @@ class TestPubSubManager(unittest.TestCase): self.pm._handle_callback = mock.MagicMock() self.pm._handle_disconnect = mock.MagicMock() self.pm._handle_close_room = mock.MagicMock() + host_id = self.pm.host_id def messages(): import pickle - yield {'method': 'emit', 'value': 'foo'} - yield {'missing': 'method'} - yield '{"method": "callback", "value": "bar"}' - yield {'method': 'disconnect', 'sid': '123', 'namespace': '/foo'} - yield {'method': 'bogus'} - yield pickle.dumps({'method': 'close_room', 'value': 'baz'}) + yield {'method': 'emit', 'value': 'foo', 'host_id': 'x'} + yield {'missing': 'method', 'host_id': 'x'} + yield '{"method": "callback", "value": "bar", "host_id": "x"}' + yield {'method': 'disconnect', 'sid': '123', 'namespace': '/foo', + 'host_id': 'x'} + yield {'method': 'bogus', 'host_id': 'x'} + yield pickle.dumps({'method': 'close_room', 'value': 'baz', + 'host_id': 'x'}) yield 'bad json' yield b'bad pickled' + # these should not publish anything on the queue, as they come from + # the same host + yield {'method': 'emit', 'value': 'foo', 'host_id': host_id} + yield {'method': 'callback', 'value': 'bar', 'host_id': host_id} + yield {'method': 'disconnect', 'sid': '123', 'namespace': '/foo', + 'host_id': host_id} + yield pickle.dumps({'method': 'close_room', 'value': 'baz', + 'host_id': host_id}) + self.pm._listen = mock.MagicMock(side_effect=messages) try: self.pm._thread() @@ -393,24 +430,28 @@ class TestPubSubManager(unittest.TestCase): pass self.pm._handle_emit.assert_called_once_with( - {'method': 'emit', 'value': 'foo'} + {'method': 'emit', 'value': 'foo', 'host_id': 'x'} + ) + self.pm._handle_callback.assert_any_call( + {'method': 'callback', 'value': 'bar', 'host_id': 'x'} ) - self.pm._handle_callback.assert_called_once_with( - {'method': 'callback', 'value': 'bar'} + self.pm._handle_callback.assert_any_call( + {'method': 'callback', 'value': 'bar', 'host_id': host_id} ) self.pm._handle_disconnect.assert_called_once_with( - {'method': 'disconnect', 'sid': '123', 'namespace': '/foo'} + {'method': 'disconnect', 'sid': '123', 'namespace': '/foo', + 'host_id': 'x'} ) self.pm._handle_close_room.assert_called_once_with( - {'method': 'close_room', 'value': 'baz'} + {'method': 'close_room', 'value': 'baz', 'host_id': 'x'} ) def test_background_thread_exception(self): self.pm._handle_emit = mock.MagicMock(side_effect=[ValueError(), None]) def messages(): - yield {'method': 'emit', 'value': 'foo'} - yield {'method': 'emit', 'value': 'bar'} + yield {'method': 'emit', 'value': 'foo', 'host_id': 'x'} + yield {'method': 'emit', 'value': 'bar', 'host_id': 'x'} self.pm._listen = mock.MagicMock(side_effect=messages) try: @@ -419,8 +460,8 @@ class TestPubSubManager(unittest.TestCase): pass self.pm._handle_emit.assert_any_call( - {'method': 'emit', 'value': 'foo'} + {'method': 'emit', 'value': 'foo', 'host_id': 'x'} ) self.pm._handle_emit.assert_called_with( - {'method': 'emit', 'value': 'bar'} + {'method': 'emit', 'value': 'bar', 'host_id': 'x'} )