Browse Source

Optimize memory usage during broadcasts

pull/1233/head
Miguel Grinberg 2 years ago
parent
commit
7c1569031f
Failed to extract signature
  1. 2
      setup.cfg
  2. 47
      src/socketio/asyncio_manager.py
  3. 17
      src/socketio/asyncio_server.py
  4. 41
      src/socketio/base_manager.py
  5. 21
      src/socketio/server.py
  6. 164
      tests/asyncio/test_asyncio_manager.py
  7. 13
      tests/asyncio/test_asyncio_pubsub_manager.py
  8. 83
      tests/asyncio/test_asyncio_server.py
  9. 121
      tests/common/test_base_manager.py
  10. 9
      tests/common/test_pubsub_manager.py
  11. 62
      tests/common/test_server.py
  12. 1
      tests/performance/run.sh
  13. 3
      tests/performance/server_send.py
  14. 30
      tests/performance/server_send_broadcast.py

2
setup.cfg

@ -25,7 +25,7 @@ packages = find:
python_requires = >=3.6
install_requires =
bidict >= 0.21.0
python-engineio >= 4.3.0
python-engineio >= 4.7.0
[options.packages.find]
where = src

47
src/socketio/asyncio_manager.py

@ -1,5 +1,7 @@
import asyncio
from engineio import packet as eio_packet
from socketio import packet
from .base_manager import BaseManager
@ -17,18 +19,45 @@ class AsyncManager(BaseManager):
"""
if namespace not in self.rooms:
return
tasks = []
if isinstance(data, tuple):
# tuples are expanded to multiple arguments, everything else is
# sent as a single argument
data = list(data)
elif data is not None:
data = [data]
else:
data = []
if not isinstance(skip_sid, list):
skip_sid = [skip_sid]
for sid, eio_sid in self.get_participants(namespace, room):
if sid not in skip_sid:
if callback is not None:
tasks = []
if not callback:
# when callbacks aren't used the packets sent to each recipient are
# identical, so they can be generated once and reused
pkt = self.server.packet_class(
packet.EVENT, namespace=namespace, data=[event] + data)
encoded_packet = pkt.encode()
if not isinstance(encoded_packet, list):
encoded_packet = [encoded_packet]
eio_pkt = [eio_packet.Packet(eio_packet.MESSAGE, p)
for p in encoded_packet]
for sid, eio_sid in self.get_participants(namespace, room):
if sid not in skip_sid:
for p in eio_pkt:
tasks.append(asyncio.create_task(
self.server._send_eio_packet(eio_sid, p)))
else:
# callbacks are used, so each recipient must be sent a packet that
# contains a unique callback id
# note that callbacks when addressing a group of people are
# implemented but not tested or supported
for sid, eio_sid in self.get_participants(namespace, room):
if sid not in skip_sid: # pragma: no branch
id = self._generate_ack_id(sid, callback)
else:
id = None
tasks.append(asyncio.create_task(
self.server._emit_internal(eio_sid, event, data,
namespace, id)))
pkt = self.server.packet_class(
packet.EVENT, namespace=namespace, data=[event] + data,
id=id)
tasks.append(asyncio.create_task(
self.server._send_packet(eio_sid, pkt)))
if tasks == []: # pragma: no cover
return
await asyncio.wait(tasks)

17
src/socketio/asyncio_server.py

@ -424,19 +424,6 @@ class AsyncServer(server.Server):
"""
return await self.eio.sleep(seconds)
async def _emit_internal(self, sid, event, data, namespace=None, id=None):
"""Send a message to a client."""
# tuples are expanded to multiple arguments, everything else is sent
# as a single argument
if isinstance(data, tuple):
data = list(data)
elif data is not None:
data = [data]
else:
data = []
await self._send_packet(sid, self.packet_class(
packet.EVENT, namespace=namespace, data=[event] + data, id=id))
async def _send_packet(self, eio_sid, pkt):
"""Send a Socket.IO packet to a client."""
encoded_packet = pkt.encode()
@ -446,6 +433,10 @@ class AsyncServer(server.Server):
else:
await self.eio.send(eio_sid, encoded_packet)
async def _send_eio_packet(self, eio_sid, eio_pkt):
"""Send a raw Engine.IO packet to a client."""
await self.eio.send_packet(eio_sid, eio_pkt)
async def _handle_connect(self, eio_sid, namespace, data):
"""Handle a client connection request."""
namespace = namespace or '/'

41
src/socketio/base_manager.py

@ -2,6 +2,8 @@ import itertools
import logging
from bidict import bidict, ValueDuplicationError
from engineio import packet as eio_packet
from socketio import packet
default_logger = logging.getLogger('socketio')
@ -161,15 +163,42 @@ class BaseManager(object):
connected to the namespace."""
if namespace not in self.rooms:
return
if isinstance(data, tuple):
# tuples are expanded to multiple arguments, everything else is
# sent as a single argument
data = list(data)
elif data is not None:
data = [data]
else:
data = []
if not isinstance(skip_sid, list):
skip_sid = [skip_sid]
for sid, eio_sid in self.get_participants(namespace, room):
if sid not in skip_sid:
if callback is not None:
if not callback:
# when callbacks aren't used the packets sent to each recipient are
# identical, so they can be generated once and reused
pkt = self.server.packet_class(
packet.EVENT, namespace=namespace, data=[event] + data)
encoded_packet = pkt.encode()
if not isinstance(encoded_packet, list):
encoded_packet = [encoded_packet]
eio_pkt = [eio_packet.Packet(eio_packet.MESSAGE, p)
for p in encoded_packet]
for sid, eio_sid in self.get_participants(namespace, room):
if sid not in skip_sid:
for p in eio_pkt:
self.server._send_eio_packet(eio_sid, p)
else:
# callbacks are used, so each recipient must be sent a packet that
# contains a unique callback id
# note that callbacks when addressing a group of people are
# implemented but not tested or supported
for sid, eio_sid in self.get_participants(namespace, room):
if sid not in skip_sid: # pragma: no branch
id = self._generate_ack_id(sid, callback)
else:
id = None
self.server._emit_internal(eio_sid, event, data, namespace, id)
pkt = self.server.packet_class(
packet.EVENT, namespace=namespace, data=[event] + data,
id=id)
self.server._send_packet(eio_sid, pkt)
def trigger_callback(self, sid, id, data):
"""Invoke an application callback."""

21
src/socketio/server.py

@ -185,7 +185,7 @@ class Server(object):
Example usage::
# as a decorator:
@socket_io.on('connect', namespace='/chat')
@sio.on('connect', namespace='/chat')
def connect_handler(sid, environ):
print('Connection request')
if environ['REMOTE_ADDR'] in blacklisted:
@ -194,7 +194,7 @@ class Server(object):
# as a method:
def message_handler(sid, msg):
print('Received message: ', msg)
eio.send(sid, 'response')
sio.send(sid, 'response')
socket_io.on('message', namespace='/chat', handler=message_handler)
The handler function receives the ``sid`` (session ID) for the
@ -633,19 +633,6 @@ class Server(object):
"""
return self.eio.sleep(seconds)
def _emit_internal(self, eio_sid, event, data, namespace=None, id=None):
"""Send a message to a client."""
# tuples are expanded to multiple arguments, everything else is sent
# as a single argument
if isinstance(data, tuple):
data = list(data)
elif data is not None:
data = [data]
else:
data = []
self._send_packet(eio_sid, self.packet_class(
packet.EVENT, namespace=namespace, data=[event] + data, id=id))
def _send_packet(self, eio_sid, pkt):
"""Send a Socket.IO packet to a client."""
encoded_packet = pkt.encode()
@ -655,6 +642,10 @@ class Server(object):
else:
self.eio.send(eio_sid, encoded_packet)
def _send_eio_packet(self, eio_sid, eio_pkt):
"""Send a raw Engine.IO packet to a client."""
self.eio.send_packet(eio_sid, eio_pkt)
def _handle_connect(self, eio_sid, namespace, data):
"""Handle a client connection request."""
namespace = namespace or '/'

164
tests/asyncio/test_asyncio_manager.py

@ -4,6 +4,7 @@ import unittest
from unittest import mock
from socketio import asyncio_manager
from socketio import packet
def AsyncMock(*args, **kwargs):
@ -33,8 +34,10 @@ class TestAsyncManager(unittest.TestCase):
return str(id)
mock_server = mock.MagicMock()
mock_server._emit_internal = AsyncMock()
mock_server._send_packet = AsyncMock()
mock_server._send_eio_packet = AsyncMock()
mock_server.eio.generate_id = generate_id
mock_server.packet_class = packet.Packet
self.bm = asyncio_manager.AsyncManager()
self.bm.set_server(mock_server)
self.bm.initialize()
@ -221,9 +224,11 @@ class TestAsyncManager(unittest.TestCase):
'my event', {'foo': 'bar'}, namespace='/foo', room=sid
)
)
self.bm.server._emit_internal.mock.assert_called_once_with(
'123', 'my event', {'foo': 'bar'}, '/foo', None
)
assert self.bm.server._send_eio_packet.mock.call_count == 1
assert self.bm.server._send_eio_packet.mock.call_args_list[0][0][0] \
== '123'
pkt = self.bm.server._send_eio_packet.mock.call_args_list[0][0][1]
assert pkt.encode() == '42/foo,["my event",{"foo":"bar"}]'
def test_emit_to_room(self):
sid1 = self.bm.connect('123', '/foo')
@ -236,13 +241,15 @@ class TestAsyncManager(unittest.TestCase):
'my event', {'foo': 'bar'}, namespace='/foo', room='bar'
)
)
assert self.bm.server._emit_internal.mock.call_count == 2
self.bm.server._emit_internal.mock.assert_any_call(
'123', 'my event', {'foo': 'bar'}, '/foo', None
)
self.bm.server._emit_internal.mock.assert_any_call(
'456', 'my event', {'foo': 'bar'}, '/foo', None
)
assert self.bm.server._send_eio_packet.mock.call_count == 2
assert self.bm.server._send_eio_packet.mock.call_args_list[0][0][0] \
== '123'
assert self.bm.server._send_eio_packet.mock.call_args_list[1][0][0] \
== '456'
pkt = self.bm.server._send_eio_packet.mock.call_args_list[0][0][1]
assert self.bm.server._send_eio_packet.mock.call_args_list[1][0][1] \
== pkt
assert pkt.encode() == '42/foo,["my event",{"foo":"bar"}]'
def test_emit_to_rooms(self):
sid1 = self.bm.connect('123', '/foo')
@ -256,16 +263,19 @@ class TestAsyncManager(unittest.TestCase):
self.bm.emit('my event', {'foo': 'bar'}, namespace='/foo',
room=['bar', 'baz'])
)
assert self.bm.server._emit_internal.mock.call_count == 3
self.bm.server._emit_internal.mock.assert_any_call(
'123', 'my event', {'foo': 'bar'}, '/foo', None
)
self.bm.server._emit_internal.mock.assert_any_call(
'456', 'my event', {'foo': 'bar'}, '/foo', None
)
self.bm.server._emit_internal.mock.assert_any_call(
'789', 'my event', {'foo': 'bar'}, '/foo', None
)
assert self.bm.server._send_eio_packet.mock.call_count == 3
assert self.bm.server._send_eio_packet.mock.call_args_list[0][0][0] \
== '123'
assert self.bm.server._send_eio_packet.mock.call_args_list[1][0][0] \
== '456'
assert self.bm.server._send_eio_packet.mock.call_args_list[2][0][0] \
== '789'
pkt = self.bm.server._send_eio_packet.mock.call_args_list[0][0][1]
assert self.bm.server._send_eio_packet.mock.call_args_list[1][0][1] \
== pkt
assert self.bm.server._send_eio_packet.mock.call_args_list[2][0][1] \
== pkt
assert pkt.encode() == '42/foo,["my event",{"foo":"bar"}]'
def test_emit_to_all(self):
sid1 = self.bm.connect('123', '/foo')
@ -275,16 +285,19 @@ class TestAsyncManager(unittest.TestCase):
self.bm.connect('789', '/foo')
self.bm.connect('abc', '/bar')
_run(self.bm.emit('my event', {'foo': 'bar'}, namespace='/foo'))
assert self.bm.server._emit_internal.mock.call_count == 3
self.bm.server._emit_internal.mock.assert_any_call(
'123', 'my event', {'foo': 'bar'}, '/foo', None
)
self.bm.server._emit_internal.mock.assert_any_call(
'456', 'my event', {'foo': 'bar'}, '/foo', None
)
self.bm.server._emit_internal.mock.assert_any_call(
'789', 'my event', {'foo': 'bar'}, '/foo', None
)
assert self.bm.server._send_eio_packet.mock.call_count == 3
assert self.bm.server._send_eio_packet.mock.call_args_list[0][0][0] \
== '123'
assert self.bm.server._send_eio_packet.mock.call_args_list[1][0][0] \
== '456'
assert self.bm.server._send_eio_packet.mock.call_args_list[2][0][0] \
== '789'
pkt = self.bm.server._send_eio_packet.mock.call_args_list[0][0][1]
assert self.bm.server._send_eio_packet.mock.call_args_list[1][0][1] \
== pkt
assert self.bm.server._send_eio_packet.mock.call_args_list[2][0][1] \
== pkt
assert pkt.encode() == '42/foo,["my event",{"foo":"bar"}]'
def test_emit_to_all_skip_one(self):
sid1 = self.bm.connect('123', '/foo')
@ -298,13 +311,15 @@ class TestAsyncManager(unittest.TestCase):
'my event', {'foo': 'bar'}, namespace='/foo', skip_sid=sid2
)
)
assert self.bm.server._emit_internal.mock.call_count == 2
self.bm.server._emit_internal.mock.assert_any_call(
'123', 'my event', {'foo': 'bar'}, '/foo', None
)
self.bm.server._emit_internal.mock.assert_any_call(
'789', 'my event', {'foo': 'bar'}, '/foo', None
)
assert self.bm.server._send_eio_packet.mock.call_count == 2
assert self.bm.server._send_eio_packet.mock.call_args_list[0][0][0] \
== '123'
assert self.bm.server._send_eio_packet.mock.call_args_list[1][0][0] \
== '789'
pkt = self.bm.server._send_eio_packet.mock.call_args_list[0][0][1]
assert self.bm.server._send_eio_packet.mock.call_args_list[1][0][1] \
== pkt
assert pkt.encode() == '42/foo,["my event",{"foo":"bar"}]'
def test_emit_to_all_skip_two(self):
sid1 = self.bm.connect('123', '/foo')
@ -321,10 +336,11 @@ class TestAsyncManager(unittest.TestCase):
skip_sid=[sid1, sid3],
)
)
assert self.bm.server._emit_internal.mock.call_count == 1
self.bm.server._emit_internal.mock.assert_any_call(
'456', 'my event', {'foo': 'bar'}, '/foo', None
)
assert self.bm.server._send_eio_packet.mock.call_count == 1
assert self.bm.server._send_eio_packet.mock.call_args_list[0][0][0] \
== '456'
pkt = self.bm.server._send_eio_packet.mock.call_args_list[0][0][1]
assert pkt.encode() == '42/foo,["my event",{"foo":"bar"}]'
def test_emit_with_callback(self):
sid = self.bm.connect('123', '/foo')
@ -336,9 +352,11 @@ class TestAsyncManager(unittest.TestCase):
)
)
self.bm._generate_ack_id.assert_called_once_with(sid, 'cb')
self.bm.server._emit_internal.mock.assert_called_once_with(
'123', 'my event', {'foo': 'bar'}, '/foo', 11
)
assert self.bm.server._send_packet.mock.call_count == 1
assert self.bm.server._send_packet.mock.call_args_list[0][0][0] \
== '123'
pkt = self.bm.server._send_packet.mock.call_args_list[0][0][1]
assert pkt.encode() == '2/foo,11["my event",{"foo":"bar"}]'
def test_emit_to_invalid_room(self):
_run(
@ -347,3 +365,59 @@ class TestAsyncManager(unittest.TestCase):
def test_emit_to_invalid_namespace(self):
_run(self.bm.emit('my event', {'foo': 'bar'}, namespace='/foo'))
def test_emit_with_tuple(self):
sid = self.bm.connect('123', '/foo')
_run(
self.bm.emit(
'my event', ('foo', 'bar'), namespace='/foo', room=sid
)
)
assert self.bm.server._send_eio_packet.mock.call_count == 1
assert self.bm.server._send_eio_packet.mock.call_args_list[0][0][0] \
== '123'
pkt = self.bm.server._send_eio_packet.mock.call_args_list[0][0][1]
assert pkt.encode() == '42/foo,["my event","foo","bar"]'
def test_emit_with_list(self):
sid = self.bm.connect('123', '/foo')
_run(
self.bm.emit(
'my event', ['foo', 'bar'], namespace='/foo', room=sid
)
)
assert self.bm.server._send_eio_packet.mock.call_count == 1
assert self.bm.server._send_eio_packet.mock.call_args_list[0][0][0] \
== '123'
pkt = self.bm.server._send_eio_packet.mock.call_args_list[0][0][1]
assert pkt.encode() == '42/foo,["my event",["foo","bar"]]'
def test_emit_with_none(self):
sid = self.bm.connect('123', '/foo')
_run(
self.bm.emit(
'my event', None, namespace='/foo', room=sid
)
)
assert self.bm.server._send_eio_packet.mock.call_count == 1
assert self.bm.server._send_eio_packet.mock.call_args_list[0][0][0] \
== '123'
pkt = self.bm.server._send_eio_packet.mock.call_args_list[0][0][1]
assert pkt.encode() == '42/foo,["my event"]'
def test_emit_binary(self):
sid = self.bm.connect('123', '/')
_run(
self.bm.emit(
u'my event', b'my binary data', namespace='/', room=sid
)
)
assert self.bm.server._send_eio_packet.mock.call_count == 2
assert self.bm.server._send_eio_packet.mock.call_args_list[0][0][0] \
== '123'
pkt = self.bm.server._send_eio_packet.mock.call_args_list[0][0][1]
assert pkt.encode() == '451-["my event",{"_placeholder":true,"num":0}]'
assert self.bm.server._send_eio_packet.mock.call_args_list[1][0][0] \
== '123'
pkt = self.bm.server._send_eio_packet.mock.call_args_list[1][0][1]
assert pkt.encode() == b'my binary data'

13
tests/asyncio/test_asyncio_pubsub_manager.py

@ -8,6 +8,7 @@ import pytest
from socketio import asyncio_manager
from socketio import asyncio_pubsub_manager
from socketio import packet
def AsyncMock(*args, **kwargs):
@ -38,7 +39,9 @@ class TestAsyncPubSubManager(unittest.TestCase):
mock_server = mock.MagicMock()
mock_server.eio.generate_id = generate_id
mock_server._emit_internal = AsyncMock()
mock_server.packet_class = packet.Packet
mock_server._send_packet = AsyncMock()
mock_server._send_eio_packet = AsyncMock()
mock_server.disconnect = AsyncMock()
self.pm = asyncio_pubsub_manager.AsyncPubSubManager()
self.pm._publish = AsyncMock()
@ -164,9 +167,11 @@ class TestAsyncPubSubManager(unittest.TestCase):
)
)
self.pm._publish.mock.assert_not_called()
self.pm.server._emit_internal.mock.assert_called_once_with(
'123', 'foo', 'bar', '/', None
)
assert self.pm.server._send_eio_packet.mock.call_count == 1
assert self.pm.server._send_eio_packet.mock.call_args_list[0][0][0] \
== '123'
pkt = self.pm.server._send_eio_packet.mock.call_args_list[0][0][1]
assert pkt.encode() == '42["foo","bar"]'
def test_can_disconnect(self):
sid = self.pm.connect('123', '/')

83
tests/asyncio/test_asyncio_server.py

@ -5,6 +5,7 @@ import unittest
from unittest import mock
from engineio import json
from engineio import packet as eio_packet
import pytest
from socketio import asyncio_server
@ -32,7 +33,8 @@ def _run(coro):
@unittest.skipIf(sys.version_info < (3, 5), 'only for Python 3.5+')
@mock.patch('socketio.server.engineio.AsyncServer', **{
'return_value.generate_id.side_effect': [str(i) for i in range(1, 10)]})
'return_value.generate_id.side_effect': [str(i) for i in range(1, 10)],
'return_value.send_packet': AsyncMock()})
class TestAsyncServer(unittest.TestCase):
def tearDown(self):
# restore JSON encoder, in case a test changed it
@ -295,72 +297,24 @@ class TestAsyncServer(unittest.TestCase):
_run(s.handle_request('environ'))
s.eio.handle_request.mock.assert_called_once_with('environ')
def test_emit_internal(self, eio):
def test_send_packet(self, eio):
eio.return_value.send = AsyncMock()
s = asyncio_server.AsyncServer()
_run(s._emit_internal('123', 'my event', 'my data', namespace='/foo'))
_run(s._send_packet('123', packet.Packet(
packet.EVENT, ['my event', 'my data'], namespace='/foo')))
s.eio.send.mock.assert_called_once_with(
'123', '2/foo,["my event","my data"]'
)
def test_emit_internal_with_tuple(self, eio):
def test_send_eio_packet(self, eio):
eio.return_value.send = AsyncMock()
s = asyncio_server.AsyncServer()
_run(
s._emit_internal(
'123', 'my event', ('foo', 'bar'), namespace='/foo'
)
)
s.eio.send.mock.assert_called_once_with(
'123', '2/foo,["my event","foo","bar"]'
)
def test_emit_internal_with_list(self, eio):
eio.return_value.send = AsyncMock()
s = asyncio_server.AsyncServer()
_run(
s._emit_internal(
'123', 'my event', ['foo', 'bar'], namespace='/foo'
)
)
s.eio.send.mock.assert_called_once_with(
'123', '2/foo,["my event",["foo","bar"]]'
)
def test_emit_internal_with_none(self, eio):
eio.return_value.send = AsyncMock()
s = asyncio_server.AsyncServer()
_run(s._emit_internal('123', 'my event', None, namespace='/foo'))
s.eio.send.mock.assert_called_once_with(
'123', '2/foo,["my event"]'
)
def test_emit_internal_with_callback(self, eio):
eio.return_value.send = AsyncMock()
s = asyncio_server.AsyncServer()
id = s.manager._generate_ack_id('1', 'cb')
_run(
s._emit_internal(
'123', 'my event', 'my data', namespace='/foo', id=id
)
)
s.eio.send.mock.assert_called_once_with(
'123', '2/foo,1["my event","my data"]'
)
def test_emit_internal_default_namespace(self, eio):
eio.return_value.send = AsyncMock()
s = asyncio_server.AsyncServer()
_run(s._emit_internal('123', 'my event', 'my data'))
s.eio.send.mock.assert_called_once_with(
'123', '2["my event","my data"]'
)
def test_emit_internal_binary(self, eio):
eio.return_value.send = AsyncMock()
s = asyncio_server.AsyncServer()
_run(s._emit_internal('123', u'my event', b'my binary data'))
assert s.eio.send.mock.call_count == 2
_run(s._send_eio_packet('123', eio_packet.Packet(
eio_packet.MESSAGE, 'hello')))
assert s.eio.send_packet.mock.call_count == 1
assert s.eio.send_packet.mock.call_args_list[0][0][0] == '123'
pkt = s.eio.send_packet.mock.call_args_list[0][0][1]
assert pkt.encode() == '4hello'
def test_transport(self, eio):
eio.return_value.send = AsyncMock()
@ -804,8 +758,10 @@ class TestAsyncServer(unittest.TestCase):
cb = mock.MagicMock()
id1 = s.manager._generate_ack_id('1', cb)
id2 = s.manager._generate_ack_id('1', cb)
_run(s._emit_internal('123', 'my event', ['foo'], id=id1))
_run(s._emit_internal('123', 'my event', ['bar'], id=id2))
_run(s._send_packet('123', packet.Packet(
packet.EVENT, ['my event', 'foo'], id=id1)))
_run(s._send_packet('123', packet.Packet(
packet.EVENT, ['my event', 'bar'], id=id2)))
_run(s._handle_eio_message('123', '31["foo",2]'))
cb.assert_called_once_with('foo', 2)
@ -818,8 +774,9 @@ class TestAsyncServer(unittest.TestCase):
cb = mock.MagicMock()
id = s.manager._generate_ack_id('1', cb)
_run(
s._emit_internal(
'123', 'my event', ['foo'], namespace='/foo', id=id
s._send_packet(
'123', packet.Packet(packet.EVENT, ['my event', 'foo'],
namespace='/foo', id=id)
)
)
_run(s._handle_eio_message('123', '3/foo,1["foo",2]'))

121
tests/common/test_base_manager.py

@ -4,6 +4,7 @@ from unittest import mock
import pytest
from socketio import base_manager
from socketio import packet
class TestBaseManager(unittest.TestCase):
@ -17,6 +18,7 @@ class TestBaseManager(unittest.TestCase):
mock_server = mock.MagicMock()
mock_server.eio.generate_id = generate_id
mock_server.packet_class = packet.Packet
self.bm = base_manager.BaseManager()
self.bm.set_server(mock_server)
self.bm.initialize()
@ -205,9 +207,10 @@ class TestBaseManager(unittest.TestCase):
sid = self.bm.connect('123', '/foo')
self.bm.connect('456', '/foo')
self.bm.emit('my event', {'foo': 'bar'}, namespace='/foo', room=sid)
self.bm.server._emit_internal.assert_called_once_with(
'123', 'my event', {'foo': 'bar'}, '/foo', None
)
assert self.bm.server._send_eio_packet.call_count == 1
assert self.bm.server._send_eio_packet.call_args_list[0][0][0] == '123'
pkt = self.bm.server._send_eio_packet.call_args_list[0][0][1]
assert pkt.encode() == '42/foo,["my event",{"foo":"bar"}]'
def test_emit_to_room(self):
sid1 = self.bm.connect('123', '/foo')
@ -216,13 +219,12 @@ class TestBaseManager(unittest.TestCase):
self.bm.enter_room(sid2, '/foo', 'bar')
self.bm.connect('789', '/foo')
self.bm.emit('my event', {'foo': 'bar'}, namespace='/foo', room='bar')
assert self.bm.server._emit_internal.call_count == 2
self.bm.server._emit_internal.assert_any_call(
'123', 'my event', {'foo': 'bar'}, '/foo', None
)
self.bm.server._emit_internal.assert_any_call(
'456', 'my event', {'foo': 'bar'}, '/foo', None
)
assert self.bm.server._send_eio_packet.call_count == 2
assert self.bm.server._send_eio_packet.call_args_list[0][0][0] == '123'
assert self.bm.server._send_eio_packet.call_args_list[1][0][0] == '456'
pkt = self.bm.server._send_eio_packet.call_args_list[0][0][1]
assert pkt == self.bm.server._send_eio_packet.call_args_list[1][0][1]
assert pkt.encode() == '42/foo,["my event",{"foo":"bar"}]'
def test_emit_to_rooms(self):
sid1 = self.bm.connect('123', '/foo')
@ -234,16 +236,14 @@ class TestBaseManager(unittest.TestCase):
self.bm.enter_room(sid3, '/foo', 'baz')
self.bm.emit('my event', {'foo': 'bar'}, namespace='/foo',
room=['bar', 'baz'])
assert self.bm.server._emit_internal.call_count == 3
self.bm.server._emit_internal.assert_any_call(
'123', 'my event', {'foo': 'bar'}, '/foo', None
)
self.bm.server._emit_internal.assert_any_call(
'456', 'my event', {'foo': 'bar'}, '/foo', None
)
self.bm.server._emit_internal.assert_any_call(
'789', 'my event', {'foo': 'bar'}, '/foo', None
)
assert self.bm.server._send_eio_packet.call_count == 3
assert self.bm.server._send_eio_packet.call_args_list[0][0][0] == '123'
assert self.bm.server._send_eio_packet.call_args_list[1][0][0] == '456'
assert self.bm.server._send_eio_packet.call_args_list[2][0][0] == '789'
pkt = self.bm.server._send_eio_packet.call_args_list[0][0][1]
assert pkt == self.bm.server._send_eio_packet.call_args_list[1][0][1]
assert pkt == self.bm.server._send_eio_packet.call_args_list[2][0][1]
assert pkt.encode() == '42/foo,["my event",{"foo":"bar"}]'
def test_emit_to_all(self):
sid1 = self.bm.connect('123', '/foo')
@ -253,16 +253,14 @@ class TestBaseManager(unittest.TestCase):
self.bm.connect('789', '/foo')
self.bm.connect('abc', '/bar')
self.bm.emit('my event', {'foo': 'bar'}, namespace='/foo')
assert self.bm.server._emit_internal.call_count == 3
self.bm.server._emit_internal.assert_any_call(
'123', 'my event', {'foo': 'bar'}, '/foo', None
)
self.bm.server._emit_internal.assert_any_call(
'456', 'my event', {'foo': 'bar'}, '/foo', None
)
self.bm.server._emit_internal.assert_any_call(
'789', 'my event', {'foo': 'bar'}, '/foo', None
)
assert self.bm.server._send_eio_packet.call_count == 3
assert self.bm.server._send_eio_packet.call_args_list[0][0][0] == '123'
assert self.bm.server._send_eio_packet.call_args_list[1][0][0] == '456'
assert self.bm.server._send_eio_packet.call_args_list[2][0][0] == '789'
pkt = self.bm.server._send_eio_packet.call_args_list[0][0][1]
assert pkt == self.bm.server._send_eio_packet.call_args_list[1][0][1]
assert pkt == self.bm.server._send_eio_packet.call_args_list[2][0][1]
assert pkt.encode() == '42/foo,["my event",{"foo":"bar"}]'
def test_emit_to_all_skip_one(self):
sid1 = self.bm.connect('123', '/foo')
@ -274,13 +272,12 @@ class TestBaseManager(unittest.TestCase):
self.bm.emit(
'my event', {'foo': 'bar'}, namespace='/foo', skip_sid=sid2
)
assert self.bm.server._emit_internal.call_count == 2
self.bm.server._emit_internal.assert_any_call(
'123', 'my event', {'foo': 'bar'}, '/foo', None
)
self.bm.server._emit_internal.assert_any_call(
'789', 'my event', {'foo': 'bar'}, '/foo', None
)
assert self.bm.server._send_eio_packet.call_count == 2
assert self.bm.server._send_eio_packet.call_args_list[0][0][0] == '123'
assert self.bm.server._send_eio_packet.call_args_list[1][0][0] == '789'
pkt = self.bm.server._send_eio_packet.call_args_list[0][0][1]
assert pkt == self.bm.server._send_eio_packet.call_args_list[1][0][1]
assert pkt.encode() == '42/foo,["my event",{"foo":"bar"}]'
def test_emit_to_all_skip_two(self):
sid1 = self.bm.connect('123', '/foo')
@ -295,10 +292,10 @@ class TestBaseManager(unittest.TestCase):
namespace='/foo',
skip_sid=[sid1, sid3],
)
assert self.bm.server._emit_internal.call_count == 1
self.bm.server._emit_internal.assert_any_call(
'456', 'my event', {'foo': 'bar'}, '/foo', None
)
assert self.bm.server._send_eio_packet.call_count == 1
assert self.bm.server._send_eio_packet.call_args_list[0][0][0] == '456'
pkt = self.bm.server._send_eio_packet.call_args_list[0][0][1]
assert pkt.encode() == '42/foo,["my event",{"foo":"bar"}]'
def test_emit_with_callback(self):
sid = self.bm.connect('123', '/foo')
@ -308,12 +305,48 @@ class TestBaseManager(unittest.TestCase):
'my event', {'foo': 'bar'}, namespace='/foo', callback='cb'
)
self.bm._generate_ack_id.assert_called_once_with(sid, 'cb')
self.bm.server._emit_internal.assert_called_once_with(
'123', 'my event', {'foo': 'bar'}, '/foo', 11
)
assert self.bm.server._send_packet.call_count == 1
assert self.bm.server._send_packet.call_args_list[0][0][0] == '123'
pkt = self.bm.server._send_packet.call_args_list[0][0][1]
assert pkt.encode() == '2/foo,11["my event",{"foo":"bar"}]'
def test_emit_to_invalid_room(self):
self.bm.emit('my event', {'foo': 'bar'}, namespace='/', room='123')
def test_emit_to_invalid_namespace(self):
self.bm.emit('my event', {'foo': 'bar'}, namespace='/foo')
def test_emit_with_tuple(self):
sid = self.bm.connect('123', '/foo')
self.bm.emit('my event', ('foo', 'bar'), namespace='/foo', room=sid)
assert self.bm.server._send_eio_packet.call_count == 1
assert self.bm.server._send_eio_packet.call_args_list[0][0][0] == '123'
pkt = self.bm.server._send_eio_packet.call_args_list[0][0][1]
assert pkt.encode() == '42/foo,["my event","foo","bar"]'
def test_emit_with_list(self):
sid = self.bm.connect('123', '/foo')
self.bm.emit('my event', ['foo', 'bar'], namespace='/foo', room=sid)
assert self.bm.server._send_eio_packet.call_count == 1
assert self.bm.server._send_eio_packet.call_args_list[0][0][0] == '123'
pkt = self.bm.server._send_eio_packet.call_args_list[0][0][1]
assert pkt.encode() == '42/foo,["my event",["foo","bar"]]'
def test_emit_with_none(self):
sid = self.bm.connect('123', '/foo')
self.bm.emit('my event', None, namespace='/foo', room=sid)
assert self.bm.server._send_eio_packet.call_count == 1
assert self.bm.server._send_eio_packet.call_args_list[0][0][0] == '123'
pkt = self.bm.server._send_eio_packet.call_args_list[0][0][1]
assert pkt.encode() == '42/foo,["my event"]'
def test_emit_binary(self):
sid = self.bm.connect('123', '/')
self.bm.emit(u'my event', b'my binary data', namespace='/', room=sid)
assert self.bm.server._send_eio_packet.call_count == 2
assert self.bm.server._send_eio_packet.call_args_list[0][0][0] == '123'
pkt = self.bm.server._send_eio_packet.call_args_list[0][0][1]
assert pkt.encode() == '451-["my event",{"_placeholder":true,"num":0}]'
assert self.bm.server._send_eio_packet.call_args_list[1][0][0] == '123'
pkt = self.bm.server._send_eio_packet.call_args_list[1][0][1]
assert pkt.encode() == b'my binary data'

9
tests/common/test_pubsub_manager.py

@ -7,6 +7,7 @@ import pytest
from socketio import base_manager
from socketio import pubsub_manager
from socketio import packet
class TestPubSubManager(unittest.TestCase):
@ -20,6 +21,7 @@ class TestPubSubManager(unittest.TestCase):
mock_server = mock.MagicMock()
mock_server.eio.generate_id = generate_id
mock_server.packet_class = packet.Packet
self.pm = pubsub_manager.PubSubManager()
self.pm._publish = mock.MagicMock()
self.pm.set_server(mock_server)
@ -157,9 +159,10 @@ class TestPubSubManager(unittest.TestCase):
'foo', 'bar', room=sid, namespace='/', ignore_queue=True
)
self.pm._publish.assert_not_called()
self.pm.server._emit_internal.assert_called_once_with(
'123', 'foo', 'bar', '/', None
)
assert self.pm.server._send_eio_packet.call_count == 1
assert self.pm.server._send_eio_packet.call_args_list[0][0][0] == '123'
pkt = self.pm.server._send_eio_packet.call_args_list[0][0][1]
assert pkt.encode() == '42["foo","bar"]'
def test_can_disconnect(self):
sid = self.pm.connect('123', '/')

62
tests/common/test_server.py

@ -3,6 +3,7 @@ import unittest
from unittest import mock
from engineio import json
from engineio import packet as eio_packet
import pytest
from socketio import exceptions
@ -271,53 +272,22 @@ class TestServer(unittest.TestCase):
'environ', 'start_response'
)
def test_emit_internal(self, eio):
def test_send_packet(self, eio):
s = server.Server()
s._emit_internal('123', 'my event', 'my data', namespace='/foo')
s._send_packet('123', packet.Packet(
packet.EVENT, ['my event', 'my data'], namespace='/foo'))
s.eio.send.assert_called_once_with(
'123', '2/foo,["my event","my data"]'
)
def test_emit_internal_with_tuple(self, eio):
def test_send_eio_packet(self, eio):
s = server.Server()
s._emit_internal('123', 'my event', ('foo', 'bar'), namespace='/foo')
s.eio.send.assert_called_once_with(
'123', '2/foo,["my event","foo","bar"]'
)
def test_emit_internal_with_list(self, eio):
s = server.Server()
s._emit_internal('123', 'my event', ['foo', 'bar'], namespace='/foo')
s.eio.send.assert_called_once_with(
'123', '2/foo,["my event",["foo","bar"]]'
)
def test_emit_internal_with_none(self, eio):
s = server.Server()
s._emit_internal('123', 'my event', None, namespace='/foo')
s.eio.send.assert_called_once_with(
'123', '2/foo,["my event"]'
)
def test_emit_internal_with_callback(self, eio):
s = server.Server()
id = s.manager._generate_ack_id('1', 'cb')
s._emit_internal('123', 'my event', 'my data', namespace='/foo', id=id)
s.eio.send.assert_called_once_with(
'123', '2/foo,1["my event","my data"]'
)
def test_emit_internal_default_namespace(self, eio):
s = server.Server()
s._emit_internal('123', 'my event', 'my data')
s.eio.send.assert_called_once_with(
'123', '2["my event","my data"]'
)
def test_emit_internal_binary(self, eio):
s = server.Server()
s._emit_internal('123', u'my event', b'my binary data')
assert s.eio.send.call_count == 2
s._send_eio_packet('123', eio_packet.Packet(
eio_packet.MESSAGE, 'hello'))
assert s.eio.send_packet.call_count == 1
assert s.eio.send_packet.call_args_list[0][0][0] == '123'
pkt = s.eio.send_packet.call_args_list[0][0][1]
assert pkt.encode() == '4hello'
def test_transport(self, eio):
s = server.Server()
@ -412,7 +382,6 @@ class TestServer(unittest.TestCase):
assert s.manager.is_connected('1', '/foo')
handler.assert_called_once_with('1', 'environ')
s.eio.send.assert_any_call('123', '0/foo,{"sid":"1"}')
print(s.eio.send.call_args_list)
s.eio.send.assert_any_call('123', '4/foo,"Unable to connect"')
def test_handle_connect_always_connect(self, eio):
@ -714,8 +683,10 @@ class TestServer(unittest.TestCase):
cb = mock.MagicMock()
id1 = s.manager._generate_ack_id('1', cb)
id2 = s.manager._generate_ack_id('1', cb)
s._emit_internal('123', 'my event', ['foo'], id=id1)
s._emit_internal('123', 'my event', ['bar'], id=id2)
s._send_packet('123', packet.Packet(
packet.EVENT, ['my event', 'foo'], id=id1))
s._send_packet('123', packet.Packet(
packet.EVENT, ['my event', 'bar'], id=id2))
s._handle_eio_message('123', '31["foo",2]')
cb.assert_called_once_with('foo', 2)
@ -726,7 +697,8 @@ class TestServer(unittest.TestCase):
s._handle_eio_message('123', '0/foo,')
cb = mock.MagicMock()
id = s.manager._generate_ack_id('1', cb)
s._emit_internal('123', 'my event', ['foo'], namespace='/foo', id=id)
s._send_packet('123', packet.Packet(
packet.EVENT, ['my event', 'foo'], namespace='/foo', id=id))
s._handle_eio_message('123', '3/foo,1["foo",2]')
cb.assert_called_once_with('foo', 2)

1
tests/performance/run.sh

@ -5,3 +5,4 @@ python json_packet.py
python namespace_packet.py
python server_receive.py
python server_send.py
python server_send_broadcast.py

3
tests/performance/server_send.py

@ -6,6 +6,9 @@ class Server(socketio.Server):
def _send_packet(self, eio_sid, pkt):
pass
def _send_eio_packet(self, eio_sid, eio_pkt):
pass
def test():
s = Server()

30
tests/performance/server_send_broadcast.py

@ -0,0 +1,30 @@
import time
import socketio
class Server(socketio.Server):
def _send_packet(self, eio_sid, pkt):
pass
def _send_eio_packet(self, eio_sid, eio_pkt):
pass
def test():
s = Server()
start = time.time()
count = 0
for i in range(100):
s._handle_eio_connect(str(i), 'environ')
s._handle_eio_message(str(i), '0')
while True:
s.emit('test', 'hello')
count += 1
if time.time() - start >= 5:
break
return count
if __name__ == '__main__':
count = test()
print('server_send:', count, 'packets received.')
Loading…
Cancel
Save