Browse Source

Merge 4239331c0a into fb9648575e

pull/850/merge
Ododo 3 years ago
committed by GitHub
parent
commit
b5b9decc4f
No known key found for this signature in database GPG Key ID: 4AEE18F83AFDEB23
  1. 7
      src/socketio/asyncio_aiopika_manager.py
  2. 12
      src/socketio/asyncio_pubsub_manager.py
  3. 10
      src/socketio/asyncio_redis_manager.py
  4. 11
      src/socketio/kafka_manager.py
  5. 10
      src/socketio/kombu_manager.py
  6. 12
      src/socketio/pubsub_manager.py
  7. 11
      src/socketio/redis_manager.py
  8. 17
      src/socketio/zmq_manager.py
  9. 48
      tests/common/test_pubsub_manager.py

7
src/socketio/asyncio_aiopika_manager.py

@ -38,7 +38,8 @@ class AsyncAioPikaManager(AsyncPubSubManager): # pragma: no cover
name = 'asyncaiopika' name = 'asyncaiopika'
def __init__(self, url='amqp://guest:guest@localhost:5672//', def __init__(self, url='amqp://guest:guest@localhost:5672//',
channel='socketio', write_only=False, logger=None): channel='socketio', write_only=False, logger=None,
encoder=pickle):
if aio_pika is None: if aio_pika is None:
raise RuntimeError('aio_pika package is not installed ' raise RuntimeError('aio_pika package is not installed '
'(Run "pip install aio_pika" in your ' '(Run "pip install aio_pika" in your '
@ -70,7 +71,7 @@ class AsyncAioPikaManager(AsyncPubSubManager): # pragma: no cover
channel = await self._channel(connection) channel = await self._channel(connection)
exchange = await self._exchange(channel) exchange = await self._exchange(channel)
await exchange.publish( await exchange.publish(
aio_pika.Message(body=pickle.dumps(data), aio_pika.Message(body=self.encoder.dumps(data),
delivery_mode=aio_pika.DeliveryMode.PERSISTENT), delivery_mode=aio_pika.DeliveryMode.PERSISTENT),
routing_key='*' routing_key='*'
) )
@ -94,7 +95,7 @@ class AsyncAioPikaManager(AsyncPubSubManager): # pragma: no cover
async with self.listener_queue.iterator() as queue_iter: async with self.listener_queue.iterator() as queue_iter:
async for message in queue_iter: async for message in queue_iter:
with message.process(): with message.process():
yield pickle.loads(message.body) yield message.body
except Exception: except Exception:
self._get_logger().error('Cannot receive from rabbitmq... ' self._get_logger().error('Cannot receive from rabbitmq... '
'retrying in ' 'retrying in '

12
src/socketio/asyncio_pubsub_manager.py

@ -24,12 +24,14 @@ class AsyncPubSubManager(AsyncManager):
""" """
name = 'asyncpubsub' name = 'asyncpubsub'
def __init__(self, channel='socketio', write_only=False, logger=None): def __init__(self, channel='socketio', write_only=False, logger=None,
encoder=pickle):
super().__init__() super().__init__()
self.channel = channel self.channel = channel
self.write_only = write_only self.write_only = write_only
self.host_id = uuid.uuid4().hex self.host_id = uuid.uuid4().hex
self.logger = logger self.logger = logger
self.encoder = encoder
def initialize(self): def initialize(self):
super().initialize() super().initialize()
@ -153,7 +155,13 @@ class AsyncPubSubManager(AsyncManager):
if isinstance(message, dict): if isinstance(message, dict):
data = message data = message
else: else:
if isinstance(message, bytes): # pragma: no cover if self.encoder:
try:
data = self.encoder.loads(message)
except:
pass
if data is None and \
isinstance(message, bytes): # pragma: no cover
try: try:
data = pickle.loads(message) data = pickle.loads(message)
except: except:

10
src/socketio/asyncio_redis_manager.py

@ -32,11 +32,14 @@ class AsyncRedisManager(AsyncPubSubManager): # pragma: no cover
and receiving. and receiving.
:param redis_options: additional keyword arguments to be passed to :param redis_options: additional keyword arguments to be passed to
``aioredis.from_url()``. ``aioredis.from_url()``.
:param encoder: The encoder to use for publishing and decoding data,
defaults to pickle.
""" """
name = 'aioredis' name = 'aioredis'
def __init__(self, url='redis://localhost:6379/0', channel='socketio', def __init__(self, url='redis://localhost:6379/0', channel='socketio',
write_only=False, logger=None, redis_options=None): write_only=False, logger=None, redis_options=None,
encoder=pickle):
if aioredis is None: if aioredis is None:
raise RuntimeError('Redis package is not installed ' raise RuntimeError('Redis package is not installed '
'(Run "pip install aioredis" in your ' '(Run "pip install aioredis" in your '
@ -46,7 +49,8 @@ class AsyncRedisManager(AsyncPubSubManager): # pragma: no cover
self.redis_url = url self.redis_url = url
self.redis_options = redis_options or {} self.redis_options = redis_options or {}
self._redis_connect() self._redis_connect()
super().__init__(channel=channel, write_only=write_only, logger=logger) super().__init__(channel=channel, write_only=write_only, logger=logger,
encoder=encoder)
def _redis_connect(self): def _redis_connect(self):
self.redis = aioredis.Redis.from_url(self.redis_url, self.redis = aioredis.Redis.from_url(self.redis_url,
@ -60,7 +64,7 @@ class AsyncRedisManager(AsyncPubSubManager): # pragma: no cover
if not retry: if not retry:
self._redis_connect() self._redis_connect()
return await self.redis.publish( return await self.redis.publish(
self.channel, pickle.dumps(data)) self.channel, self.encoder.dumps(data))
except aioredis.exceptions.RedisError: except aioredis.exceptions.RedisError:
if retry: if retry:
self._get_logger().error('Cannot publish to redis... ' self._get_logger().error('Cannot publish to redis... '

11
src/socketio/kafka_manager.py

@ -33,18 +33,21 @@ class KafkaManager(PubSubManager): # pragma: no cover
:param write_only: If set to ``True``, only initialize to emit events. The :param write_only: If set to ``True``, only initialize to emit events. The
default of ``False`` initializes the class for emitting default of ``False`` initializes the class for emitting
and receiving. and receiving.
:param encoder: The encoder to use for publishing and decoding data,
defaults to pickle.
""" """
name = 'kafka' name = 'kafka'
def __init__(self, url='kafka://localhost:9092', channel='socketio', def __init__(self, url='kafka://localhost:9092', channel='socketio',
write_only=False): write_only=False, encoder=pickle):
if kafka is None: if kafka is None:
raise RuntimeError('kafka-python package is not installed ' raise RuntimeError('kafka-python package is not installed '
'(Run "pip install kafka-python" in your ' '(Run "pip install kafka-python" in your '
'virtualenv).') 'virtualenv).')
super(KafkaManager, self).__init__(channel=channel, super(KafkaManager, self).__init__(channel=channel,
write_only=write_only) write_only=write_only,
encoder=encoder)
urls = [url] if isinstance(url, str) else url urls = [url] if isinstance(url, str) else url
self.kafka_urls = [url[8:] if url != 'kafka://' else 'localhost:9092' self.kafka_urls = [url[8:] if url != 'kafka://' else 'localhost:9092'
@ -54,7 +57,7 @@ class KafkaManager(PubSubManager): # pragma: no cover
bootstrap_servers=self.kafka_urls) bootstrap_servers=self.kafka_urls)
def _publish(self, data): def _publish(self, data):
self.producer.send(self.channel, value=pickle.dumps(data)) self.producer.send(self.channel, value=self.encoder.dumps(data))
self.producer.flush() self.producer.flush()
def _kafka_listen(self): def _kafka_listen(self):
@ -64,4 +67,4 @@ class KafkaManager(PubSubManager): # pragma: no cover
def _listen(self): def _listen(self):
for message in self._kafka_listen(): for message in self._kafka_listen():
if message.topic == self.channel: if message.topic == self.channel:
yield pickle.loads(message.value) yield message.value

10
src/socketio/kombu_manager.py

@ -42,20 +42,24 @@ class KombuManager(PubSubManager): # pragma: no cover
``kombu.Queue()``. ``kombu.Queue()``.
:param producer_options: additional keyword arguments to be passed to :param producer_options: additional keyword arguments to be passed to
``kombu.Producer()``. ``kombu.Producer()``.
:param encoder: The encoder to use for publishing and decoding data,
defaults to pickle.
""" """
name = 'kombu' name = 'kombu'
def __init__(self, url='amqp://guest:guest@localhost:5672//', def __init__(self, url='amqp://guest:guest@localhost:5672//',
channel='socketio', write_only=False, logger=None, channel='socketio', write_only=False, logger=None,
connection_options=None, exchange_options=None, connection_options=None, exchange_options=None,
queue_options=None, producer_options=None): queue_options=None, producer_options=None,
encoder=pickle):
if kombu is None: if kombu is None:
raise RuntimeError('Kombu package is not installed ' raise RuntimeError('Kombu package is not installed '
'(Run "pip install kombu" in your ' '(Run "pip install kombu" in your '
'virtualenv).') 'virtualenv).')
super(KombuManager, self).__init__(channel=channel, super(KombuManager, self).__init__(channel=channel,
write_only=write_only, write_only=write_only,
logger=logger) logger=logger,
encoder=encoder)
self.url = url self.url = url
self.connection_options = connection_options or {} self.connection_options = connection_options or {}
self.exchange_options = exchange_options or {} self.exchange_options = exchange_options or {}
@ -103,7 +107,7 @@ class KombuManager(PubSubManager): # pragma: no cover
connection = self._connection() connection = self._connection()
publish = connection.ensure(self.producer, self.producer.publish, publish = connection.ensure(self.producer, self.producer.publish,
errback=self.__error_callback) errback=self.__error_callback)
publish(pickle.dumps(data)) publish(self.encoder.dumps(data))
def _listen(self): def _listen(self):
reader_queue = self._queue() reader_queue = self._queue()

12
src/socketio/pubsub_manager.py

@ -23,12 +23,14 @@ class PubSubManager(BaseManager):
""" """
name = 'pubsub' name = 'pubsub'
def __init__(self, channel='socketio', write_only=False, logger=None): def __init__(self, channel='socketio', write_only=False, logger=None,
encoder=None):
super(PubSubManager, self).__init__() super(PubSubManager, self).__init__()
self.channel = channel self.channel = channel
self.write_only = write_only self.write_only = write_only
self.host_id = uuid.uuid4().hex self.host_id = uuid.uuid4().hex
self.logger = logger self.logger = logger
self.encoder = encoder
def initialize(self): def initialize(self):
super(PubSubManager, self).initialize() super(PubSubManager, self).initialize()
@ -151,7 +153,13 @@ class PubSubManager(BaseManager):
if isinstance(message, dict): if isinstance(message, dict):
data = message data = message
else: else:
if isinstance(message, bytes): # pragma: no cover if self.encoder:
try:
data = self.encoder.loads(message)
except:
pass
if data is None and \
isinstance(message, bytes): # pragma: no cover
try: try:
data = pickle.loads(message) data = pickle.loads(message)
except: except:

11
src/socketio/redis_manager.py

@ -36,11 +36,14 @@ class RedisManager(PubSubManager): # pragma: no cover
and receiving. and receiving.
:param redis_options: additional keyword arguments to be passed to :param redis_options: additional keyword arguments to be passed to
``Redis.from_url()``. ``Redis.from_url()``.
:param encoder: The encoder to use for publishing and decoding data,
defaults to pickle.
""" """
name = 'redis' name = 'redis'
def __init__(self, url='redis://localhost:6379/0', channel='socketio', def __init__(self, url='redis://localhost:6379/0', channel='socketio',
write_only=False, logger=None, redis_options=None): write_only=False, logger=None, redis_options=None,
encoder=pickle):
if redis is None: if redis is None:
raise RuntimeError('Redis package is not installed ' raise RuntimeError('Redis package is not installed '
'(Run "pip install redis" in your ' '(Run "pip install redis" in your '
@ -50,7 +53,8 @@ class RedisManager(PubSubManager): # pragma: no cover
self._redis_connect() self._redis_connect()
super(RedisManager, self).__init__(channel=channel, super(RedisManager, self).__init__(channel=channel,
write_only=write_only, write_only=write_only,
logger=logger) logger=logger,
encoder=encoder)
def initialize(self): def initialize(self):
super(RedisManager, self).initialize() super(RedisManager, self).initialize()
@ -78,7 +82,8 @@ class RedisManager(PubSubManager): # pragma: no cover
try: try:
if not retry: if not retry:
self._redis_connect() self._redis_connect()
return self.redis.publish(self.channel, pickle.dumps(data)) return self.redis.publish(self.channel,
self.encoder.dumps(data))
except redis.exceptions.RedisError: except redis.exceptions.RedisError:
if retry: if retry:
logger.error('Cannot publish to redis... retrying') logger.error('Cannot publish to redis... retrying')

17
src/socketio/zmq_manager.py

@ -29,6 +29,8 @@ class ZmqManager(PubSubManager): # pragma: no cover
:param write_only: If set to ``True``, only initialize to emit events. The :param write_only: If set to ``True``, only initialize to emit events. The
default of ``False`` initializes the class for emitting default of ``False`` initializes the class for emitting
and receiving. and receiving.
:param encoder: The encoder to use for publishing and decoding data,
defaults to pickle.
A zmq message broker must be running for the zmq_manager to work. A zmq message broker must be running for the zmq_manager to work.
you can write your own or adapt one from the following simple broker you can write your own or adapt one from the following simple broker
@ -50,7 +52,8 @@ class ZmqManager(PubSubManager): # pragma: no cover
def __init__(self, url='zmq+tcp://localhost:5555+5556', def __init__(self, url='zmq+tcp://localhost:5555+5556',
channel='socketio', channel='socketio',
write_only=False, write_only=False,
logger=None): logger=None,
encoder=pickle):
if zmq is None: if zmq is None:
raise RuntimeError('zmq package is not installed ' raise RuntimeError('zmq package is not installed '
'(Run "pip install pyzmq" in your ' '(Run "pip install pyzmq" in your '
@ -77,17 +80,18 @@ class ZmqManager(PubSubManager): # pragma: no cover
self.channel = channel self.channel = channel
super(ZmqManager, self).__init__(channel=channel, super(ZmqManager, self).__init__(channel=channel,
write_only=write_only, write_only=write_only,
logger=logger) logger=logger,
encoder=encoder)
def _publish(self, data): def _publish(self, data):
pickled_data = pickle.dumps( encoded_data = self.encoder.dumps(
{ {
'type': 'message', 'type': 'message',
'channel': self.channel, 'channel': self.channel,
'data': data 'data': data
} }
) )
return self.sink.send(pickled_data) return self.sink.send(encoded_data)
def zmq_listen(self): def zmq_listen(self):
while True: while True:
@ -98,10 +102,7 @@ class ZmqManager(PubSubManager): # pragma: no cover
def _listen(self): def _listen(self):
for message in self.zmq_listen(): for message in self.zmq_listen():
if isinstance(message, bytes): if isinstance(message, bytes):
try: yield message
message = pickle.loads(message)
except Exception:
pass
if isinstance(message, dict) and \ if isinstance(message, dict) and \
message['type'] == 'message' and \ message['type'] == 'message' and \
message['channel'] == self.channel and \ message['channel'] == self.channel and \

48
tests/common/test_pubsub_manager.py

@ -1,5 +1,8 @@
import functools import functools
import logging import logging
import pickle
import json
import marshal
import unittest import unittest
from unittest import mock from unittest import mock
@ -365,8 +368,6 @@ class TestPubSubManager(unittest.TestCase):
self.pm._handle_close_room = mock.MagicMock() self.pm._handle_close_room = mock.MagicMock()
def messages(): def messages():
import pickle
yield {'method': 'emit', 'value': 'foo'} yield {'method': 'emit', 'value': 'foo'}
yield {'missing': 'method'} yield {'missing': 'method'}
yield '{"method": "callback", "value": "bar"}' yield '{"method": "callback", "value": "bar"}'
@ -394,3 +395,46 @@ class TestPubSubManager(unittest.TestCase):
self.pm._handle_close_room.assert_called_once_with( self.pm._handle_close_room.assert_called_once_with(
{'method': 'close_room', 'value': 'baz'} {'method': 'close_room', 'value': 'baz'}
) )
def test_background_thread_with_encoder(self):
mock_server = mock.MagicMock()
pm = pubsub_manager.PubSubManager(encoder=marshal)
pm.set_server(mock_server)
pm._publish = mock.MagicMock()
pm._handle_emit = mock.MagicMock()
pm._handle_callback = mock.MagicMock()
pm._handle_disconnect = mock.MagicMock()
pm._handle_close_room = mock.MagicMock()
pm.initialize()
def messages():
yield {'method': 'emit', 'value': 'foo'}
yield marshal.dumps({'method': 'callback', 'value': 'bar'})
yield json.dumps(
{'method': 'disconnect', 'sid': '123', 'namespace': '/foo'}
)
yield pickle.dumps({'method': 'close_room', 'value': 'baz'})
yield {'method': 'bogus'}
yield 'bad json'
yield b'bad encoding'
pm._listen = mock.MagicMock(side_effect=messages)
try:
pm._thread()
except StopIteration:
pass
pm._handle_emit.assert_called_once_with(
{'method': 'emit', 'value': 'foo'}
)
pm._handle_callback.assert_called_once_with(
{'method': 'callback', 'value': 'bar'}
)
pm._handle_disconnect.assert_called_once_with(
{'method': 'disconnect', 'sid': '123', 'namespace': '/foo'}
)
pm._handle_close_room.assert_called_once_with(
{'method': 'close_room', 'value': 'baz'}
)

Loading…
Cancel
Save