Browse Source

Support client disconnects with multiple servers (Fixes https://github.com/miguelgrinberg/Flask-SocketIO/issues/1174)

pull/449/head
Miguel Grinberg 5 years ago
parent
commit
01378ef1ef
  1. 3
      socketio/asyncio_manager.py
  2. 22
      socketio/asyncio_pubsub_manager.py
  3. 10
      socketio/asyncio_server.py
  4. 3
      socketio/base_manager.py
  5. 22
      socketio/pubsub_manager.py
  6. 10
      socketio/server.py
  7. 18
      tests/asyncio/test_asyncio_pubsub_manager.py
  8. 1
      tests/asyncio/test_asyncio_server.py
  9. 18
      tests/common/test_pubsub_manager.py

3
socketio/asyncio_manager.py

@ -5,6 +5,9 @@ from .base_manager import BaseManager
class AsyncManager(BaseManager):
"""Manage a client list for an asyncio server."""
async def can_disconnect(self, sid, namespace):
return self.is_connected(sid, namespace)
async def emit(self, event, data, namespace, room=None, skip_sid=None,
callback=None, **kwargs):
"""Emit a message to a single client, a room, or all the clients

22
socketio/asyncio_pubsub_manager.py

@ -69,6 +69,19 @@ class AsyncPubSubManager(AsyncManager):
'skip_sid': skip_sid, 'callback': callback,
'host_id': self.host_id})
async def can_disconnect(self, sid, namespace):
await self._publish({'method': 'disconnect', 'sid': sid,
'namespace': namespace or '/'})
async def disconnect(self, sid, namespace=None):
"""Disconnect a client."""
# this is a bit weird, the can_disconnect call on pubsub managers just
# issues a disconnect request to the message queue and returns None,
# indicating that the client cannot disconnect immediately. The
# server(s) listening on the queue will get this request and carry out
# the disconnect appropriately.
await self.can_disconnect(sid, namespace)
async def close_room(self, room, namespace=None):
await self._publish({'method': 'close_room', 'room': room,
'namespace': namespace or '/'})
@ -128,6 +141,11 @@ class AsyncPubSubManager(AsyncManager):
'sid': sid, 'namespace': namespace,
'id': callback_id, 'args': args})
async def _handle_disconnect(self, message):
await self.server.disconnect(sid=message.get('sid'),
namespace=message.get('namespace'),
ignore_queue=True)
async def _handle_close_room(self, message):
await super().close_room(
room=message.get('room'), namespace=message.get('namespace'))
@ -155,9 +173,13 @@ class AsyncPubSubManager(AsyncManager):
except:
pass
if data and 'method' in data:
self._get_logger().info('pubsub message: {}'.format(
data['method']))
if data['method'] == 'emit':
await self._handle_emit(data)
elif data['method'] == 'callback':
await self._handle_callback(data)
elif data['method'] == 'disconnect':
await self._handle_disconnect(data)
elif data['method'] == 'close_room':
await self._handle_close_room(data)

10
socketio/asyncio_server.py

@ -297,17 +297,23 @@ class AsyncServer(server.Server):
return _session_context_manager(self, sid, namespace)
async def disconnect(self, sid, namespace=None):
async def disconnect(self, sid, namespace=None, ignore_queue=False):
"""Disconnect a client.
:param sid: Session ID of the client.
:param namespace: The Socket.IO namespace to disconnect. If this
argument is omitted the default namespace is used.
:param ignore_queue: Only used when a message queue is configured. If
set to ``True``, the disconnect is processed
locally, without broadcasting on the queue. It is
recommended to always leave this parameter with
its default value of ``False``.
Note: this method is a coroutine.
"""
namespace = namespace or '/'
if self.manager.is_connected(sid, namespace=namespace):
if (ignore_queue and self.manager.is_connected(sid, namespace)) or \
await self.manager.can_disconnect(sid, namespace):
self.logger.info('Disconnecting %s [%s]', sid, namespace)
self.manager.pre_disconnect(sid, namespace=namespace)
await self._send_packet(sid, packet.Packet(packet.DISCONNECT,

3
socketio/base_manager.py

@ -55,6 +55,9 @@ class BaseManager(object):
except KeyError:
pass
def can_disconnect(self, sid, namespace):
return self.is_connected(sid, namespace)
def pre_disconnect(self, sid, namespace):
"""Put the client in the to-be-disconnected list.

22
socketio/pubsub_manager.py

@ -67,6 +67,19 @@ class PubSubManager(BaseManager):
'skip_sid': skip_sid, 'callback': callback,
'host_id': self.host_id})
def can_disconnect(self, sid, namespace):
self._publish({'method': 'disconnect', 'sid': sid,
'namespace': namespace or '/'})
def disconnect(self, sid, namespace=None):
"""Disconnect a client."""
# this is a bit weird, the can_disconnect call on pubsub managers just
# issues a disconnect request to the message queue and returns None,
# indicating that the client cannot disconnect immediately. The
# server(s) listening on the queue will get this request and carry out
# the disconnect appropriately.
self.can_disconnect(sid, namespace)
def close_room(self, room, namespace=None):
self._publish({'method': 'close_room', 'room': room,
'namespace': namespace or '/'})
@ -125,6 +138,11 @@ class PubSubManager(BaseManager):
'sid': sid, 'namespace': namespace, 'id': callback_id,
'args': args})
def _handle_disconnect(self, message):
self.server.disconnect(sid=message.get('sid'),
namespace=message.get('namespace'),
ignore_queue=True)
def _handle_close_room(self, message):
super(PubSubManager, self).close_room(
room=message.get('room'), namespace=message.get('namespace'))
@ -146,9 +164,13 @@ class PubSubManager(BaseManager):
except:
pass
if data and 'method' in data:
self._get_logger().info('pubsub message: {}'.format(
data['method']))
if data['method'] == 'emit':
self._handle_emit(data)
elif data['method'] == 'callback':
self._handle_callback(data)
elif data['method'] == 'disconnect':
self._handle_disconnect(data)
elif data['method'] == 'close_room':
self._handle_close_room(data)

10
socketio/server.py

@ -492,15 +492,21 @@ class Server(object):
return _session_context_manager(self, sid, namespace)
def disconnect(self, sid, namespace=None):
def disconnect(self, sid, namespace=None, ignore_queue=False):
"""Disconnect a client.
:param sid: Session ID of the client.
:param namespace: The Socket.IO namespace to disconnect. If this
argument is omitted the default namespace is used.
:param ignore_queue: Only used when a message queue is configured. If
set to ``True``, the disconnect is processed
locally, without broadcasting on the queue. It is
recommended to always leave this parameter with
its default value of ``False``.
"""
namespace = namespace or '/'
if self.manager.is_connected(sid, namespace=namespace):
if (ignore_queue and self.manager.is_connected(sid, namespace)) or \
self.manager.can_disconnect(sid, namespace):
self.logger.info('Disconnecting %s [%s]', sid, namespace)
self.manager.pre_disconnect(sid, namespace=namespace)
self._send_packet(sid, packet.Packet(packet.DISCONNECT,

18
tests/asyncio/test_asyncio_pubsub_manager.py

@ -34,6 +34,7 @@ class TestAsyncPubSubManager(unittest.TestCase):
def setUp(self):
mock_server = mock.MagicMock()
mock_server._emit_internal = AsyncMock()
mock_server.disconnect = AsyncMock()
self.pm = asyncio_pubsub_manager.AsyncPubSubManager()
self.pm._publish = AsyncMock()
self.pm.set_server(mock_server)
@ -115,6 +116,11 @@ class TestAsyncPubSubManager(unittest.TestCase):
self.pm.server._emit_internal.mock.assert_called_once_with(
'123', 'foo', 'bar', '/', None)
def test_disconnect(self):
_run(self.pm.disconnect('123', '/foo'))
self.pm._publish.mock.assert_called_once_with(
{'method': 'disconnect', 'sid': '123', 'namespace': '/foo'})
def test_close_room(self):
_run(self.pm.close_room('foo'))
self.pm._publish.mock.assert_called_once_with(
@ -142,7 +148,7 @@ class TestAsyncPubSubManager(unittest.TestCase):
self.pm, 'foo', 'bar', namespace='/baz', room=None,
skip_sid=None, callback=None)
def test_handle_emiti_with_room(self):
def test_handle_emit_with_room(self):
with mock.patch.object(asyncio_manager.AsyncManager, 'emit',
new=AsyncMock()) as super_emit:
_run(self.pm._handle_emit({'event': 'foo', 'data': 'bar',
@ -216,6 +222,12 @@ class TestAsyncPubSubManager(unittest.TestCase):
'host_id': host_id}))
self.assertEqual(trigger.mock.call_count, 0)
def test_handle_disconnect(self):
_run(self.pm._handle_disconnect({'method': 'disconnect', 'sid': '123',
'namespace': '/foo'}))
self.pm.server.disconnect.mock.assert_called_once_with(
sid='123', namespace='/foo', ignore_queue=True)
def test_handle_close_room(self):
with mock.patch.object(asyncio_manager.AsyncManager, 'close_room',
new=AsyncMock()) as super_close_room:
@ -236,6 +248,7 @@ class TestAsyncPubSubManager(unittest.TestCase):
def test_background_thread(self):
self.pm._handle_emit = AsyncMock()
self.pm._handle_callback = AsyncMock()
self.pm._handle_disconnect = AsyncMock()
self.pm._handle_close_room = AsyncMock()
def messages():
@ -243,6 +256,7 @@ class TestAsyncPubSubManager(unittest.TestCase):
yield {'method': 'emit', 'value': 'foo'}
yield {'missing': 'method'}
yield '{"method": "callback", "value": "bar"}'
yield {'method': 'disconnect', 'sid': '123', 'namespace': '/foo'}
yield {'method': 'bogus'}
yield pickle.dumps({'method': 'close_room', 'value': 'baz'})
yield 'bad json'
@ -258,5 +272,7 @@ class TestAsyncPubSubManager(unittest.TestCase):
{'method': 'emit', 'value': 'foo'})
self.pm._handle_callback.mock.assert_called_once_with(
{'method': 'callback', 'value': 'bar'})
self.pm._handle_disconnect.mock.assert_called_once_with(
{'method': 'disconnect', 'sid': '123', 'namespace': '/foo'})
self.pm._handle_close_room.mock.assert_called_once_with(
{'method': 'close_room', 'value': 'baz'})

1
tests/asyncio/test_asyncio_server.py

@ -42,6 +42,7 @@ class TestAsyncServer(unittest.TestCase):
def _get_mock_manager(self):
mgr = mock.MagicMock()
mgr.can_disconnect = AsyncMock()
mgr.emit = AsyncMock()
mgr.close_room = AsyncMock()
mgr.trigger_callback = AsyncMock()

18
tests/common/test_pubsub_manager.py

@ -112,6 +112,11 @@ class TestBaseManager(unittest.TestCase):
self.pm.server._emit_internal.assert_called_once_with('123', 'foo',
'bar', '/', None)
def test_disconnect(self):
self.pm.disconnect('123', '/foo')
self.pm._publish.assert_called_once_with(
{'method': 'disconnect', 'sid': '123', 'namespace': '/foo'})
def test_close_room(self):
self.pm.close_room('foo')
self.pm._publish.assert_called_once_with(
@ -137,7 +142,7 @@ class TestBaseManager(unittest.TestCase):
room=None, skip_sid=None,
callback=None)
def test_handle_emiti_with_room(self):
def test_handle_emit_with_room(self):
with mock.patch.object(base_manager.BaseManager, 'emit') as super_emit:
self.pm._handle_emit({'event': 'foo', 'data': 'bar',
'room': 'baz'})
@ -204,6 +209,13 @@ class TestBaseManager(unittest.TestCase):
'host_id': host_id})
self.assertEqual(trigger.call_count, 0)
def test_handle_disconnect(self):
self.pm._handle_disconnect({'method': 'disconnect', 'sid': '123',
'namespace': '/foo'})
self.pm.server.disconnect.assert_called_once_with(sid='123',
namespace='/foo',
ignore_queue=True)
def test_handle_close_room(self):
with mock.patch.object(base_manager.BaseManager, 'close_room') \
as super_close_room:
@ -223,6 +235,7 @@ class TestBaseManager(unittest.TestCase):
def test_background_thread(self):
self.pm._handle_emit = mock.MagicMock()
self.pm._handle_callback = mock.MagicMock()
self.pm._handle_disconnect = mock.MagicMock()
self.pm._handle_close_room = mock.MagicMock()
def messages():
@ -230,6 +243,7 @@ class TestBaseManager(unittest.TestCase):
yield {'method': 'emit', 'value': 'foo'}
yield {'missing': 'method'}
yield '{"method": "callback", "value": "bar"}'
yield {'method': 'disconnect', 'sid': '123', 'namespace': '/foo'}
yield {'method': 'bogus'}
yield pickle.dumps({'method': 'close_room', 'value': 'baz'})
yield 'bad json'
@ -245,5 +259,7 @@ class TestBaseManager(unittest.TestCase):
{'method': 'emit', 'value': 'foo'})
self.pm._handle_callback.assert_called_once_with(
{'method': 'callback', 'value': 'bar'})
self.pm._handle_disconnect.assert_called_once_with(
{'method': 'disconnect', 'sid': '123', 'namespace': '/foo'})
self.pm._handle_close_room.assert_called_once_with(
{'method': 'close_room', 'value': 'baz'})

Loading…
Cancel
Save