You cannot select more than 25 topics. Topics must start with a letter or number, can include dashes ('-'), and can be up to 35 characters long.

562 lines
20 KiB

import contextlib
import functools
import itertools
import logging
import pickle
from unittest import mock

import pytest

from socketio import manager, packet, pubsub_manager
class TestPubSubManager:
    """Unit tests for ``pubsub_manager.PubSubManager``.

    ``setup_method`` builds a manager whose ``_publish`` method and server are
    mocks, so each test can inspect what would be placed on the message queue
    and what is forwarded to the server or to the base ``manager.Manager``.
    """

    def setup_method(self):
        """Create ``self.pm`` wired to a mock server with a fixed ``host_id``."""
        # Deterministic stand-in for ``server.eio.generate_id``: "1", "2", ...
        id_sequence = itertools.count(1)

        def generate_id():
            return str(next(id_sequence))

        mock_server = mock.MagicMock()
        mock_server.eio.generate_id = generate_id
        mock_server.packet_class = packet.Packet

        self.pm = pubsub_manager.PubSubManager()
        # Record outgoing queue messages instead of publishing them.
        self.pm._publish = mock.MagicMock()
        self.pm.set_server(mock_server)
        # Fixed host id so the expected payloads can be written literally.
        self.pm.host_id = "123456"
        self.pm.initialize()

    def _expected_emit(self, **overrides):
        """Return the message ``emit("foo", "bar")`` is expected to publish.

        The defaults are the payload asserted by ``test_emit``; keyword
        arguments replace individual keys for the variants.
        """
        message = {
            "method": "emit",
            "event": "foo",
            "data": "bar",
            "namespace": "/",
            "room": None,
            "skip_sid": None,
            "callback": None,
            "host_id": "123456",
        }
        message.update(overrides)
        return message

    def _run_thread(self, messages):
        """Run ``self.pm._thread()`` with ``_listen`` mocked to call ``messages``.

        ``messages`` is a callable (here a generator function) used as the
        mock's ``side_effect``. A ``StopIteration`` escaping ``_thread`` is
        tolerated, exactly as the tests did before this helper existed.
        """
        self.pm._listen = mock.MagicMock(side_effect=messages)
        with contextlib.suppress(StopIteration):
            self.pm._thread()

    def test_default_init(self):
        """Default channel is "socketio" and ``_thread`` is started once."""
        assert self.pm.channel == "socketio"
        self.pm.server.start_background_task.assert_called_once_with(self.pm._thread)

    def test_custom_init(self):
        """A custom channel is stored and a 32-character host id is generated."""
        pubsub = pubsub_manager.PubSubManager(channel="foo")
        assert pubsub.channel == "foo"
        assert len(pubsub.host_id) == 32

    def test_write_only_init(self):
        """A write-only manager does not start a background task."""
        mock_server = mock.MagicMock()
        pm = pubsub_manager.PubSubManager(write_only=True)
        pm.set_server(mock_server)
        pm.initialize()
        assert pm.channel == "socketio"
        assert len(pm.host_id) == 32
        assert pm.server.start_background_task.call_count == 0

    def test_write_only_default_logger(self):
        """Without a server or logger, the "socketio" logger is used."""
        pm = pubsub_manager.PubSubManager(write_only=True)
        pm.initialize()
        assert pm.channel == "socketio"
        assert len(pm.host_id) == 32
        assert pm._get_logger() == logging.getLogger("socketio")

    def test_write_only_with_provided_logger(self):
        """A logger passed to the constructor is the one returned."""
        test_logger = logging.getLogger("new_logger")
        pm = pubsub_manager.PubSubManager(write_only=True, logger=test_logger)
        pm.initialize()
        assert pm.channel == "socketio"
        assert len(pm.host_id) == 32
        assert pm._get_logger() == test_logger

    def test_emit(self):
        """A plain emit publishes the default emit message."""
        self.pm.emit("foo", "bar")
        self.pm._publish.assert_called_once_with(self._expected_emit())

    def test_emit_with_to(self):
        """The ``to`` argument is published as the ``room``."""
        sid = "ferris"
        self.pm.emit("foo", "bar", to=sid)
        self.pm._publish.assert_called_once_with(self._expected_emit(room=sid))

    def test_emit_with_namespace(self):
        """An explicit namespace is carried in the published message."""
        self.pm.emit("foo", "bar", namespace="/baz")
        self.pm._publish.assert_called_once_with(
            self._expected_emit(namespace="/baz")
        )

    def test_emit_with_room(self):
        """An explicit room is carried in the published message."""
        self.pm.emit("foo", "bar", room="baz")
        self.pm._publish.assert_called_once_with(self._expected_emit(room="baz"))

    def test_emit_with_skip_sid(self):
        """``skip_sid`` is carried in the published message."""
        self.pm.emit("foo", "bar", skip_sid="baz")
        self.pm._publish.assert_called_once_with(
            self._expected_emit(skip_sid="baz")
        )

    def test_emit_with_callback(self):
        """A callback is published as a ``(room, namespace, ack_id)`` tuple."""
        with mock.patch.object(self.pm, "_generate_ack_id", return_value="123"):
            self.pm.emit("foo", "bar", room="baz", callback="cb")
            self.pm._publish.assert_called_once_with(
                self._expected_emit(room="baz", callback=("baz", "/", "123"))
            )

    def test_emit_with_callback_without_server(self):
        """Emitting with a callback on a server-less manager raises."""
        standalone_pm = pubsub_manager.PubSubManager()
        with pytest.raises(RuntimeError):
            standalone_pm.emit("foo", "bar", callback="cb")

    def test_emit_with_callback_missing_room(self):
        """Emitting with a callback but no room raises ``ValueError``."""
        with mock.patch.object(self.pm, "_generate_ack_id", return_value="123"):
            with pytest.raises(ValueError):
                self.pm.emit("foo", "bar", callback="cb")

    def test_emit_with_ignore_queue(self):
        """With ``ignore_queue=True`` the packet is sent, not published."""
        sid = self.pm.connect("123", "/")
        self.pm.emit("foo", "bar", room=sid, namespace="/", ignore_queue=True)
        self.pm._publish.assert_not_called()
        send_eio_packet = self.pm.server._send_eio_packet
        assert send_eio_packet.call_count == 1
        # Positional arguments of the single call: (id given to connect, packet).
        assert send_eio_packet.call_args_list[0][0][0] == "123"
        pkt = send_eio_packet.call_args_list[0][0][1]
        assert pkt.encode() == '42["foo","bar"]'

    def test_can_disconnect(self):
        """A connected sid can disconnect; otherwise a disconnect is published."""
        sid = self.pm.connect("123", "/")
        assert self.pm.can_disconnect(sid, "/")
        # The sid was never connected to "/foo", so the request goes to the queue.
        self.pm.can_disconnect(sid, "/foo")
        self.pm._publish.assert_called_once_with(
            {
                "method": "disconnect",
                "sid": sid,
                "namespace": "/foo",
                "host_id": "123456",
            }
        )

    def test_disconnect(self):
        """Disconnecting publishes a disconnect message for namespace "/"."""
        self.pm.disconnect("foo")
        self.pm._publish.assert_called_once_with(
            {
                "method": "disconnect",
                "sid": "foo",
                "namespace": "/",
                "host_id": "123456",
            }
        )

    def test_disconnect_ignore_queue(self):
        """With ``ignore_queue=True`` the sid is dropped without publishing."""
        sid = self.pm.connect("123", "/")
        self.pm.pre_disconnect(sid, "/")
        self.pm.disconnect(sid, ignore_queue=True)
        self.pm._publish.assert_not_called()
        assert not self.pm.is_connected(sid, "/")

    def test_enter_room(self):
        """A connected sid joins the room directly; an unknown sid is published."""
        sid = self.pm.connect("123", "/")
        self.pm.enter_room(sid, "/", "foo")
        self.pm.enter_room("456", "/", "foo")
        assert sid in self.pm.rooms["/"]["foo"]
        assert self.pm.rooms["/"]["foo"][sid] == "123"
        self.pm._publish.assert_called_once_with(
            {
                "method": "enter_room",
                "sid": "456",
                "room": "foo",
                "namespace": "/",
                "host_id": "123456",
            }
        )

    def test_leave_room(self):
        """Only the leave request for an unknown sid is published."""
        sid = self.pm.connect("123", "/")
        self.pm.leave_room(sid, "/", "foo")
        self.pm.leave_room("456", "/", "foo")
        assert "foo" not in self.pm.rooms["/"]
        self.pm._publish.assert_called_once_with(
            {
                "method": "leave_room",
                "sid": "456",
                "room": "foo",
                "namespace": "/",
                "host_id": "123456",
            }
        )

    def test_close_room(self):
        """Closing a room publishes a close_room message for namespace "/"."""
        self.pm.close_room("foo")
        self.pm._publish.assert_called_once_with(
            {
                "method": "close_room",
                "room": "foo",
                "namespace": "/",
                "host_id": "123456",
            }
        )

    def test_close_room_with_namespace(self):
        """An explicit namespace is carried in the close_room message."""
        self.pm.close_room("foo", "/bar")
        self.pm._publish.assert_called_once_with(
            {
                "method": "close_room",
                "room": "foo",
                "namespace": "/bar",
                "host_id": "123456",
            }
        )

    def test_handle_emit(self):
        """A minimal emit message is forwarded to ``Manager.emit``."""
        with mock.patch.object(manager.Manager, "emit") as super_emit:
            self.pm._handle_emit({"event": "foo", "data": "bar"})
            super_emit.assert_called_once_with(
                "foo",
                "bar",
                namespace=None,
                room=None,
                skip_sid=None,
                callback=None,
            )

    def test_handle_emit_with_namespace(self):
        """The message's namespace is forwarded to ``Manager.emit``."""
        with mock.patch.object(manager.Manager, "emit") as super_emit:
            self.pm._handle_emit(
                {"event": "foo", "data": "bar", "namespace": "/baz"}
            )
            super_emit.assert_called_once_with(
                "foo",
                "bar",
                namespace="/baz",
                room=None,
                skip_sid=None,
                callback=None,
            )

    def test_handle_emit_with_room(self):
        """The message's room is forwarded to ``Manager.emit``."""
        with mock.patch.object(manager.Manager, "emit") as super_emit:
            self.pm._handle_emit({"event": "foo", "data": "bar", "room": "baz"})
            super_emit.assert_called_once_with(
                "foo",
                "bar",
                namespace=None,
                room="baz",
                skip_sid=None,
                callback=None,
            )

    def test_handle_emit_with_skip_sid(self):
        """The message's ``skip_sid`` is forwarded to ``Manager.emit``."""
        with mock.patch.object(manager.Manager, "emit") as super_emit:
            self.pm._handle_emit(
                {"event": "foo", "data": "bar", "skip_sid": "123"}
            )
            super_emit.assert_called_once_with(
                "foo",
                "bar",
                namespace=None,
                room=None,
                skip_sid="123",
                callback=None,
            )

    def test_handle_emit_with_remote_callback(self):
        """A callback for another host is published back to that host."""
        with mock.patch.object(manager.Manager, "emit") as super_emit:
            self.pm._handle_emit(
                {
                    "event": "foo",
                    "data": "bar",
                    "namespace": "/baz",
                    "callback": ("sid", "/baz", 123),
                    "host_id": "x",
                }
            )
            assert super_emit.call_count == 1
            args, kwargs = super_emit.call_args
            assert args == ("foo", "bar")
            assert kwargs["namespace"] == "/baz"
            assert kwargs["room"] is None
            assert kwargs["skip_sid"] is None
            assert isinstance(kwargs["callback"], functools.partial)
            # Invoking the wrapped callback must publish a "callback" message
            # addressed to the originating host ("x").
            kwargs["callback"]("one", 2, "three")
            self.pm._publish.assert_called_once_with(
                {
                    "method": "callback",
                    "host_id": "x",
                    "sid": "sid",
                    "namespace": "/baz",
                    "id": 123,
                    "args": ("one", 2, "three"),
                }
            )

    def test_handle_emit_with_local_callback(self):
        """A callback for this host is not published to the queue."""
        with mock.patch.object(manager.Manager, "emit") as super_emit:
            self.pm._handle_emit(
                {
                    "event": "foo",
                    "data": "bar",
                    "namespace": "/baz",
                    "callback": ("sid", "/baz", 123),
                    "host_id": self.pm.host_id,
                }
            )
            assert super_emit.call_count == 1
            args, kwargs = super_emit.call_args
            assert args == ("foo", "bar")
            assert kwargs["namespace"] == "/baz"
            assert kwargs["room"] is None
            assert kwargs["skip_sid"] is None
            assert isinstance(kwargs["callback"], functools.partial)
            kwargs["callback"]("one", 2, "three")
            self.pm._publish.assert_not_called()

    def test_handle_callback(self):
        """A callback message for this host triggers the callback."""
        host_id = self.pm.host_id
        with mock.patch.object(self.pm, "trigger_callback") as trigger:
            self.pm._handle_callback(
                {
                    "method": "callback",
                    "host_id": host_id,
                    "sid": "sid",
                    "namespace": "/",
                    "id": 123,
                    "args": ("one", 2),
                }
            )
            trigger.assert_called_once_with("sid", 123, ("one", 2))

    def test_handle_callback_bad_host_id(self):
        """A callback message for a different host is ignored."""
        with mock.patch.object(self.pm, "trigger_callback") as trigger:
            self.pm._handle_callback(
                {
                    "method": "callback",
                    "host_id": "bad",
                    "sid": "sid",
                    "namespace": "/",
                    "id": 123,
                    "args": ("one", 2),
                }
            )
            assert trigger.call_count == 0

    def test_handle_callback_missing_args(self):
        """Incomplete callback messages never trigger a callback."""
        host_id = self.pm.host_id
        # Each message drops one more trailing field than the previous one.
        incomplete_messages = [
            {
                "method": "callback",
                "host_id": host_id,
                "sid": "sid",
                "namespace": "/",
                "id": 123,
            },
            {
                "method": "callback",
                "host_id": host_id,
                "sid": "sid",
                "namespace": "/",
            },
            {"method": "callback", "host_id": host_id, "sid": "sid"},
            {"method": "callback", "host_id": host_id},
        ]
        with mock.patch.object(self.pm, "trigger_callback") as trigger:
            for message in incomplete_messages:
                self.pm._handle_callback(message)
            assert trigger.call_count == 0

    def test_handle_disconnect(self):
        """A disconnect message is forwarded to the server, bypassing the queue."""
        self.pm._handle_disconnect(
            {"method": "disconnect", "sid": "123", "namespace": "/foo"}
        )
        self.pm.server.disconnect.assert_called_once_with(
            sid="123", namespace="/foo", ignore_queue=True
        )

    def test_handle_enter_room(self):
        """Only the message for a connected sid reaches ``Manager.enter_room``."""
        sid = self.pm.connect("123", "/")
        with mock.patch.object(manager.Manager, "enter_room") as super_enter_room:
            self.pm._handle_enter_room(
                {"method": "enter_room", "sid": sid, "namespace": "/", "room": "foo"}
            )
            self.pm._handle_enter_room(
                {
                    "method": "enter_room",
                    "sid": "456",
                    "namespace": "/",
                    "room": "foo",
                }
            )
            super_enter_room.assert_called_once_with(sid, "/", "foo")

    def test_handle_leave_room(self):
        """Only the message for a connected sid reaches ``Manager.leave_room``."""
        sid = self.pm.connect("123", "/")
        with mock.patch.object(manager.Manager, "leave_room") as super_leave_room:
            self.pm._handle_leave_room(
                {"method": "leave_room", "sid": sid, "namespace": "/", "room": "foo"}
            )
            self.pm._handle_leave_room(
                {
                    "method": "leave_room",
                    "sid": "456",
                    "namespace": "/",
                    "room": "foo",
                }
            )
            super_leave_room.assert_called_once_with(sid, "/", "foo")

    def test_handle_close_room(self):
        """A close_room message without a namespace forwards ``namespace=None``."""
        with mock.patch.object(manager.Manager, "close_room") as super_close_room:
            self.pm._handle_close_room({"method": "close_room", "room": "foo"})
            super_close_room.assert_called_once_with(room="foo", namespace=None)

    def test_handle_close_room_with_namespace(self):
        """The message's namespace is forwarded to ``Manager.close_room``."""
        with mock.patch.object(manager.Manager, "close_room") as super_close_room:
            self.pm._handle_close_room(
                {"method": "close_room", "room": "foo", "namespace": "/bar"}
            )
            super_close_room.assert_called_once_with(room="foo", namespace="/bar")

    def test_background_thread(self):
        """The listener thread dispatches each message to its handler.

        Messages arrive as dicts, a JSON string and pickled bytes. The stream
        also contains a message with no method, one with an unknown method,
        undecodable text and undecodable bytes. Messages stamped with this
        manager's own ``host_id`` must not reach the handlers, except for
        callbacks, which are asserted for both hosts.
        """
        self.pm._handle_emit = mock.MagicMock()
        self.pm._handle_callback = mock.MagicMock()
        self.pm._handle_disconnect = mock.MagicMock()
        self.pm._handle_enter_room = mock.MagicMock()
        self.pm._handle_leave_room = mock.MagicMock()
        self.pm._handle_close_room = mock.MagicMock()
        host_id = self.pm.host_id

        def messages():
            yield {"method": "emit", "value": "foo", "host_id": "x"}
            yield {"missing": "method", "host_id": "x"}
            yield '{"method": "callback", "value": "bar", "host_id": "x"}'
            yield {
                "method": "disconnect",
                "sid": "123",
                "namespace": "/foo",
                "host_id": "x",
            }
            yield {"method": "bogus", "host_id": "x"}
            yield pickle.dumps(
                {"method": "close_room", "value": "baz", "host_id": "x"}
            )
            yield {
                "method": "enter_room",
                "sid": "123",
                "namespace": "/foo",
                "room": "room",
                "host_id": "x",
            }
            yield {
                "method": "leave_room",
                "sid": "123",
                "namespace": "/foo",
                "room": "room",
                "host_id": "x",
            }
            yield "bad json"
            yield b"bad pickled"
            # these should not publish anything on the queue, as they come from
            # the same host
            yield {"method": "emit", "value": "foo", "host_id": host_id}
            yield {"method": "callback", "value": "bar", "host_id": host_id}
            yield {
                "method": "disconnect",
                "sid": "123",
                "namespace": "/foo",
                "host_id": host_id,
            }
            yield pickle.dumps(
                {"method": "close_room", "value": "baz", "host_id": host_id}
            )

        self._run_thread(messages)

        self.pm._handle_emit.assert_called_once_with(
            {"method": "emit", "value": "foo", "host_id": "x"}
        )
        self.pm._handle_callback.assert_any_call(
            {"method": "callback", "value": "bar", "host_id": "x"}
        )
        self.pm._handle_callback.assert_any_call(
            {"method": "callback", "value": "bar", "host_id": host_id}
        )
        self.pm._handle_disconnect.assert_called_once_with(
            {
                "method": "disconnect",
                "sid": "123",
                "namespace": "/foo",
                "host_id": "x",
            }
        )
        self.pm._handle_enter_room.assert_called_once_with(
            {
                "method": "enter_room",
                "sid": "123",
                "namespace": "/foo",
                "room": "room",
                "host_id": "x",
            }
        )
        self.pm._handle_leave_room.assert_called_once_with(
            {
                "method": "leave_room",
                "sid": "123",
                "namespace": "/foo",
                "room": "room",
                "host_id": "x",
            }
        )
        self.pm._handle_close_room.assert_called_once_with(
            {"method": "close_room", "value": "baz", "host_id": "x"}
        )

    def test_background_thread_exception(self):
        """A handler exception does not stop later messages being handled."""
        # First call raises, second call succeeds.
        self.pm._handle_emit = mock.MagicMock(side_effect=[ValueError(), None])

        def messages():
            yield {"method": "emit", "value": "foo", "host_id": "x"}
            yield {"method": "emit", "value": "bar", "host_id": "x"}

        self._run_thread(messages)

        self.pm._handle_emit.assert_any_call(
            {"method": "emit", "value": "foo", "host_id": "x"}
        )
        # ``assert_called_with`` checks the most recent call, i.e. the message
        # that followed the failing one.
        self.pm._handle_emit.assert_called_with(
            {"method": "emit", "value": "bar", "host_id": "x"}
        )