Browse Source

Use configured JSON module in managers (Fixes #1549) (#1553)

* Use configured JSON module in managers (Fixes #1549)

* unit tests
pull/1556/head
Miguel Grinberg 4 months ago
committed by GitHub
parent
commit
6229261ae6
No known key found for this signature in database GPG Key ID: B5690EEEBB952194
  1. 20
      src/socketio/async_aiopika_manager.py
  2. 5
      src/socketio/async_client.py
  3. 22
      src/socketio/async_pubsub_manager.py
  4. 20
      src/socketio/async_redis_manager.py
  5. 5
      src/socketio/async_server.py
  6. 3
      src/socketio/base_manager.py
  7. 5
      src/socketio/client.py
  8. 20
      src/socketio/kafka_manager.py
  9. 20
      src/socketio/kombu_manager.py
  10. 22
      src/socketio/pubsub_manager.py
  11. 20
      src/socketio/redis_manager.py
  12. 5
      src/socketio/server.py
  13. 26
      src/socketio/zmq_manager.py
  14. 13
      tests/async/test_pubsub_manager.py
  15. 15
      tests/async/test_redis_manager.py
  16. 13
      tests/common/test_pubsub_manager.py
  17. 15
      tests/common/test_redis_manager.py
  18. 2
      tox.ini

20
src/socketio/async_aiopika_manager.py

@ -1,6 +1,5 @@
import asyncio
from engineio import json
from .async_pubsub_manager import AsyncPubSubManager
try:
@ -32,18 +31,29 @@ class AsyncAioPikaManager(AsyncPubSubManager): # pragma: no cover
in rabbitmq
:param write_only: If set to ``True``, only initialize to emit events. The
default of ``False`` initializes the class for emitting
and receiving.
and receiving. A write-only instance can be used
independently of the server to emit to clients from an
external process.
:param logger: a custom logger to log it. If not given, the server logger
is used.
:param json: An alternative JSON module to use for encoding and decoding
packets. Custom json modules must have ``dumps`` and ``loads``
functions that are compatible with the standard library
versions. This setting is only used when ``write_only`` is set
to ``True``. Otherwise the JSON module configured in the
server is used.
"""
name = 'asyncaiopika'
def __init__(self, url='amqp://guest:guest@localhost:5672//',
channel='socketio', write_only=False, logger=None):
channel='socketio', write_only=False, logger=None, json=None):
if aio_pika is None:
raise RuntimeError('aio_pika package is not installed '
'(Run "pip install aio_pika" in your '
'virtualenv).')
super().__init__(channel=channel, write_only=write_only, logger=logger)
super().__init__(channel=channel, write_only=write_only, logger=logger,
json=json)
self.url = url
self._lock = asyncio.Lock()
self.publisher_connection = None
@ -82,7 +92,7 @@ class AsyncAioPikaManager(AsyncPubSubManager): # pragma: no cover
try:
await self.publisher_exchange.publish(
aio_pika.Message(
body=json.dumps(data).encode(),
body=self.json.dumps(data).encode(),
delivery_mode=aio_pika.DeliveryMode.PERSISTENT
), routing_key='*',
)

5
src/socketio/async_client.py

@ -37,10 +37,11 @@ class AsyncClient(base_client.BaseClient):
use. To disable logging set to ``False``. The default is
``False``. Note that fatal errors are logged even when
``logger`` is ``False``.
:param json: An alternative json module to use for encoding and decoding
:param json: An alternative JSON module to use for encoding and decoding
packets. Custom json modules must have ``dumps`` and ``loads``
functions that are compatible with the standard library
versions.
versions. This is a process-wide setting, all instantiated
servers and clients must use the same JSON module.
:param handle_sigint: Set to ``True`` to automatically handle disconnection
when the process is interrupted, or to ``False`` to
leave interrupt handling to the calling application.

22
src/socketio/async_pubsub_manager.py

@ -3,8 +3,6 @@ import base64
from functools import partial
import uuid
from engineio import json
from .async_manager import AsyncManager
from .packet import Packet
@ -22,15 +20,31 @@ class AsyncPubSubManager(AsyncManager):
:param channel: The channel name on which the server sends and receives
notifications.
:param write_only: If set to ``True``, only initialize to emit events. The
default of ``False`` initializes the class for emitting
and receiving. A write-only instance can be used
independently of the server to emit to clients from an
external process.
:param logger: a custom logger to log it. If not given, the server logger
is used.
:param json: An alternative JSON module to use for encoding and decoding
packets. Custom json modules must have ``dumps`` and ``loads``
functions that are compatible with the standard library
versions. This setting is only used when ``write_only`` is set
to ``True``. Otherwise the JSON module configured in the
server is used.
"""
name = 'asyncpubsub'
def __init__(self, channel='socketio', write_only=False, logger=None):
def __init__(self, channel='socketio', write_only=False, logger=None,
json=None):
super().__init__()
self.channel = channel
self.write_only = write_only
self.host_id = uuid.uuid4().hex
self.logger = logger
if json is not None:
self.json = json
def initialize(self):
super().initialize()
@ -221,7 +235,7 @@ class AsyncPubSubManager(AsyncManager):
data = message
else:
try:
data = json.loads(message)
data = self.json.loads(message)
except:
pass
if data and 'method' in data:

20
src/socketio/async_redis_manager.py

@ -19,7 +19,6 @@ except ImportError: # pragma: no cover
aiovalkey = None
ValkeyError = None
from engineio import json
from .async_pubsub_manager import AsyncPubSubManager
from .redis_manager import parse_redis_sentinel_url
@ -47,18 +46,29 @@ class AsyncRedisManager(AsyncPubSubManager):
notifications. Must be the same in all the servers.
:param write_only: If set to ``True``, only initialize to emit events. The
default of ``False`` initializes the class for emitting
and receiving.
and receiving. A write-only instance can be used
independently of the server to emit to clients from an
external process.
:param logger: a custom logger to log it. If not given, the server logger
is used.
:param json: An alternative JSON module to use for encoding and decoding
packets. Custom json modules must have ``dumps`` and ``loads``
functions that are compatible with the standard library
versions. This setting is only used when ``write_only`` is set
to ``True``. Otherwise the JSON module configured in the
server is used.
:param redis_options: additional keyword arguments to be passed to
``Redis.from_url()`` or ``Sentinel()``.
"""
name = 'aioredis'
def __init__(self, url='redis://localhost:6379/0', channel='socketio',
write_only=False, logger=None, redis_options=None):
write_only=False, logger=None, json=None, redis_options=None):
if aioredis and \
not hasattr(aioredis.Redis, 'from_url'): # pragma: no cover
raise RuntimeError('Version 2 of aioredis package is required.')
super().__init__(channel=channel, write_only=write_only, logger=logger)
super().__init__(channel=channel, write_only=write_only, logger=logger,
json=json)
self.redis_url = url
self.redis_options = redis_options or {}
self.connected = False
@ -117,7 +127,7 @@ class AsyncRedisManager(AsyncPubSubManager):
if not self.connected:
self._redis_connect()
return await self.redis.publish(
self.channel, json.dumps(data))
self.channel, self.json.dumps(data))
except error as exc:
if retries_left > 0:
self._get_logger().error(

5
src/socketio/async_server.py

@ -28,10 +28,11 @@ class AsyncServer(base_server.BaseServer):
:param logger: To enable logging set to ``True`` or pass a logger object to
use. To disable logging set to ``False``. Note that fatal
errors are logged even when ``logger`` is ``False``.
:param json: An alternative json module to use for encoding and decoding
:param json: An alternative JSON module to use for encoding and decoding
packets. Custom json modules must have ``dumps`` and ``loads``
functions that are compatible with the standard library
versions.
versions. This is a process-wide setting, all instantiated
servers and clients must use the same JSON module.
:param async_handlers: If set to ``True``, event handlers for a client are
executed in separate threads. To run handlers for a
client synchronously, set to ``False``. The default

3
src/socketio/base_manager.py

@ -1,5 +1,6 @@
import itertools
import logging
import json
from bidict import bidict, ValueDuplicationError
@ -14,9 +15,11 @@ class BaseManager:
self.eio_to_sid = {}
self.callbacks = {}
self.pending_disconnect = {}
self.json = json
def set_server(self, server):
self.server = server
self.json = self.server.packet_class.json # use the global JSON module
def initialize(self):
"""Invoked before the first request is received. Subclasses can add

5
src/socketio/client.py

@ -39,10 +39,11 @@ class Client(base_client.BaseClient):
of the ``encode()`` and ``decode()`` methods can be
provided. Client and server must use compatible
serializers.
:param json: An alternative json module to use for encoding and decoding
:param json: An alternative JSON module to use for encoding and decoding
packets. Custom json modules must have ``dumps`` and ``loads``
functions that are compatible with the standard library
versions.
versions. This is a process-wide setting, all instantiated
servers and clients must use the same JSON module.
:param handle_sigint: Set to ``True`` to automatically handle disconnection
when the process is interrupted, or to ``False`` to
leave interrupt handling to the calling application.

20
src/socketio/kafka_manager.py

@ -5,7 +5,6 @@ try:
except ImportError:
kafka = None
from engineio import json
from .pubsub_manager import PubSubManager
logger = logging.getLogger('socketio')
@ -32,18 +31,29 @@ class KafkaManager(PubSubManager): # pragma: no cover
servers.
:param write_only: If set to ``True``, only initialize to emit events. The
default of ``False`` initializes the class for emitting
and receiving.
and receiving. A write-only instance can be used
independently of the server to emit to clients from an
external process.
:param logger: a custom logger to log it. If not given, the server logger
is used.
:param json: An alternative JSON module to use for encoding and decoding
packets. Custom json modules must have ``dumps`` and ``loads``
functions that are compatible with the standard library
versions. This setting is only used when ``write_only`` is set
to ``True``. Otherwise the JSON module configured in the
server is used.
"""
name = 'kafka'
def __init__(self, url='kafka://localhost:9092', channel='socketio',
write_only=False):
write_only=False, logger=None, json=None):
if kafka is None:
raise RuntimeError('kafka-python package is not installed '
'(Run "pip install kafka-python" in your '
'virtualenv).')
super().__init__(channel=channel, write_only=write_only)
super().__init__(channel=channel, write_only=write_only, logger=logger,
json=json)
urls = [url] if isinstance(url, str) else url
self.kafka_urls = [url[8:] if url != 'kafka://' else 'localhost:9092'
@ -53,7 +63,7 @@ class KafkaManager(PubSubManager): # pragma: no cover
bootstrap_servers=self.kafka_urls)
def _publish(self, data):
self.producer.send(self.channel, value=json.dumps(data))
self.producer.send(self.channel, value=self.json.dumps(data))
self.producer.flush()
def _kafka_listen(self):

20
src/socketio/kombu_manager.py

@ -6,7 +6,6 @@ try:
except ImportError:
kombu = None
from engineio import json
from .pubsub_manager import PubSubManager
@ -34,7 +33,17 @@ class KombuManager(PubSubManager): # pragma: no cover
notifications. Must be the same in all the servers.
:param write_only: If set to ``True``, only initialize to emit events. The
default of ``False`` initializes the class for emitting
and receiving.
and receiving. A write-only instance can be used
independently of the server to emit to clients from an
external process.
:param logger: a custom logger to log it. If not given, the server logger
is used.
:param json: An alternative JSON module to use for encoding and decoding
packets. Custom json modules must have ``dumps`` and ``loads``
functions that are compatible with the standard library
versions. This setting is only used when ``write_only`` is set
to ``True``. Otherwise the JSON module configured in the
server is used.
:param connection_options: additional keyword arguments to be passed to
``kombu.Connection()``.
:param exchange_options: additional keyword arguments to be passed to
@ -47,14 +56,15 @@ class KombuManager(PubSubManager): # pragma: no cover
name = 'kombu'
def __init__(self, url='amqp://guest:guest@localhost:5672//',
channel='socketio', write_only=False, logger=None,
channel='socketio', write_only=False, logger=None, json=None,
connection_options=None, exchange_options=None,
queue_options=None, producer_options=None):
if kombu is None:
raise RuntimeError('Kombu package is not installed '
'(Run "pip install kombu" in your '
'virtualenv).')
super().__init__(channel=channel, write_only=write_only, logger=logger)
super().__init__(channel=channel, write_only=write_only, logger=logger,
json=json)
self.url = url
self.connection_options = connection_options or {}
self.exchange_options = exchange_options or {}
@ -102,7 +112,7 @@ class KombuManager(PubSubManager): # pragma: no cover
try:
producer_publish = self._producer_publish(
self.publisher_connection)
producer_publish(json.dumps(data))
producer_publish(self.json.dumps(data))
break
except (OSError, kombu.exceptions.KombuError):
if retry:

22
src/socketio/pubsub_manager.py

@ -2,8 +2,6 @@ import base64
from functools import partial
import uuid
from engineio import json
from .manager import Manager
from .packet import Packet
@ -21,15 +19,31 @@ class PubSubManager(Manager):
:param channel: The channel name on which the server sends and receives
notifications.
:param write_only: If set to ``True``, only initialize to emit events. The
default of ``False`` initializes the class for emitting
and receiving. A write-only instance can be used
independently of the server to emit to clients from an
external process.
:param logger: a custom logger to log it. If not given, the server logger
is used.
:param json: An alternative JSON module to use for encoding and decoding
packets. Custom json modules must have ``dumps`` and ``loads``
functions that are compatible with the standard library
versions. This setting is only used when ``write_only`` is set
to ``True``. Otherwise the JSON module configured in the
server is used.
"""
name = 'pubsub'
def __init__(self, channel='socketio', write_only=False, logger=None):
def __init__(self, channel='socketio', write_only=False, logger=None,
json=None):
super().__init__()
self.channel = channel
self.write_only = write_only
self.host_id = uuid.uuid4().hex
self.logger = logger
if json is not None:
self.json = json
def initialize(self):
super().initialize()
@ -215,7 +229,7 @@ class PubSubManager(Manager):
data = message
else:
try:
data = json.loads(message)
data = self.json.loads(message)
except:
pass
if data and 'method' in data:

20
src/socketio/redis_manager.py

@ -16,7 +16,6 @@ except ImportError: # pragma: no cover
valkey = None
ValkeyError = None
from engineio import json
from .pubsub_manager import PubSubManager
logger = logging.getLogger('socketio')
@ -72,15 +71,26 @@ class RedisManager(PubSubManager):
notifications. Must be the same in all the servers.
:param write_only: If set to ``True``, only initialize to emit events. The
default of ``False`` initializes the class for emitting
and receiving.
and receiving. A write-only instance can be used
independently of the server to emit to clients from an
external process.
:param logger: a custom logger to log it. If not given, the server logger
is used.
:param json: An alternative JSON module to use for encoding and decoding
packets. Custom json modules must have ``dumps`` and ``loads``
functions that are compatible with the standard library
versions. This setting is only used when ``write_only`` is set
to ``True``. Otherwise the JSON module configured in the
server is used.
:param redis_options: additional keyword arguments to be passed to
``Redis.from_url()`` or ``Sentinel()``.
"""
name = 'redis'
def __init__(self, url='redis://localhost:6379/0', channel='socketio',
write_only=False, logger=None, redis_options=None):
super().__init__(channel=channel, write_only=write_only, logger=logger)
write_only=False, logger=None, json=None, redis_options=None):
super().__init__(channel=channel, write_only=write_only, logger=logger,
json=json)
self.redis_url = url
self.redis_options = redis_options or {}
self.connected = False
@ -153,7 +163,7 @@ class RedisManager(PubSubManager):
try:
if not self.connected:
self._redis_connect()
return self.redis.publish(self.channel, json.dumps(data))
return self.redis.publish(self.channel, self.json.dumps(data))
except error as exc:
if retries_left > 0:
logger.error(

5
src/socketio/server.py

@ -30,10 +30,11 @@ class Server(base_server.BaseServer):
of the ``encode()`` and ``decode()`` methods can be
provided. Client and server must use compatible
serializers.
:param json: An alternative json module to use for encoding and decoding
:param json: An alternative JSON module to use for encoding and decoding
packets. Custom json modules must have ``dumps`` and ``loads``
functions that are compatible with the standard library
versions.
versions. This is a process-wide setting, all instantiated
servers and clients must use the same JSON module.
:param async_handlers: If set to ``True``, event handlers for a client are
executed in separate threads. To run handlers for a
client synchronously, set to ``False``. The default

26
src/socketio/zmq_manager.py

@ -1,6 +1,5 @@
import re
from engineio import json
from .pubsub_manager import PubSubManager
@ -23,7 +22,17 @@ class ZmqManager(PubSubManager): # pragma: no cover
notifications. Must be the same in all the servers.
:param write_only: If set to ``True``, only initialize to emit events. The
default of ``False`` initializes the class for emitting
and receiving.
and receiving. A write-only instance can be used
independently of the server to emit to clients from an
external process.
:param logger: a custom logger to log it. If not given, the server logger
is used.
:param json: An alternative JSON module to use for encoding and decoding
packets. Custom json modules must have ``dumps`` and ``loads``
functions that are compatible with the standard library
versions. This setting is only used when ``write_only`` is set
to ``True``. Otherwise the JSON module configured in the
server is used.
A zmq message broker must be running for the zmq_manager to work.
you can write your own or adapt one from the following simple broker
@ -42,10 +51,8 @@ class ZmqManager(PubSubManager): # pragma: no cover
"""
name = 'zmq'
def __init__(self, url='zmq+tcp://localhost:5555+5556',
channel='socketio',
write_only=False,
logger=None):
def __init__(self, url='zmq+tcp://localhost:5555+5556', channel='socketio',
write_only=False, logger=None, json=None):
try:
from eventlet.green import zmq
except ImportError:
@ -57,7 +64,8 @@ class ZmqManager(PubSubManager): # pragma: no cover
if not (url.startswith('zmq+tcp://') and r.search(url)):
raise RuntimeError('unexpected connection string: ' + url)
super().__init__(channel=channel, write_only=write_only, logger=logger)
super().__init__(channel=channel, write_only=write_only, logger=logger,
json=json)
url = url.replace('zmq+', '')
(sink_url, sub_port) = url.split('+')
sink_port = sink_url.split(':')[-1]
@ -75,7 +83,7 @@ class ZmqManager(PubSubManager): # pragma: no cover
self.channel = channel
def _publish(self, data):
packed_data = json.dumps(
packed_data = self.json.dumps(
{
'type': 'message',
'channel': self.channel,
@ -94,7 +102,7 @@ class ZmqManager(PubSubManager): # pragma: no cover
for message in self.zmq_listen():
if isinstance(message, bytes):
try:
message = json.loads(message)
message = self.json.loads(message)
except Exception:
pass
if isinstance(message, dict) and \

13
tests/async/test_pubsub_manager.py

@ -5,8 +5,10 @@ from unittest import mock
import pytest
from engineio.packet import Packet as EIOPacket
from socketio import async_manager
from socketio import async_pubsub_manager
from socketio import async_server
from socketio import packet
@ -846,3 +848,14 @@ class TestAsyncPubSubManager:
self.pm._handle_emit.assert_awaited_with(
{'method': 'emit', 'value': 'bar', 'host_id': 'x'}
)
def test_custom_json(self):
saved_json = packet.Packet.json
cm = async_pubsub_manager.AsyncPubSubManager(json='foo')
assert cm.json == 'foo'
async_server.AsyncServer(json='bar', client_manager=cm)
assert cm.json == 'bar'
packet.Packet.json = saved_json
EIOPacket.json = saved_json

15
tests/async/test_redis_manager.py

@ -2,8 +2,10 @@ import pytest
import redis
import valkey
from socketio import async_redis_manager
from engineio.packet import Packet as EIOPacket
from socketio import async_redis_manager, AsyncServer
from socketio.async_redis_manager import AsyncRedisManager
from socketio.packet import Packet
class TestAsyncRedisManager:
@ -109,3 +111,14 @@ class TestAsyncRedisManager:
assert isinstance(c.redis, valkey.asyncio.Valkey)
async_redis_manager.aioredis = saved_redis
def test_custom_json(self):
saved_json = Packet.json
cm = AsyncRedisManager('redis://', json='foo')
assert cm.json == 'foo'
AsyncServer(json='bar', client_manager=cm)
assert cm.json == 'bar'
Packet.json = saved_json
EIOPacket.json = saved_json

13
tests/common/test_pubsub_manager.py

@ -5,9 +5,11 @@ from unittest import mock
import pytest
from engineio.packet import Packet as EIOPacket
from socketio import manager
from socketio import pubsub_manager
from socketio import packet
from socketio import server
class TestPubSubManager:
@ -822,3 +824,14 @@ class TestPubSubManager:
self.pm._handle_emit.assert_called_with(
{'method': 'emit', 'value': 'bar', 'host_id': 'x'}
)
def test_custom_json(self):
saved_json = packet.Packet.json
cm = pubsub_manager.PubSubManager(json='foo')
assert cm.json == 'foo'
server.Server(json='bar', client_manager=cm)
assert cm.json == 'bar'
packet.Packet.json = saved_json
EIOPacket.json = saved_json

15
tests/common/test_redis_manager.py

@ -2,7 +2,9 @@ import pytest
import redis
import valkey
from socketio import redis_manager
from engineio.packet import Packet as EIOPacket
from socketio import redis_manager, Server
from socketio.packet import Packet
from socketio.redis_manager import RedisManager, parse_redis_sentinel_url
@ -144,3 +146,14 @@ class TestPubSubManager:
'myredis',
{'username': 'user', 'password': 'password', 'db': 0}
)
def test_custom_json(self):
saved_json = Packet.json
cm = RedisManager('redis://', json='foo')
assert cm.json == 'foo'
Server(json='bar', client_manager=cm)
assert cm.json == 'bar'
Packet.json = saved_json
EIOPacket.json = saved_json

2
tox.ini

@ -1,7 +1,7 @@
[testenv]
commands=
pip install -e .
pytest -p no:logging --timeout=60 --cov=socketio --cov-branch --cov-report=term-missing --cov-report=xml
pytest -p no:logging --timeout=60 --cov=socketio --cov-branch --cov-report=term-missing --cov-report=xml {posargs}
deps=
simple-websocket
uvicorn

Loading…
Cancel
Save