From d222f4c3deb7a12f3a8c584777dbfc18448a206d Mon Sep 17 00:00:00 2001 From: Miguel Grinberg Date: Tue, 19 Sep 2023 19:20:16 +0100 Subject: [PATCH] Support entering and leaving rooms through pubsub client managers --- src/socketio/asyncio_pubsub_manager.py | 35 +++++++++ src/socketio/pubsub_manager.py | 34 +++++++++ tests/asyncio/test_asyncio_pubsub_manager.py | 77 ++++++++++++++++++++ tests/common/test_pubsub_manager.py | 61 ++++++++++++++++ 4 files changed, 207 insertions(+) diff --git a/src/socketio/asyncio_pubsub_manager.py b/src/socketio/asyncio_pubsub_manager.py index ab0a93a..47d75b0 100644 --- a/src/socketio/asyncio_pubsub_manager.py +++ b/src/socketio/asyncio_pubsub_manager.py @@ -90,6 +90,25 @@ class AsyncPubSubManager(AsyncManager): await self._handle_disconnect(message) # handle in this host await self._publish(message) # notify other hosts + async def enter_room(self, sid, namespace, room, eio_sid=None): + if self.is_connected(sid, namespace): + # client is in this server, so we can disconnect directly + return await super().enter_room(sid, namespace, room, + eio_sid=eio_sid) + else: + message = {'method': 'enter_room', 'sid': sid, 'room': room, + 'namespace': namespace or '/', 'host_id': self.host_id} + await self._publish(message) # notify other hosts + + async def leave_room(self, sid, namespace, room): + if self.is_connected(sid, namespace): + # client is in this server, so we can disconnect directly + return await super().leave_room(sid, namespace, room) + else: + message = {'method': 'leave_room', 'sid': sid, 'room': room, + 'namespace': namespace or '/', 'host_id': self.host_id} + await self._publish(message) # notify other hosts + async def close_room(self, room, namespace=None): message = {'method': 'close_room', 'room': room, 'namespace': namespace or '/', 'host_id': self.host_id} @@ -158,6 +177,18 @@ class AsyncPubSubManager(AsyncManager): namespace=message.get('namespace'), ignore_queue=True) + async def _handle_enter_room(self, message): + sid = message.get('sid') + namespace = message.get('namespace') + if self.is_connected(sid, namespace): + await super().enter_room(sid, namespace, message.get('room')) + + async def _handle_leave_room(self, message): + sid = message.get('sid') + namespace = message.get('namespace') + if self.is_connected(sid, namespace): + await super().leave_room(sid, namespace, message.get('room')) + async def _handle_close_room(self, message): await super().close_room(room=message.get('room'), namespace=message.get('namespace')) @@ -191,6 +222,10 @@ class AsyncPubSubManager(AsyncManager): await self._handle_emit(data) elif data['method'] == 'disconnect': await self._handle_disconnect(data) + elif data['method'] == 'enter_room': + await self._handle_enter_room(data) + elif data['method'] == 'leave_room': + await self._handle_leave_room(data) elif data['method'] == 'close_room': await self._handle_close_room(data) except asyncio.CancelledError: diff --git a/src/socketio/pubsub_manager.py b/src/socketio/pubsub_manager.py index fa5eba4..8321e41 100644 --- a/src/socketio/pubsub_manager.py +++ b/src/socketio/pubsub_manager.py @@ -87,6 +87,24 @@ class PubSubManager(BaseManager): self._handle_disconnect(message) # handle in this host self._publish(message) # notify other hosts + def enter_room(self, sid, namespace, room, eio_sid=None): + if self.is_connected(sid, namespace): + # client is in this server, so we can add to the room directly + return super().enter_room(sid, namespace, room, eio_sid=eio_sid) + else: + message = {'method': 'enter_room', 'sid': sid, 'room': room, + 'namespace': namespace or '/', 'host_id': self.host_id} + self._publish(message) # notify other hosts + + def leave_room(self, sid, namespace, room): + if self.is_connected(sid, namespace): + # client is in this server, so we can remove from the room directly + return super().leave_room(sid, namespace, room) + else: + message = {'method': 'leave_room', 'sid': sid, 'room': room, + 'namespace': namespace or '/', 'host_id': self.host_id} + self._publish(message) # notify other hosts + def close_room(self, room, namespace=None): message = {'method': 'close_room', 'room': room, 'namespace': namespace or '/', 'host_id': self.host_id} @@ -153,6 +171,18 @@ class PubSubManager(BaseManager): namespace=message.get('namespace'), ignore_queue=True) + def _handle_enter_room(self, message): + sid = message.get('sid') + namespace = message.get('namespace') + if self.is_connected(sid, namespace): + super().enter_room(sid, namespace, message.get('room')) + + def _handle_leave_room(self, message): + sid = message.get('sid') + namespace = message.get('namespace') + if self.is_connected(sid, namespace): + super().leave_room(sid, namespace, message.get('room')) + def _handle_close_room(self, message): super().close_room(room=message.get('room'), namespace=message.get('namespace')) @@ -184,6 +214,10 @@ class PubSubManager(BaseManager): self._handle_emit(data) elif data['method'] == 'disconnect': self._handle_disconnect(data) + elif data['method'] == 'enter_room': + self._handle_enter_room(data) + elif data['method'] == 'leave_room': + self._handle_leave_room(data) elif data['method'] == 'close_room': self._handle_close_room(data) except: diff --git a/tests/asyncio/test_asyncio_pubsub_manager.py b/tests/asyncio/test_asyncio_pubsub_manager.py index fa74f55..e681e23 100644 --- a/tests/asyncio/test_asyncio_pubsub_manager.py +++ b/tests/asyncio/test_asyncio_pubsub_manager.py @@ -181,6 +181,27 @@ class TestAsyncPubSubManager(unittest.TestCase): self.pm._publish.mock.assert_not_called() assert self.pm.is_connected(sid, '/') is False + def test_enter_room(self): + sid = self.pm.connect('123', '/') + _run(self.pm.enter_room(sid, '/', 'foo')) + _run(self.pm.enter_room('456', '/', 'foo')) + assert sid in self.pm.rooms['/']['foo'] + assert self.pm.rooms['/']['foo'][sid] == '123' + self.pm._publish.mock.assert_called_once_with( + {'method': 'enter_room', 'sid': '456', 'room': 'foo', + 'namespace': '/', 'host_id': '123456'} + ) + + def test_leave_room(self): + sid = self.pm.connect('123', '/') + _run(self.pm.leave_room(sid, '/', 'foo')) + _run(self.pm.leave_room('456', '/', 'foo')) + assert 'foo' not in self.pm.rooms['/'] + self.pm._publish.mock.assert_called_once_with( + {'method': 'leave_room', 'sid': '456', 'room': 'foo', + 'namespace': '/', 'host_id': '123456'} + ) + def test_close_room(self): _run(self.pm.close_room('foo')) self.pm._publish.mock.assert_called_once_with( @@ -413,6 +434,48 @@ class TestAsyncPubSubManager(unittest.TestCase): sid='123', namespace='/foo', ignore_queue=True ) + def test_handle_enter_room(self): + sid = self.pm.connect('123', '/') + with mock.patch.object( + asyncio_manager.AsyncManager, 'enter_room', new=AsyncMock() + ) as super_enter_room: + _run( + self.pm._handle_enter_room( + {'method': 'enter_room', 'sid': sid, 'namespace': '/', + 'room': 'foo'} + ) + ) + _run( + self.pm._handle_enter_room( + {'method': 'enter_room', 'sid': '456', 'namespace': '/', + 'room': 'foo'} + ) + ) + super_enter_room.mock.assert_called_once_with( + self.pm, sid, '/', 'foo' + ) + + def test_handle_leave_room(self): + sid = self.pm.connect('123', '/') + with mock.patch.object( + asyncio_manager.AsyncManager, 'leave_room', new=AsyncMock() + ) as super_leave_room: + _run( + self.pm._handle_leave_room( + {'method': 'leave_room', 'sid': sid, 'namespace': '/', + 'room': 'foo'} + ) + ) + _run( + self.pm._handle_leave_room( + {'method': 'leave_room', 'sid': '456', 'namespace': '/', + 'room': 'foo'} + ) + ) + super_leave_room.mock.assert_called_once_with( + self.pm, sid, '/', 'foo' + ) + def test_handle_close_room(self): with mock.patch.object( asyncio_manager.AsyncManager, 'close_room', new=AsyncMock() @@ -447,6 +510,8 @@ class TestAsyncPubSubManager(unittest.TestCase): self.pm._handle_emit = AsyncMock() self.pm._handle_callback = AsyncMock() self.pm._handle_disconnect = AsyncMock() + self.pm._handle_enter_room = AsyncMock() + self.pm._handle_leave_room = AsyncMock() self.pm._handle_close_room = AsyncMock() host_id = self.pm.host_id @@ -461,6 +526,10 @@ class TestAsyncPubSubManager(unittest.TestCase): yield {'method': 'bogus', 'host_id': 'x'} yield pickle.dumps({'method': 'close_room', 'value': 'baz', 'host_id': 'x'}) + yield {'method': 'enter_room', 'sid': '123', 'namespace': '/foo', + 'room': 'room', 'host_id': 'x'} + yield {'method': 'leave_room', 'sid': '123', 'namespace': '/foo', + 'room': 'room', 'host_id': 'x'} yield 'bad json' yield b'bad pickled' @@ -490,6 +559,14 @@ class TestAsyncPubSubManager(unittest.TestCase): {'method': 'disconnect', 'sid': '123', 'namespace': '/foo', 'host_id': 'x'} ) + self.pm._handle_enter_room.mock.assert_called_once_with( + {'method': 'enter_room', 'sid': '123', 'namespace': '/foo', + 'room': 'room', 'host_id': 'x'} + ) + self.pm._handle_leave_room.mock.assert_called_once_with( + {'method': 'leave_room', 'sid': '123', 'namespace': '/foo', + 'room': 'room', 'host_id': 'x'} + ) self.pm._handle_close_room.mock.assert_called_once_with( {'method': 'close_room', 'value': 'baz', 'host_id': 'x'} ) diff --git a/tests/common/test_pubsub_manager.py b/tests/common/test_pubsub_manager.py index 269c7c1..8f92f7d 100644 --- a/tests/common/test_pubsub_manager.py +++ b/tests/common/test_pubsub_manager.py @@ -187,6 +187,27 @@ class TestPubSubManager(unittest.TestCase): self.pm._publish.assert_not_called() assert not self.pm.is_connected(sid, '/') + def test_enter_room(self): + sid = self.pm.connect('123', '/') + self.pm.enter_room(sid, '/', 'foo') + self.pm.enter_room('456', '/', 'foo') + assert sid in self.pm.rooms['/']['foo'] + assert self.pm.rooms['/']['foo'][sid] == '123' + self.pm._publish.assert_called_once_with( + {'method': 'enter_room', 'sid': '456', 'room': 'foo', + 'namespace': '/', 'host_id': '123456'} + ) + + def test_leave_room(self): + sid = self.pm.connect('123', '/') + self.pm.leave_room(sid, '/', 'foo') + self.pm.leave_room('456', '/', 'foo') + assert 'foo' not in self.pm.rooms['/'] + self.pm._publish.assert_called_once_with( + {'method': 'leave_room', 'sid': '456', 'room': 'foo', + 'namespace': '/', 'host_id': '123456'} + ) + def test_close_room(self): self.pm.close_room('foo') self.pm._publish.assert_called_once_with( @@ -373,6 +394,32 @@ class TestPubSubManager(unittest.TestCase): sid='123', namespace='/foo', ignore_queue=True ) + def test_handle_enter_room(self): + sid = self.pm.connect('123', '/') + with mock.patch.object( + base_manager.BaseManager, 'enter_room' + ) as super_enter_room: + self.pm._handle_enter_room({ + 'method': 'enter_room', 'sid': sid, 'namespace': '/', + 'room': 'foo'}) + self.pm._handle_enter_room({ + 'method': 'enter_room', 'sid': '456', 'namespace': '/', + 'room': 'foo'}) + super_enter_room.assert_called_once_with(sid, '/', 'foo') + + def test_handle_leave_room(self): + sid = self.pm.connect('123', '/') + with mock.patch.object( + base_manager.BaseManager, 'leave_room' + ) as super_leave_room: + self.pm._handle_leave_room({ + 'method': 'leave_room', 'sid': sid, 'namespace': '/', + 'room': 'foo'}) + self.pm._handle_leave_room({ + 'method': 'leave_room', 'sid': '456', 'namespace': '/', + 'room': 'foo'}) + super_leave_room.assert_called_once_with(sid, '/', 'foo') + def test_handle_close_room(self): with mock.patch.object( base_manager.BaseManager, 'close_room' @@ -397,6 +444,8 @@ class TestPubSubManager(unittest.TestCase): self.pm._handle_emit = mock.MagicMock() self.pm._handle_callback = mock.MagicMock() self.pm._handle_disconnect = mock.MagicMock() + self.pm._handle_enter_room = mock.MagicMock() + self.pm._handle_leave_room = mock.MagicMock() self.pm._handle_close_room = mock.MagicMock() host_id = self.pm.host_id @@ -411,6 +460,10 @@ class TestPubSubManager(unittest.TestCase): yield {'method': 'bogus', 'host_id': 'x'} yield pickle.dumps({'method': 'close_room', 'value': 'baz', 'host_id': 'x'}) + yield {'method': 'enter_room', 'sid': '123', 'namespace': '/foo', + 'room': 'room', 'host_id': 'x'} + yield {'method': 'leave_room', 'sid': '123', 'namespace': '/foo', + 'room': 'room', 'host_id': 'x'} yield 'bad json' yield b'bad pickled' @@ -442,6 +495,14 @@ class TestPubSubManager(unittest.TestCase): {'method': 'disconnect', 'sid': '123', 'namespace': '/foo', 'host_id': 'x'} ) + self.pm._handle_enter_room.assert_called_once_with( + {'method': 'enter_room', 'sid': '123', 'namespace': '/foo', + 'room': 'room', 'host_id': 'x'} + ) + self.pm._handle_leave_room.assert_called_once_with( + {'method': 'leave_room', 'sid': '123', 'namespace': '/foo', + 'room': 'room', 'host_id': 'x'} + ) self.pm._handle_close_room.assert_called_once_with( {'method': 'close_room', 'value': 'baz', 'host_id': 'x'} )