You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

138 lines
4.9 KiB

from datetime import datetime, timedelta, timezone
import pytest
import msgpack
from socketio import msgpack_packet
from socketio import packet
class TestMsgPackPacket:
def test_encode_decode(self):
p = msgpack_packet.MsgPackPacket(
packet.CONNECT, data={'auth': {'token': '123'}}, namespace='/foo')
p2 = msgpack_packet.MsgPackPacket(encoded_packet=p.encode())
assert p.packet_type == p2.packet_type
assert p.data == p2.data
assert p.id == p2.id
assert p.namespace == p2.namespace
def test_encode_decode_with_id(self):
p = msgpack_packet.MsgPackPacket(
packet.EVENT, data=['ev', 42], id=123, namespace='/foo')
p2 = msgpack_packet.MsgPackPacket(encoded_packet=p.encode())
assert p.packet_type == p2.packet_type
assert p.data == p2.data
assert p.id == p2.id
assert p.namespace == p2.namespace
def test_encode_binary_event_packet(self):
p = msgpack_packet.MsgPackPacket(packet.EVENT, data={'foo': b'bar'})
assert p.packet_type == packet.EVENT
p2 = msgpack_packet.MsgPackPacket(encoded_packet=p.encode())
assert p2.data == {'foo': b'bar'}
def test_encode_binary_ack_packet(self):
p = msgpack_packet.MsgPackPacket(packet.ACK, data={'foo': b'bar'})
assert p.packet_type == packet.ACK
p2 = msgpack_packet.MsgPackPacket(encoded_packet=p.encode())
assert p2.data == {'foo': b'bar'}
def test_encode_with_dumps_default(self):
def default(obj):
if isinstance(obj, datetime):
return obj.isoformat()
raise TypeError('Unknown type')
data = {
'current': datetime.now(tz=timezone(timedelta(0))),
'key': 'value',
}
p = msgpack_packet.MsgPackPacket.configure(dumps_default=default)(
data=data)
p2 = msgpack_packet.MsgPackPacket(encoded_packet=p.encode())
assert p.packet_type == p2.packet_type
assert p.id == p2.id
assert p.namespace == p2.namespace
assert p.data != p2.data
assert isinstance(p2.data, dict)
assert 'current' in p2.data
assert isinstance(p2.data['current'], str)
assert default(data['current']) == p2.data['current']
data.pop('current')
p2_data_without_current = p2.data.copy()
p2_data_without_current.pop('current')
assert data == p2_data_without_current
def test_encode_without_dumps_default(self):
data = {
'current': datetime.now(tz=timezone(timedelta(0))),
'key': 'value',
}
p_without_default = msgpack_packet.MsgPackPacket(data=data)
with pytest.raises(TypeError):
p_without_default.encode()
def test_encode_decode_with_ext_hook(self):
class Custom:
def __init__(self, value):
self.value = value
def __eq__(self, value: object) -> bool:
return isinstance(value, Custom) and self.value == value.value
def default(obj):
if isinstance(obj, Custom):
return msgpack.ExtType(1, obj.value)
raise TypeError('Unknown type')
def ext_hook(code, data):
if code == 1:
return Custom(data)
raise TypeError('Unknown ext type')
data = {'custom': Custom(b'custom_data'), 'key': 'value'}
p = msgpack_packet.MsgPackPacket.configure(dumps_default=default)(
data=data)
p2 = msgpack_packet.MsgPackPacket.configure(ext_hook=ext_hook)(
encoded_packet=p.encode()
)
assert p.packet_type == p2.packet_type
assert p.id == p2.id
assert p.data == p2.data
assert p.namespace == p2.namespace
def test_encode_decode_without_ext_hook(self):
class Custom:
def __init__(self, value):
self.value = value
def __eq__(self, value: object) -> bool:
return isinstance(value, Custom) and self.value == value.value
def default(obj):
if isinstance(obj, Custom):
return msgpack.ExtType(1, obj.value)
raise TypeError('Unknown type')
data = {'custom': Custom(b'custom_data'), 'key': 'value'}
p = msgpack_packet.MsgPackPacket.configure(dumps_default=default)(
data=data)
p2 = msgpack_packet.MsgPackPacket(encoded_packet=p.encode())
assert p.packet_type == p2.packet_type
assert p.id == p2.id
assert p.namespace == p2.namespace
assert p.data != p2.data
assert isinstance(p2.data, dict)
assert 'custom' in p2.data
assert isinstance(p2.data['custom'], msgpack.ExtType)
assert p2.data['custom'].code == 1
assert p2.data['custom'].data == b'custom_data'
data.pop('custom')
p2_data_without_custom = p2.data.copy()
p2_data_without_custom.pop('custom')
assert data == p2_data_without_custom