from unittest import mock

import pytest

from socketio import manager
from socketio import packet


class TestBaseManager:
    """Tests for ``manager.Manager`` driven through a mocked server."""

    def setup_method(self):
        """Create a fresh manager bound to a mock server for each test."""
        last_id = 0

        def generate_id():
            # Sequential string ids ('1', '2', ...) stand in for the ids the
            # server's ``eio.generate_id`` would hand out.
            nonlocal last_id
            last_id += 1
            return str(last_id)

        server = mock.MagicMock()
        server.eio.generate_id = generate_id
        server.packet_class = packet.Packet

        self.bm = manager.Manager()
        self.bm.set_server(server)
        self.bm.initialize()

    def _eio_recipients(self):
        """Return the first positional argument of every
        ``_send_eio_packet`` call, in call order."""
        calls = self.bm.server._send_eio_packet.call_args_list
        return [call[0][0] for call in calls]

    def _eio_packets(self):
        """Return the second positional argument of every
        ``_send_eio_packet`` call, in call order."""
        calls = self.bm.server._send_eio_packet.call_args_list
        return [call[0][1] for call in calls]

    def test_connect(self):
        """A connected client is registered in the ``None`` room and in a
        room named after its own sid, and sid lookups work both ways."""
        sid = self.bm.connect('123', '/foo')
        foo_rooms = self.bm.rooms['/foo']

        for room in (None, sid):
            assert room in foo_rooms
        for room in (None, sid):
            assert sid in foo_rooms[room]
        for room in (None, sid):
            assert dict(foo_rooms[room]) == {sid: '123'}

        assert self.bm.sid_from_eio_sid('123', '/foo') == sid
        assert self.bm.sid_from_eio_sid('1234', '/foo') is None
        assert self.bm.sid_from_eio_sid('123', '/bar') is None

        assert self.bm.eio_sid_from_sid(sid, '/foo') == '123'
        assert self.bm.eio_sid_from_sid('x', '/foo') is None
        assert self.bm.eio_sid_from_sid(sid, '/bar') is None

    def test_pre_disconnect(self):
        """``pre_disconnect`` queues the sid in ``pending_disconnect`` and
        ``disconnect`` removes it from that queue again."""
        first = self.bm.connect('123', '/foo')
        second = self.bm.connect('456', '/foo')
        assert self.bm.is_connected(first, '/foo')

        assert self.bm.pre_disconnect(first, '/foo') == '123'
        assert self.bm.pending_disconnect == {'/foo': [first]}
        assert not self.bm.is_connected(first, '/foo')

        assert self.bm.pre_disconnect(second, '/foo') == '456'
        assert self.bm.pending_disconnect == {'/foo': [first, second]}
        assert not self.bm.is_connected(second, '/foo')

        self.bm.disconnect(first, '/foo')
        assert self.bm.pending_disconnect == {'/foo': [second]}

        self.bm.disconnect(second, '/foo')
        assert self.bm.pending_disconnect == {}

    def test_disconnect(self):
        """After one client disconnects, the rooms of the other client
        contain only that other client."""
        leaving = self.bm.connect('123', '/foo')
        staying = self.bm.connect('456', '/foo')
        self.bm.enter_room(leaving, '/foo', 'bar')
        self.bm.enter_room(staying, '/foo', 'baz')

        self.bm.disconnect(leaving, '/foo')

        for room in (None, staying, 'baz'):
            assert dict(self.bm.rooms['/foo'][room]) == {staying: '456'}

    def test_disconnect_default_namespace(self):
        """Connections are tracked per namespace, including ``'/'``."""
        sid1 = self.bm.connect('123', '/')
        sid2 = self.bm.connect('123', '/foo')
        sid3 = self.bm.connect('456', '/')
        sid4 = self.bm.connect('456', '/foo')

        assert self.bm.is_connected(sid1, '/')
        assert self.bm.is_connected(sid2, '/foo')
        assert not self.bm.is_connected(sid2, '/')
        assert not self.bm.is_connected(sid1, '/foo')

        self.bm.disconnect(sid1, '/')
        assert not self.bm.is_connected(sid1, '/')
        assert self.bm.is_connected(sid2, '/foo')

        self.bm.disconnect(sid2, '/foo')
        assert not self.bm.is_connected(sid2, '/foo')

        for namespace, sid in (('/', sid3), ('/foo', sid4)):
            for room in (None, sid):
                assert dict(self.bm.rooms[namespace][room]) == {sid: '456'}

    def test_disconnect_twice(self):
        """Disconnecting an already disconnected sid leaves the remaining
        clients untouched."""
        sid1 = self.bm.connect('123', '/')
        sid2 = self.bm.connect('123', '/foo')
        sid3 = self.bm.connect('456', '/')
        sid4 = self.bm.connect('456', '/foo')

        # Two identical rounds: the second one repeats the disconnects.
        for _ in range(2):
            self.bm.disconnect(sid1, '/')
            self.bm.disconnect(sid2, '/foo')

        for namespace, sid in (('/', sid3), ('/foo', sid4)):
            for room in (None, sid):
                assert dict(self.bm.rooms[namespace][room]) == {sid: '456'}

    def test_disconnect_all(self):
        """``rooms`` is empty once every client has disconnected."""
        sid1 = self.bm.connect('123', '/foo')
        sid2 = self.bm.connect('456', '/foo')
        self.bm.enter_room(sid1, '/foo', 'bar')
        self.bm.enter_room(sid2, '/foo', 'baz')

        for sid in (sid1, sid2):
            self.bm.disconnect(sid, '/foo')

        assert self.bm.rooms == {}

    def test_disconnect_with_callbacks(self):
        """Disconnecting a sid removes its entry from ``callbacks``."""
        sid1 = self.bm.connect('123', '/')
        sid2 = self.bm.connect('123', '/foo')
        sid3 = self.bm.connect('456', '/foo')
        for sid, callback in ((sid1, 'f'), (sid2, 'g'), (sid3, 'h')):
            self.bm._generate_ack_id(sid, callback)

        self.bm.disconnect(sid2, '/foo')
        assert sid2 not in self.bm.callbacks

        self.bm.disconnect(sid1, '/')
        assert sid1 not in self.bm.callbacks
        assert sid3 in self.bm.callbacks

    def test_disconnect_bad_namespace(self):
        """Disconnecting from a namespace nobody joined must not fail."""
        for namespace in ('/', '/foo'):
            self.bm.connect('123', namespace)
        # should not assert
        self.bm.disconnect('123', '/bar')

    def test_enter_room_bad_namespace(self):
        """Entering a room in a namespace the sid is not connected to
        raises ``ValueError``."""
        sid = self.bm.connect('123', '/')
        with pytest.raises(ValueError):
            self.bm.enter_room(sid, '/foo', 'bar')

    def test_trigger_callback(self):
        """``trigger_callback`` invokes the stored callback with the given
        data unpacked as positional arguments."""
        sid1 = self.bm.connect('123', '/')
        sid2 = self.bm.connect('123', '/foo')
        callback = mock.MagicMock()
        ack1 = self.bm._generate_ack_id(sid1, callback)
        ack2 = self.bm._generate_ack_id(sid2, callback)
        ack3 = self.bm._generate_ack_id(sid1, callback)

        self.bm.trigger_callback(sid1, ack1, ['foo'])
        self.bm.trigger_callback(sid1, ack3, ['bar'])
        self.bm.trigger_callback(sid2, ack2, ['bar', 'baz'])

        assert callback.call_count == 3
        for expected_args in (('foo',), ('bar',), ('bar', 'baz')):
            callback.assert_any_call(*expected_args)

    def test_invalid_callback(self):
        """An unknown sid or an unknown ack id does not call the callback."""
        sid = self.bm.connect('123', '/')
        callback = mock.MagicMock()
        ack_id = self.bm._generate_ack_id(sid, callback)

        # these should not raise an exception
        self.bm.trigger_callback('xxx', ack_id, ['foo'])
        self.bm.trigger_callback(sid, ack_id + 1, ['foo'])

        assert callback.call_count == 0

    def test_get_namespaces(self):
        """``get_namespaces`` yields each namespace that has a client."""
        assert list(self.bm.get_namespaces()) == []

        for namespace in ('/', '/foo'):
            self.bm.connect('123', namespace)

        found = list(self.bm.get_namespaces())
        assert len(found) == 2
        for namespace in ('/', '/foo'):
            assert namespace in found

    def test_get_participants(self):
        """``get_participants`` yields ``(sid, eio_sid)`` pairs and omits
        a client that has disconnected."""
        sid1 = self.bm.connect('123', '/')
        sid2 = self.bm.connect('456', '/')
        sid3 = self.bm.connect('789', '/')

        self.bm.disconnect(sid3, '/')
        assert sid3 not in self.bm.rooms['/'][None]

        found = list(self.bm.get_participants('/', None))
        assert len(found) == 2
        assert (sid1, '123') in found
        assert (sid2, '456') in found
        assert (sid3, '789') not in found

    def test_leave_invalid_room(self):
        """Leaving an unknown room or namespace must not fail."""
        sid = self.bm.connect('123', '/foo')
        for namespace in ('/foo', '/bar'):
            self.bm.leave_room(sid, namespace, 'baz')

    def test_no_room(self):
        """``get_rooms`` returns an empty list when nothing is connected."""
        found = self.bm.get_rooms('123', '/foo')
        assert [] == found

    def test_close_room(self):
        """``close_room`` removes the room from its namespace."""
        sid1 = self.bm.connect('123', '/foo')
        self.bm.connect('456', '/foo')
        self.bm.connect('789', '/foo')
        # The same sid enters the room twice, as in the original test.
        for _ in range(2):
            self.bm.enter_room(sid1, '/foo', 'bar')

        self.bm.close_room('bar', '/foo')

        assert 'bar' not in self.bm.rooms['/foo']

    def test_close_invalid_room(self):
        """Closing a room that does not exist must not fail."""
        self.bm.close_room('bar', '/foo')

    def test_rooms(self):
        """``get_rooms`` reports the sid's own room and each joined room."""
        sid = self.bm.connect('123', '/foo')
        self.bm.enter_room(sid, '/foo', 'bar')

        joined = self.bm.get_rooms(sid, '/foo')

        assert len(joined) == 2
        for room in (sid, 'bar'):
            assert room in joined

    def test_emit_to_sid(self):
        """Emitting with ``to=sid`` sends to that client only."""
        sid = self.bm.connect('123', '/foo')
        self.bm.connect('456', '/foo')

        self.bm.emit('my event', {'foo': 'bar'}, namespace='/foo', to=sid)

        assert self.bm.server._send_eio_packet.call_count == 1
        assert self._eio_recipients() == ['123']
        pkt = self._eio_packets()[0]
        assert pkt.encode() == '42/foo,["my event",{"foo":"bar"}]'

    def test_emit_to_room(self):
        """Emitting to a room sends one equal packet to each member."""
        for eio_sid in ('123', '456'):
            member = self.bm.connect(eio_sid, '/foo')
            self.bm.enter_room(member, '/foo', 'bar')
        self.bm.connect('789', '/foo')

        self.bm.emit('my event', {'foo': 'bar'}, namespace='/foo', room='bar')

        assert self.bm.server._send_eio_packet.call_count == 2
        assert self._eio_recipients() == ['123', '456']
        pkt, second = self._eio_packets()
        assert pkt == second
        assert pkt.encode() == '42/foo,["my event",{"foo":"bar"}]'

    def test_emit_to_rooms(self):
        """Emitting to a list of rooms sends once per client, even for a
        client that is in more than one of the rooms."""
        memberships = (
            ('123', ('bar',)),
            ('456', ('bar', 'baz')),
            ('789', ('baz',)),
        )
        for eio_sid, rooms in memberships:
            member = self.bm.connect(eio_sid, '/foo')
            for room in rooms:
                self.bm.enter_room(member, '/foo', room)

        self.bm.emit(
            'my event', {'foo': 'bar'}, namespace='/foo', room=['bar', 'baz']
        )

        assert self.bm.server._send_eio_packet.call_count == 3
        assert self._eio_recipients() == ['123', '456', '789']
        pkt, second, third = self._eio_packets()
        assert pkt == second
        assert pkt == third
        assert pkt.encode() == '42/foo,["my event",{"foo":"bar"}]'

    def test_emit_to_all(self):
        """Emitting without a room sends to every client in the namespace
        and not to a client in another namespace."""
        for eio_sid in ('123', '456'):
            member = self.bm.connect(eio_sid, '/foo')
            self.bm.enter_room(member, '/foo', 'bar')
        self.bm.connect('789', '/foo')
        self.bm.connect('abc', '/bar')

        self.bm.emit('my event', {'foo': 'bar'}, namespace='/foo')

        assert self.bm.server._send_eio_packet.call_count == 3
        assert self._eio_recipients() == ['123', '456', '789']
        pkt, second, third = self._eio_packets()
        assert pkt == second
        assert pkt == third
        assert pkt.encode() == '42/foo,["my event",{"foo":"bar"}]'

    def test_emit_to_all_skip_one(self):
        """``skip_sid`` with a single sid leaves that client out."""
        sid1 = self.bm.connect('123', '/foo')
        self.bm.enter_room(sid1, '/foo', 'bar')
        skipped = self.bm.connect('456', '/foo')
        self.bm.enter_room(skipped, '/foo', 'bar')
        self.bm.connect('789', '/foo')
        self.bm.connect('abc', '/bar')

        self.bm.emit(
            'my event', {'foo': 'bar'}, namespace='/foo', skip_sid=skipped
        )

        assert self.bm.server._send_eio_packet.call_count == 2
        assert self._eio_recipients() == ['123', '789']
        pkt, second = self._eio_packets()
        assert pkt == second
        assert pkt.encode() == '42/foo,["my event",{"foo":"bar"}]'

    def test_emit_to_all_skip_two(self):
        """``skip_sid`` with a list of sids leaves all of them out."""
        sid1 = self.bm.connect('123', '/foo')
        self.bm.enter_room(sid1, '/foo', 'bar')
        sid2 = self.bm.connect('456', '/foo')
        self.bm.enter_room(sid2, '/foo', 'bar')
        sid3 = self.bm.connect('789', '/foo')
        self.bm.connect('abc', '/bar')

        self.bm.emit(
            'my event',
            {'foo': 'bar'},
            namespace='/foo',
            skip_sid=[sid1, sid3],
        )

        assert self.bm.server._send_eio_packet.call_count == 1
        assert self._eio_recipients() == ['456']
        pkt = self._eio_packets()[0]
        assert pkt.encode() == '42/foo,["my event",{"foo":"bar"}]'

    def test_emit_with_callback(self):
        """Emitting with a callback goes through ``_send_packet`` and the
        encoded packet carries the generated ack id."""
        sid = self.bm.connect('123', '/foo')
        self.bm._generate_ack_id = mock.MagicMock(return_value=11)

        self.bm.emit(
            'my event', {'foo': 'bar'}, namespace='/foo', callback='cb'
        )

        self.bm._generate_ack_id.assert_called_once_with(sid, 'cb')
        send_packet = self.bm.server._send_packet
        assert send_packet.call_count == 1
        positional = send_packet.call_args_list[0][0]
        assert positional[0] == '123'
        pkt = positional[1]
        assert pkt.encode() == '2/foo,11["my event",{"foo":"bar"}]'

    def test_emit_to_invalid_room(self):
        """Emitting to a room that does not exist must not fail."""
        self.bm.emit('my event', {'foo': 'bar'}, namespace='/', room='123')

    def test_emit_to_invalid_namespace(self):
        """Emitting to a namespace with no clients must not fail."""
        self.bm.emit('my event', {'foo': 'bar'}, namespace='/foo')

    def test_emit_with_tuple(self):
        """Tuple data is encoded as separate event arguments."""
        sid = self.bm.connect('123', '/foo')

        self.bm.emit('my event', ('foo', 'bar'), namespace='/foo', room=sid)

        assert self.bm.server._send_eio_packet.call_count == 1
        assert self._eio_recipients() == ['123']
        pkt = self._eio_packets()[0]
        assert pkt.encode() == '42/foo,["my event","foo","bar"]'

    def test_emit_with_list(self):
        """List data is encoded as a single list argument."""
        sid = self.bm.connect('123', '/foo')

        self.bm.emit('my event', ['foo', 'bar'], namespace='/foo', room=sid)

        assert self.bm.server._send_eio_packet.call_count == 1
        assert self._eio_recipients() == ['123']
        pkt = self._eio_packets()[0]
        assert pkt.encode() == '42/foo,["my event",["foo","bar"]]'

    def test_emit_with_none(self):
        """``None`` data is encoded as an event with no arguments."""
        sid = self.bm.connect('123', '/foo')

        self.bm.emit('my event', None, namespace='/foo', room=sid)

        assert self.bm.server._send_eio_packet.call_count == 1
        assert self._eio_recipients() == ['123']
        pkt = self._eio_packets()[0]
        assert pkt.encode() == '42/foo,["my event"]'

    def test_emit_binary(self):
        """Bytes data results in two sends: a packet with a placeholder,
        then a packet that encodes to the raw bytes."""
        sid = self.bm.connect('123', '/')

        self.bm.emit('my event', b'my binary data', namespace='/', room=sid)

        assert self.bm.server._send_eio_packet.call_count == 2
        recipients = self._eio_recipients()
        header, attachment = self._eio_packets()

        assert recipients[0] == '123'
        assert header.encode() == (
            '451-["my event",{"_placeholder":true,"num":0}]'
        )
        assert recipients[1] == '123'
        assert attachment.encode() == b'my binary data'