Browse Source
Catch and log errors in pubsub listening thread (Fixes #889)
pull/892/head
Miguel Grinberg
3 years ago
No known key found for this signature in database
GPG Key ID: 36848B262DF5F06C
4 changed files with
64 additions and
16 deletions
-
src/socketio/asyncio_pubsub_manager.py
-
src/socketio/pubsub_manager.py
-
tests/asyncio/test_asyncio_pubsub_manager.py
-
tests/common/test_pubsub_manager.py
|
|
@ -166,6 +166,7 @@ class AsyncPubSubManager(AsyncManager): |
|
|
|
if data and 'method' in data: |
|
|
|
self._get_logger().info('pubsub message: {}'.format( |
|
|
|
data['method'])) |
|
|
|
try: |
|
|
|
if data['method'] == 'emit': |
|
|
|
await self._handle_emit(data) |
|
|
|
elif data['method'] == 'callback': |
|
|
@ -174,6 +175,11 @@ class AsyncPubSubManager(AsyncManager): |
|
|
|
await self._handle_disconnect(data) |
|
|
|
elif data['method'] == 'close_room': |
|
|
|
await self._handle_close_room(data) |
|
|
|
except asyncio.CancelledError: |
|
|
|
raise # let the outer try/except handle it |
|
|
|
except: |
|
|
|
self.server.logger.exception( |
|
|
|
'Unknown error in pubsub listening task') |
|
|
|
except asyncio.CancelledError: # pragma: no cover |
|
|
|
break |
|
|
|
except: # pragma: no cover |
|
|
|
|
|
@ -164,6 +164,7 @@ class PubSubManager(BaseManager): |
|
|
|
if data and 'method' in data: |
|
|
|
self._get_logger().info('pubsub message: {}'.format( |
|
|
|
data['method'])) |
|
|
|
try: |
|
|
|
if data['method'] == 'emit': |
|
|
|
self._handle_emit(data) |
|
|
|
elif data['method'] == 'callback': |
|
|
@ -172,3 +173,6 @@ class PubSubManager(BaseManager): |
|
|
|
self._handle_disconnect(data) |
|
|
|
elif data['method'] == 'close_room': |
|
|
|
self._handle_close_room(data) |
|
|
|
except: |
|
|
|
self.server.logger.exception( |
|
|
|
'Unknown error in pubsub listening thread') |
|
|
|
|
|
@ -445,3 +445,21 @@ class TestAsyncPubSubManager(unittest.TestCase): |
|
|
|
self.pm._handle_close_room.mock.assert_called_once_with( |
|
|
|
{'method': 'close_room', 'value': 'baz'} |
|
|
|
) |
|
|
|
|
|
|
|
def test_background_thread_exception(self): |
|
|
|
self.pm._handle_emit = AsyncMock(side_effect=[ValueError(), |
|
|
|
asyncio.CancelledError]) |
|
|
|
|
|
|
|
async def messages(): |
|
|
|
yield {'method': 'emit', 'value': 'foo'} |
|
|
|
yield {'method': 'emit', 'value': 'bar'} |
|
|
|
|
|
|
|
self.pm._listen = messages |
|
|
|
_run(self.pm._thread()) |
|
|
|
|
|
|
|
self.pm._handle_emit.mock.assert_any_call( |
|
|
|
{'method': 'emit', 'value': 'foo'} |
|
|
|
) |
|
|
|
self.pm._handle_emit.mock.assert_called_with( |
|
|
|
{'method': 'emit', 'value': 'bar'} |
|
|
|
) |
|
|
|
|
|
@ -394,3 +394,23 @@ class TestPubSubManager(unittest.TestCase): |
|
|
|
self.pm._handle_close_room.assert_called_once_with( |
|
|
|
{'method': 'close_room', 'value': 'baz'} |
|
|
|
) |
|
|
|
|
|
|
|
def test_background_thread_exception(self): |
|
|
|
self.pm._handle_emit = mock.MagicMock(side_effect=[ValueError(), None]) |
|
|
|
|
|
|
|
def messages(): |
|
|
|
yield {'method': 'emit', 'value': 'foo'} |
|
|
|
yield {'method': 'emit', 'value': 'bar'} |
|
|
|
|
|
|
|
self.pm._listen = mock.MagicMock(side_effect=messages) |
|
|
|
try: |
|
|
|
self.pm._thread() |
|
|
|
except StopIteration: |
|
|
|
pass |
|
|
|
|
|
|
|
self.pm._handle_emit.assert_any_call( |
|
|
|
{'method': 'emit', 'value': 'foo'} |
|
|
|
) |
|
|
|
self.pm._handle_emit.assert_called_with( |
|
|
|
{'method': 'emit', 'value': 'bar'} |
|
|
|
) |
|
|
|