diff --git a/src/socketio/asyncio_pubsub_manager.py b/src/socketio/asyncio_pubsub_manager.py index ff37f2d..ac261a1 100644 --- a/src/socketio/asyncio_pubsub_manager.py +++ b/src/socketio/asyncio_pubsub_manager.py @@ -166,14 +166,20 @@ class AsyncPubSubManager(AsyncManager): if data and 'method' in data: self._get_logger().info('pubsub message: {}'.format( data['method'])) - if data['method'] == 'emit': - await self._handle_emit(data) - elif data['method'] == 'callback': - await self._handle_callback(data) - elif data['method'] == 'disconnect': - await self._handle_disconnect(data) - elif data['method'] == 'close_room': - await self._handle_close_room(data) + try: + if data['method'] == 'emit': + await self._handle_emit(data) + elif data['method'] == 'callback': + await self._handle_callback(data) + elif data['method'] == 'disconnect': + await self._handle_disconnect(data) + elif data['method'] == 'close_room': + await self._handle_close_room(data) + except asyncio.CancelledError: + raise # let the outer try/except handle it + except: + self.server.logger.exception( + 'Unknown error in pubsub listening task') except asyncio.CancelledError: # pragma: no cover break except: # pragma: no cover diff --git a/src/socketio/pubsub_manager.py b/src/socketio/pubsub_manager.py index 9b6f36d..2b619b8 100644 --- a/src/socketio/pubsub_manager.py +++ b/src/socketio/pubsub_manager.py @@ -164,11 +164,15 @@ class PubSubManager(BaseManager): if data and 'method' in data: self._get_logger().info('pubsub message: {}'.format( data['method'])) - if data['method'] == 'emit': - self._handle_emit(data) - elif data['method'] == 'callback': - self._handle_callback(data) - elif data['method'] == 'disconnect': - self._handle_disconnect(data) - elif data['method'] == 'close_room': - self._handle_close_room(data) + try: + if data['method'] == 'emit': + self._handle_emit(data) + elif data['method'] == 'callback': + self._handle_callback(data) + elif data['method'] == 'disconnect': + self._handle_disconnect(data) + elif data['method'] == 'close_room': + self._handle_close_room(data) + except: + self.server.logger.exception( + 'Unknown error in pubsub listening thread') diff --git a/tests/asyncio/test_asyncio_pubsub_manager.py b/tests/asyncio/test_asyncio_pubsub_manager.py index 48480a5..c95c073 100644 --- a/tests/asyncio/test_asyncio_pubsub_manager.py +++ b/tests/asyncio/test_asyncio_pubsub_manager.py @@ -445,3 +445,21 @@ class TestAsyncPubSubManager(unittest.TestCase): self.pm._handle_close_room.mock.assert_called_once_with( {'method': 'close_room', 'value': 'baz'} ) + + def test_background_thread_exception(self): + self.pm._handle_emit = AsyncMock(side_effect=[ValueError(), + asyncio.CancelledError]) + + async def messages(): + yield {'method': 'emit', 'value': 'foo'} + yield {'method': 'emit', 'value': 'bar'} + + self.pm._listen = messages + _run(self.pm._thread()) + + self.pm._handle_emit.mock.assert_any_call( + {'method': 'emit', 'value': 'foo'} + ) + self.pm._handle_emit.mock.assert_called_with( + {'method': 'emit', 'value': 'bar'} + ) diff --git a/tests/common/test_pubsub_manager.py b/tests/common/test_pubsub_manager.py index 066349f..ebaec41 100644 --- a/tests/common/test_pubsub_manager.py +++ b/tests/common/test_pubsub_manager.py @@ -394,3 +394,23 @@ class TestPubSubManager(unittest.TestCase): self.pm._handle_close_room.assert_called_once_with( {'method': 'close_room', 'value': 'baz'} ) + + def test_background_thread_exception(self): + self.pm._handle_emit = mock.MagicMock(side_effect=[ValueError(), None]) + + def messages(): + yield {'method': 'emit', 'value': 'foo'} + yield {'method': 'emit', 'value': 'bar'} + + self.pm._listen = mock.MagicMock(side_effect=messages) + try: + self.pm._thread() + except StopIteration: + pass + + self.pm._handle_emit.assert_any_call( + {'method': 'emit', 'value': 'foo'} + ) + self.pm._handle_emit.assert_called_with( + {'method': 'emit', 'value': 'bar'} + )