import functools import logging from unittest import mock import pytest from socketio import manager, packet, pubsub_manager class TestPubSubManager: def setup_method(self): id = 0 def generate_id(): nonlocal id id += 1 return str(id) mock_server = mock.MagicMock() mock_server.eio.generate_id = generate_id mock_server.packet_class = packet.Packet self.pm = pubsub_manager.PubSubManager() self.pm._publish = mock.MagicMock() self.pm.set_server(mock_server) self.pm.host_id = "123456" self.pm.initialize() def test_default_init(self): assert self.pm.channel == "socketio" self.pm.server.start_background_task.assert_called_once_with(self.pm._thread) def test_custom_init(self): pubsub = pubsub_manager.PubSubManager(channel="foo") assert pubsub.channel == "foo" assert len(pubsub.host_id) == 32 def test_write_only_init(self): mock_server = mock.MagicMock() pm = pubsub_manager.PubSubManager(write_only=True) pm.set_server(mock_server) pm.initialize() assert pm.channel == "socketio" assert len(pm.host_id) == 32 assert pm.server.start_background_task.call_count == 0 def test_write_only_default_logger(self): pm = pubsub_manager.PubSubManager(write_only=True) pm.initialize() assert pm.channel == "socketio" assert len(pm.host_id) == 32 assert pm._get_logger() == logging.getLogger("socketio") def test_write_only_with_provided_logger(self): test_logger = logging.getLogger("new_logger") pm = pubsub_manager.PubSubManager(write_only=True, logger=test_logger) pm.initialize() assert pm.channel == "socketio" assert len(pm.host_id) == 32 assert pm._get_logger() == test_logger def test_emit(self): self.pm.emit("foo", "bar") self.pm._publish.assert_called_once_with( { "method": "emit", "event": "foo", "data": "bar", "namespace": "/", "room": None, "skip_sid": None, "callback": None, "host_id": "123456", } ) def test_emit_with_to(self): sid = "ferris" self.pm.emit("foo", "bar", to=sid) self.pm._publish.assert_called_once_with( { "method": "emit", "event": "foo", "data": "bar", "namespace": "/", "room": sid, "skip_sid": None, "callback": None, "host_id": "123456", } ) def test_emit_with_namespace(self): self.pm.emit("foo", "bar", namespace="/baz") self.pm._publish.assert_called_once_with( { "method": "emit", "event": "foo", "data": "bar", "namespace": "/baz", "room": None, "skip_sid": None, "callback": None, "host_id": "123456", } ) def test_emit_with_room(self): self.pm.emit("foo", "bar", room="baz") self.pm._publish.assert_called_once_with( { "method": "emit", "event": "foo", "data": "bar", "namespace": "/", "room": "baz", "skip_sid": None, "callback": None, "host_id": "123456", } ) def test_emit_with_skip_sid(self): self.pm.emit("foo", "bar", skip_sid="baz") self.pm._publish.assert_called_once_with( { "method": "emit", "event": "foo", "data": "bar", "namespace": "/", "room": None, "skip_sid": "baz", "callback": None, "host_id": "123456", } ) def test_emit_with_callback(self): with mock.patch.object(self.pm, "_generate_ack_id", return_value="123"): self.pm.emit("foo", "bar", room="baz", callback="cb") self.pm._publish.assert_called_once_with( { "method": "emit", "event": "foo", "data": "bar", "namespace": "/", "room": "baz", "skip_sid": None, "callback": ("baz", "/", "123"), "host_id": "123456", } ) def test_emit_with_callback_without_server(self): standalone_pm = pubsub_manager.PubSubManager() with pytest.raises(RuntimeError): standalone_pm.emit("foo", "bar", callback="cb") def test_emit_with_callback_missing_room(self): with mock.patch.object(self.pm, "_generate_ack_id", return_value="123"): with pytest.raises(ValueError): self.pm.emit("foo", "bar", callback="cb") def test_emit_with_ignore_queue(self): sid = self.pm.connect("123", "/") self.pm.emit("foo", "bar", room=sid, namespace="/", ignore_queue=True) self.pm._publish.assert_not_called() assert self.pm.server._send_eio_packet.call_count == 1 assert self.pm.server._send_eio_packet.call_args_list[0][0][0] == "123" pkt = self.pm.server._send_eio_packet.call_args_list[0][0][1] assert pkt.encode() == '42["foo","bar"]' def test_can_disconnect(self): sid = self.pm.connect("123", "/") assert self.pm.can_disconnect(sid, "/") self.pm.can_disconnect(sid, "/foo") self.pm._publish.assert_called_once_with( { "method": "disconnect", "sid": sid, "namespace": "/foo", "host_id": "123456", } ) def test_disconnect(self): self.pm.disconnect("foo") self.pm._publish.assert_called_once_with( { "method": "disconnect", "sid": "foo", "namespace": "/", "host_id": "123456", } ) def test_disconnect_ignore_queue(self): sid = self.pm.connect("123", "/") self.pm.pre_disconnect(sid, "/") self.pm.disconnect(sid, ignore_queue=True) self.pm._publish.assert_not_called() assert not self.pm.is_connected(sid, "/") def test_enter_room(self): sid = self.pm.connect("123", "/") self.pm.enter_room(sid, "/", "foo") self.pm.enter_room("456", "/", "foo") assert sid in self.pm.rooms["/"]["foo"] assert self.pm.rooms["/"]["foo"][sid] == "123" self.pm._publish.assert_called_once_with( { "method": "enter_room", "sid": "456", "room": "foo", "namespace": "/", "host_id": "123456", } ) def test_leave_room(self): sid = self.pm.connect("123", "/") self.pm.leave_room(sid, "/", "foo") self.pm.leave_room("456", "/", "foo") assert "foo" not in self.pm.rooms["/"] self.pm._publish.assert_called_once_with( { "method": "leave_room", "sid": "456", "room": "foo", "namespace": "/", "host_id": "123456", } ) def test_close_room(self): self.pm.close_room("foo") self.pm._publish.assert_called_once_with( { "method": "close_room", "room": "foo", "namespace": "/", "host_id": "123456", } ) def test_close_room_with_namespace(self): self.pm.close_room("foo", "/bar") self.pm._publish.assert_called_once_with( { "method": "close_room", "room": "foo", "namespace": "/bar", "host_id": "123456", } ) def test_handle_emit(self): with mock.patch.object(manager.Manager, "emit") as super_emit: self.pm._handle_emit({"event": "foo", "data": "bar"}) super_emit.assert_called_once_with( "foo", "bar", namespace=None, room=None, skip_sid=None, callback=None, ) def test_handle_emit_with_namespace(self): with mock.patch.object(manager.Manager, "emit") as super_emit: self.pm._handle_emit({"event": "foo", "data": "bar", "namespace": "/baz"}) super_emit.assert_called_once_with( "foo", "bar", namespace="/baz", room=None, skip_sid=None, callback=None, ) def test_handle_emit_with_room(self): with mock.patch.object(manager.Manager, "emit") as super_emit: self.pm._handle_emit({"event": "foo", "data": "bar", "room": "baz"}) super_emit.assert_called_once_with( "foo", "bar", namespace=None, room="baz", skip_sid=None, callback=None, ) def test_handle_emit_with_skip_sid(self): with mock.patch.object(manager.Manager, "emit") as super_emit: self.pm._handle_emit({"event": "foo", "data": "bar", "skip_sid": "123"}) super_emit.assert_called_once_with( "foo", "bar", namespace=None, room=None, skip_sid="123", callback=None, ) def test_handle_emit_with_remote_callback(self): with mock.patch.object(manager.Manager, "emit") as super_emit: self.pm._handle_emit( { "event": "foo", "data": "bar", "namespace": "/baz", "callback": ("sid", "/baz", 123), "host_id": "x", } ) assert super_emit.call_count == 1 assert super_emit.call_args[0] == ("foo", "bar") assert super_emit.call_args[1]["namespace"] == "/baz" assert super_emit.call_args[1]["room"] is None assert super_emit.call_args[1]["skip_sid"] is None assert isinstance(super_emit.call_args[1]["callback"], functools.partial) super_emit.call_args[1]["callback"]("one", 2, "three") self.pm._publish.assert_called_once_with( { "method": "callback", "host_id": "x", "sid": "sid", "namespace": "/baz", "id": 123, "args": ("one", 2, "three"), } ) def test_handle_emit_with_local_callback(self): with mock.patch.object(manager.Manager, "emit") as super_emit: self.pm._handle_emit( { "event": "foo", "data": "bar", "namespace": "/baz", "callback": ("sid", "/baz", 123), "host_id": self.pm.host_id, } ) assert super_emit.call_count == 1 assert super_emit.call_args[0] == ("foo", "bar") assert super_emit.call_args[1]["namespace"] == "/baz" assert super_emit.call_args[1]["room"] is None assert super_emit.call_args[1]["skip_sid"] is None assert isinstance(super_emit.call_args[1]["callback"], functools.partial) super_emit.call_args[1]["callback"]("one", 2, "three") self.pm._publish.assert_not_called() def test_handle_callback(self): host_id = self.pm.host_id with mock.patch.object(self.pm, "trigger_callback") as trigger: self.pm._handle_callback( { "method": "callback", "host_id": host_id, "sid": "sid", "namespace": "/", "id": 123, "args": ("one", 2), } ) trigger.assert_called_once_with("sid", 123, ("one", 2)) def test_handle_callback_bad_host_id(self): with mock.patch.object(self.pm, "trigger_callback") as trigger: self.pm._handle_callback( { "method": "callback", "host_id": "bad", "sid": "sid", "namespace": "/", "id": 123, "args": ("one", 2), } ) assert trigger.call_count == 0 def test_handle_callback_missing_args(self): host_id = self.pm.host_id with mock.patch.object(self.pm, "trigger_callback") as trigger: self.pm._handle_callback( { "method": "callback", "host_id": host_id, "sid": "sid", "namespace": "/", "id": 123, } ) self.pm._handle_callback( { "method": "callback", "host_id": host_id, "sid": "sid", "namespace": "/", } ) self.pm._handle_callback( {"method": "callback", "host_id": host_id, "sid": "sid"} ) self.pm._handle_callback({"method": "callback", "host_id": host_id}) assert trigger.call_count == 0 def test_handle_disconnect(self): self.pm._handle_disconnect( {"method": "disconnect", "sid": "123", "namespace": "/foo"} ) self.pm.server.disconnect.assert_called_once_with( sid="123", namespace="/foo", ignore_queue=True ) def test_handle_enter_room(self): sid = self.pm.connect("123", "/") with mock.patch.object(manager.Manager, "enter_room") as super_enter_room: self.pm._handle_enter_room( {"method": "enter_room", "sid": sid, "namespace": "/", "room": "foo"} ) self.pm._handle_enter_room( {"method": "enter_room", "sid": "456", "namespace": "/", "room": "foo"} ) super_enter_room.assert_called_once_with(sid, "/", "foo") def test_handle_leave_room(self): sid = self.pm.connect("123", "/") with mock.patch.object(manager.Manager, "leave_room") as super_leave_room: self.pm._handle_leave_room( {"method": "leave_room", "sid": sid, "namespace": "/", "room": "foo"} ) self.pm._handle_leave_room( {"method": "leave_room", "sid": "456", "namespace": "/", "room": "foo"} ) super_leave_room.assert_called_once_with(sid, "/", "foo") def test_handle_close_room(self): with mock.patch.object(manager.Manager, "close_room") as super_close_room: self.pm._handle_close_room({"method": "close_room", "room": "foo"}) super_close_room.assert_called_once_with(room="foo", namespace=None) def test_handle_close_room_with_namespace(self): with mock.patch.object(manager.Manager, "close_room") as super_close_room: self.pm._handle_close_room( {"method": "close_room", "room": "foo", "namespace": "/bar"} ) super_close_room.assert_called_once_with(room="foo", namespace="/bar") def test_background_thread(self): self.pm._handle_emit = mock.MagicMock() self.pm._handle_callback = mock.MagicMock() self.pm._handle_disconnect = mock.MagicMock() self.pm._handle_enter_room = mock.MagicMock() self.pm._handle_leave_room = mock.MagicMock() self.pm._handle_close_room = mock.MagicMock() host_id = self.pm.host_id def messages(): import pickle yield {"method": "emit", "value": "foo", "host_id": "x"} yield {"missing": "method", "host_id": "x"} yield '{"method": "callback", "value": "bar", "host_id": "x"}' yield { "method": "disconnect", "sid": "123", "namespace": "/foo", "host_id": "x", } yield {"method": "bogus", "host_id": "x"} yield pickle.dumps({"method": "close_room", "value": "baz", "host_id": "x"}) yield { "method": "enter_room", "sid": "123", "namespace": "/foo", "room": "room", "host_id": "x", } yield { "method": "leave_room", "sid": "123", "namespace": "/foo", "room": "room", "host_id": "x", } yield "bad json" yield b"bad pickled" # these should not publish anything on the queue, as they come from # the same host yield {"method": "emit", "value": "foo", "host_id": host_id} yield {"method": "callback", "value": "bar", "host_id": host_id} yield { "method": "disconnect", "sid": "123", "namespace": "/foo", "host_id": host_id, } yield pickle.dumps( {"method": "close_room", "value": "baz", "host_id": host_id} ) self.pm._listen = mock.MagicMock(side_effect=messages) try: self.pm._thread() except StopIteration: pass self.pm._handle_emit.assert_called_once_with( {"method": "emit", "value": "foo", "host_id": "x"} ) self.pm._handle_callback.assert_any_call( {"method": "callback", "value": "bar", "host_id": "x"} ) self.pm._handle_callback.assert_any_call( {"method": "callback", "value": "bar", "host_id": host_id} ) self.pm._handle_disconnect.assert_called_once_with( {"method": "disconnect", "sid": "123", "namespace": "/foo", "host_id": "x"} ) self.pm._handle_enter_room.assert_called_once_with( { "method": "enter_room", "sid": "123", "namespace": "/foo", "room": "room", "host_id": "x", } ) self.pm._handle_leave_room.assert_called_once_with( { "method": "leave_room", "sid": "123", "namespace": "/foo", "room": "room", "host_id": "x", } ) self.pm._handle_close_room.assert_called_once_with( {"method": "close_room", "value": "baz", "host_id": "x"} ) def test_background_thread_exception(self): self.pm._handle_emit = mock.MagicMock(side_effect=[ValueError(), None]) def messages(): yield {"method": "emit", "value": "foo", "host_id": "x"} yield {"method": "emit", "value": "bar", "host_id": "x"} self.pm._listen = mock.MagicMock(side_effect=messages) try: self.pm._thread() except StopIteration: pass self.pm._handle_emit.assert_any_call( {"method": "emit", "value": "foo", "host_id": "x"} ) self.pm._handle_emit.assert_called_with( {"method": "emit", "value": "bar", "host_id": "x"} )