Browse Source

Restore binary message support in message queue setups (Fixes #1508) (#1509)

pull/1510/head
Miguel Grinberg 2 days ago
committed by GitHub
parent
commit
bab4a10f48
No known key found for this signature in database GPG Key ID: B5690EEEBB952194
  1. 14
      src/socketio/async_pubsub_manager.py
  2. 40
      src/socketio/packet.py
  3. 14
      src/socketio/pubsub_manager.py
  4. 67
      tests/async/test_pubsub_manager.py
  5. 24
      tests/common/test_packet.py
  6. 65
      tests/common/test_pubsub_manager.py

14
src/socketio/async_pubsub_manager.py

@ -1,10 +1,12 @@
import asyncio import asyncio
import base64
from functools import partial from functools import partial
import uuid import uuid
from engineio import json from engineio import json
from .async_manager import AsyncManager from .async_manager import AsyncManager
from .packet import Packet
class AsyncPubSubManager(AsyncManager): class AsyncPubSubManager(AsyncManager):
@ -64,8 +66,12 @@ class AsyncPubSubManager(AsyncManager):
callback = (room, namespace, id) callback = (room, namespace, id)
else: else:
callback = None callback = None
binary = Packet.data_is_binary(data)
if binary:
data, attachments = Packet.deconstruct_binary(data)
data = [data, *[base64.b64encode(a).decode() for a in attachments]]
message = {'method': 'emit', 'event': event, 'data': data, message = {'method': 'emit', 'event': event, 'data': data,
'namespace': namespace, 'room': room, 'binary': binary, 'namespace': namespace, 'room': room,
'skip_sid': skip_sid, 'callback': callback, 'skip_sid': skip_sid, 'callback': callback,
'host_id': self.host_id} 'host_id': self.host_id}
await self._handle_emit(message) # handle in this host await self._handle_emit(message) # handle in this host
@ -145,7 +151,11 @@ class AsyncPubSubManager(AsyncManager):
*remote_callback) *remote_callback)
else: else:
callback = None callback = None
await super().emit(message['event'], message['data'], data = message['data']
if message.get('binary'):
attachments = [base64.b64decode(a) for a in data[1:]]
data = Packet.reconstruct_binary(data[0], attachments)
await super().emit(message['event'], data,
namespace=message.get('namespace'), namespace=message.get('namespace'),
room=message.get('room'), room=message.get('room'),
skip_sid=message.get('skip_sid'), skip_sid=message.get('skip_sid'),

40
src/socketio/packet.py

@ -29,7 +29,7 @@ class Packet:
self.namespace = namespace self.namespace = namespace
self.id = id self.id = id
if self.uses_binary_events and \ if self.uses_binary_events and \
(binary or (binary is None and self._data_is_binary( (binary or (binary is None and self.data_is_binary(
self.data))): self.data))):
if self.packet_type == EVENT: if self.packet_type == EVENT:
self.packet_type = BINARY_EVENT self.packet_type = BINARY_EVENT
@ -51,7 +51,7 @@ class Packet:
""" """
encoded_packet = str(self.packet_type) encoded_packet = str(self.packet_type)
if self.packet_type == BINARY_EVENT or self.packet_type == BINARY_ACK: if self.packet_type == BINARY_EVENT or self.packet_type == BINARY_ACK:
data, attachments = self._deconstruct_binary(self.data) data, attachments = self.deconstruct_binary(self.data)
encoded_packet += str(len(attachments)) + '-' encoded_packet += str(len(attachments)) + '-'
else: else:
data = self.data data = self.data
@ -119,61 +119,65 @@ class Packet:
raise ValueError('Unexpected binary attachment') raise ValueError('Unexpected binary attachment')
self.attachments.append(attachment) self.attachments.append(attachment)
if self.attachment_count == len(self.attachments): if self.attachment_count == len(self.attachments):
self.reconstruct_binary(self.attachments) self.data = self.reconstruct_binary(self.data, self.attachments)
return True return True
return False return False
def reconstruct_binary(self, attachments): @classmethod
def reconstruct_binary(cls, data, attachments):
"""Reconstruct a decoded packet using the given list of binary """Reconstruct a decoded packet using the given list of binary
attachments. attachments.
""" """
self.data = self._reconstruct_binary_internal(self.data, return cls._reconstruct_binary_internal(data, attachments)
self.attachments)
def _reconstruct_binary_internal(self, data, attachments): @classmethod
def _reconstruct_binary_internal(cls, data, attachments):
if isinstance(data, list): if isinstance(data, list):
return [self._reconstruct_binary_internal(item, attachments) return [cls._reconstruct_binary_internal(item, attachments)
for item in data] for item in data]
elif isinstance(data, dict): elif isinstance(data, dict):
if data.get('_placeholder') and 'num' in data: if data.get('_placeholder') and 'num' in data:
return attachments[data['num']] return attachments[data['num']]
else: else:
return {key: self._reconstruct_binary_internal(value, return {key: cls._reconstruct_binary_internal(value,
attachments) attachments)
for key, value in data.items()} for key, value in data.items()}
else: else:
return data return data
def _deconstruct_binary(self, data): @classmethod
def deconstruct_binary(cls, data):
"""Extract binary components in the packet.""" """Extract binary components in the packet."""
attachments = [] attachments = []
data = self._deconstruct_binary_internal(data, attachments) data = cls._deconstruct_binary_internal(data, attachments)
return data, attachments return data, attachments
def _deconstruct_binary_internal(self, data, attachments): @classmethod
def _deconstruct_binary_internal(cls, data, attachments):
if isinstance(data, bytes): if isinstance(data, bytes):
attachments.append(data) attachments.append(data)
return {'_placeholder': True, 'num': len(attachments) - 1} return {'_placeholder': True, 'num': len(attachments) - 1}
elif isinstance(data, list): elif isinstance(data, list):
return [self._deconstruct_binary_internal(item, attachments) return [cls._deconstruct_binary_internal(item, attachments)
for item in data] for item in data]
elif isinstance(data, dict): elif isinstance(data, dict):
return {key: self._deconstruct_binary_internal(value, attachments) return {key: cls._deconstruct_binary_internal(value, attachments)
for key, value in data.items()} for key, value in data.items()}
else: else:
return data return data
def _data_is_binary(self, data): @classmethod
def data_is_binary(cls, data):
"""Check if the data contains binary components.""" """Check if the data contains binary components."""
if isinstance(data, bytes): if isinstance(data, bytes):
return True return True
elif isinstance(data, list): elif isinstance(data, list):
return functools.reduce( return functools.reduce(
lambda a, b: a or b, [self._data_is_binary(item) lambda a, b: a or b, [cls.data_is_binary(item)
for item in data], False) for item in data], False)
elif isinstance(data, dict): elif isinstance(data, dict):
return functools.reduce( return functools.reduce(
lambda a, b: a or b, [self._data_is_binary(item) lambda a, b: a or b, [cls.data_is_binary(item)
for item in data.values()], for item in data.values()],
False) False)
else: else:

14
src/socketio/pubsub_manager.py

@ -1,9 +1,11 @@
import base64
from functools import partial from functools import partial
import uuid import uuid
from engineio import json from engineio import json
from .manager import Manager from .manager import Manager
from .packet import Packet
class PubSubManager(Manager): class PubSubManager(Manager):
@ -61,8 +63,12 @@ class PubSubManager(Manager):
callback = (room, namespace, id) callback = (room, namespace, id)
else: else:
callback = None callback = None
binary = Packet.data_is_binary(data)
if binary:
data, attachments = Packet.deconstruct_binary(data)
data = [data, *[base64.b64encode(a).decode() for a in attachments]]
message = {'method': 'emit', 'event': event, 'data': data, message = {'method': 'emit', 'event': event, 'data': data,
'namespace': namespace, 'room': room, 'binary': binary, 'namespace': namespace, 'room': room,
'skip_sid': skip_sid, 'callback': callback, 'skip_sid': skip_sid, 'callback': callback,
'host_id': self.host_id} 'host_id': self.host_id}
self._handle_emit(message) # handle in this host self._handle_emit(message) # handle in this host
@ -141,7 +147,11 @@ class PubSubManager(Manager):
*remote_callback) *remote_callback)
else: else:
callback = None callback = None
super().emit(message['event'], message['data'], data = message['data']
if message.get('binary'):
attachments = [base64.b64decode(a) for a in data[1:]]
data = Packet.reconstruct_binary(data[0], attachments)
super().emit(message['event'], data,
namespace=message.get('namespace'), namespace=message.get('namespace'),
room=message.get('room'), room=message.get('room'),
skip_sid=message.get('skip_sid'), callback=callback) skip_sid=message.get('skip_sid'), callback=callback)

67
tests/async/test_pubsub_manager.py

@ -57,6 +57,7 @@ class TestAsyncPubSubManager:
{ {
'method': 'emit', 'method': 'emit',
'event': 'foo', 'event': 'foo',
'binary': False,
'data': 'bar', 'data': 'bar',
'namespace': '/', 'namespace': '/',
'room': None, 'room': None,
@ -66,6 +67,36 @@ class TestAsyncPubSubManager:
} }
) )
async def test_emit_binary(self):
await self.pm.emit('foo', b'bar')
self.pm._publish.assert_awaited_once_with(
{
'method': 'emit',
'event': 'foo',
'binary': True,
'data': [{'_placeholder': True, 'num': 0}, 'YmFy'],
'namespace': '/',
'room': None,
'skip_sid': None,
'callback': None,
'host_id': '123456',
}
)
await self.pm.emit('foo', {'foo': b'bar'})
self.pm._publish.assert_awaited_with(
{
'method': 'emit',
'event': 'foo',
'binary': True,
'data': [{'foo': {'_placeholder': True, 'num': 0}}, 'YmFy'],
'namespace': '/',
'room': None,
'skip_sid': None,
'callback': None,
'host_id': '123456',
}
)
async def test_emit_with_to(self): async def test_emit_with_to(self):
sid = 'room-mate' sid = 'room-mate'
await self.pm.emit('foo', 'bar', to=sid) await self.pm.emit('foo', 'bar', to=sid)
@ -73,6 +104,7 @@ class TestAsyncPubSubManager:
{ {
'method': 'emit', 'method': 'emit',
'event': 'foo', 'event': 'foo',
'binary': False,
'data': 'bar', 'data': 'bar',
'namespace': '/', 'namespace': '/',
'room': sid, 'room': sid,
@ -88,6 +120,7 @@ class TestAsyncPubSubManager:
{ {
'method': 'emit', 'method': 'emit',
'event': 'foo', 'event': 'foo',
'binary': False,
'data': 'bar', 'data': 'bar',
'namespace': '/baz', 'namespace': '/baz',
'room': None, 'room': None,
@ -103,6 +136,7 @@ class TestAsyncPubSubManager:
{ {
'method': 'emit', 'method': 'emit',
'event': 'foo', 'event': 'foo',
'binary': False,
'data': 'bar', 'data': 'bar',
'namespace': '/', 'namespace': '/',
'room': 'baz', 'room': 'baz',
@ -118,6 +152,7 @@ class TestAsyncPubSubManager:
{ {
'method': 'emit', 'method': 'emit',
'event': 'foo', 'event': 'foo',
'binary': False,
'data': 'bar', 'data': 'bar',
'namespace': '/', 'namespace': '/',
'room': None, 'room': None,
@ -136,6 +171,7 @@ class TestAsyncPubSubManager:
{ {
'method': 'emit', 'method': 'emit',
'event': 'foo', 'event': 'foo',
'binary': False,
'data': 'bar', 'data': 'bar',
'namespace': '/', 'namespace': '/',
'room': 'baz', 'room': 'baz',
@ -241,6 +277,37 @@ class TestAsyncPubSubManager:
callback=None, callback=None,
) )
async def test_handle_emit_binary(self):
with mock.patch.object(
async_manager.AsyncManager, 'emit'
) as super_emit:
await self.pm._handle_emit({
'event': 'foo',
'binary': True,
'data': [{'_placeholder': True, 'num': 0}, 'YmFy'],
})
super_emit.assert_awaited_once_with(
'foo',
b'bar',
namespace=None,
room=None,
skip_sid=None,
callback=None,
)
await self.pm._handle_emit({
'event': 'foo',
'binary': True,
'data': [{'foo': {'_placeholder': True, 'num': 0}}, 'YmFy'],
})
super_emit.assert_awaited_with(
'foo',
{'foo': b'bar'},
namespace=None,
room=None,
skip_sid=None,
callback=None,
)
async def test_handle_emit_with_namespace(self): async def test_handle_emit_with_namespace(self):
with mock.patch.object( with mock.patch.object(
async_manager.AsyncManager, 'emit' async_manager.AsyncManager, 'emit'

24
tests/common/test_packet.py

@ -266,16 +266,24 @@ class TestPacket:
assert pkt.data["a"] == "0123456789-" assert pkt.data["a"] == "0123456789-"
assert pkt.attachment_count == 0 assert pkt.attachment_count == 0
def test_deconstruct_binary(self):
datas = [b'foo', [b'foo', b'bar'], ['foo', b'bar'], {'foo': b'bar'},
{'foo': 'bar', 'baz': b'qux'}, {'foo': [b'bar']}]
for data in datas:
bdata, attachments = packet.Packet.deconstruct_binary(data)
rdata = packet.Packet.reconstruct_binary(bdata, attachments)
assert data == rdata
def test_data_is_binary_list(self): def test_data_is_binary_list(self):
pkt = packet.Packet() pkt = packet.Packet()
assert not pkt._data_is_binary(['foo']) assert not pkt.data_is_binary(['foo'])
assert not pkt._data_is_binary([]) assert not pkt.data_is_binary([])
assert pkt._data_is_binary([b'foo']) assert pkt.data_is_binary([b'foo'])
assert pkt._data_is_binary(['foo', b'bar']) assert pkt.data_is_binary(['foo', b'bar'])
def test_data_is_binary_dict(self): def test_data_is_binary_dict(self):
pkt = packet.Packet() pkt = packet.Packet()
assert not pkt._data_is_binary({'a': 'foo'}) assert not pkt.data_is_binary({'a': 'foo'})
assert not pkt._data_is_binary({}) assert not pkt.data_is_binary({})
assert pkt._data_is_binary({'a': b'foo'}) assert pkt.data_is_binary({'a': b'foo'})
assert pkt._data_is_binary({'a': 'foo', 'b': b'bar'}) assert pkt.data_is_binary({'a': 'foo', 'b': b'bar'})

65
tests/common/test_pubsub_manager.py

@ -69,6 +69,7 @@ class TestPubSubManager:
{ {
'method': 'emit', 'method': 'emit',
'event': 'foo', 'event': 'foo',
'binary': False,
'data': 'bar', 'data': 'bar',
'namespace': '/', 'namespace': '/',
'room': None, 'room': None,
@ -78,6 +79,36 @@ class TestPubSubManager:
} }
) )
def test_emit_binary(self):
self.pm.emit('foo', b'bar')
self.pm._publish.assert_called_once_with(
{
'method': 'emit',
'event': 'foo',
'binary': True,
'data': [{'_placeholder': True, 'num': 0}, 'YmFy'],
'namespace': '/',
'room': None,
'skip_sid': None,
'callback': None,
'host_id': '123456',
}
)
self.pm.emit('foo', {'foo': b'bar'})
self.pm._publish.assert_called_with(
{
'method': 'emit',
'event': 'foo',
'binary': True,
'data': [{'foo': {'_placeholder': True, 'num': 0}}, 'YmFy'],
'namespace': '/',
'room': None,
'skip_sid': None,
'callback': None,
'host_id': '123456',
}
)
def test_emit_with_to(self): def test_emit_with_to(self):
sid = "ferris" sid = "ferris"
self.pm.emit('foo', 'bar', to=sid) self.pm.emit('foo', 'bar', to=sid)
@ -85,6 +116,7 @@ class TestPubSubManager:
{ {
'method': 'emit', 'method': 'emit',
'event': 'foo', 'event': 'foo',
'binary': False,
'data': 'bar', 'data': 'bar',
'namespace': '/', 'namespace': '/',
'room': sid, 'room': sid,
@ -100,6 +132,7 @@ class TestPubSubManager:
{ {
'method': 'emit', 'method': 'emit',
'event': 'foo', 'event': 'foo',
'binary': False,
'data': 'bar', 'data': 'bar',
'namespace': '/baz', 'namespace': '/baz',
'room': None, 'room': None,
@ -115,6 +148,7 @@ class TestPubSubManager:
{ {
'method': 'emit', 'method': 'emit',
'event': 'foo', 'event': 'foo',
'binary': False,
'data': 'bar', 'data': 'bar',
'namespace': '/', 'namespace': '/',
'room': 'baz', 'room': 'baz',
@ -130,6 +164,7 @@ class TestPubSubManager:
{ {
'method': 'emit', 'method': 'emit',
'event': 'foo', 'event': 'foo',
'binary': False,
'data': 'bar', 'data': 'bar',
'namespace': '/', 'namespace': '/',
'room': None, 'room': None,
@ -148,6 +183,7 @@ class TestPubSubManager:
{ {
'method': 'emit', 'method': 'emit',
'event': 'foo', 'event': 'foo',
'binary': False,
'data': 'bar', 'data': 'bar',
'namespace': '/', 'namespace': '/',
'room': 'baz', 'room': 'baz',
@ -250,6 +286,35 @@ class TestPubSubManager:
callback=None, callback=None,
) )
def test_handle_emit_binary(self):
with mock.patch.object(manager.Manager, 'emit') as super_emit:
self.pm._handle_emit({
'event': 'foo',
'binary': True,
'data': [{'_placeholder': True, 'num': 0}, 'YmFy'],
})
super_emit.assert_called_once_with(
'foo',
b'bar',
namespace=None,
room=None,
skip_sid=None,
callback=None,
)
self.pm._handle_emit({
'event': 'foo',
'binary': True,
'data': [{'foo': {'_placeholder': True, 'num': 0}}, 'YmFy'],
})
super_emit.assert_called_with(
'foo',
{'foo': b'bar'},
namespace=None,
room=None,
skip_sid=None,
callback=None,
)
def test_handle_emit_with_namespace(self): def test_handle_emit_with_namespace(self):
with mock.patch.object(manager.Manager, 'emit') as super_emit: with mock.patch.object(manager.Manager, 'emit') as super_emit:
self.pm._handle_emit( self.pm._handle_emit(

Loading…
Cancel
Save