Browse Source

Replace pickle with json (#1502)

pull/1505/head
Miguel Grinberg 2 weeks ago
committed by GitHub
parent
commit
53f6be0942
No known key found for this signature in database. GPG Key ID: B5690EEEBB952194
  1. 22
      docs/server.rst
  2. 6
      src/socketio/async_aiopika_manager.py
  3. 15
      src/socketio/async_pubsub_manager.py
  4. 4
      src/socketio/async_redis_manager.py
  5. 6
      src/socketio/kafka_manager.py
  6. 4
      src/socketio/kombu_manager.py
  7. 15
      src/socketio/pubsub_manager.py
  8. 4
      src/socketio/redis_manager.py
  9. 10
      src/socketio/zmq_manager.py
  10. 13
      tests/async/test_pubsub_manager.py
  11. 13
      tests/common/test_pubsub_manager.py

22
docs/server.rst

@ -1096,17 +1096,17 @@ For a production deployment there are a few recommendations to keep your
application secure.
First of all, the message queue should never be listening on a public network
interface, to ensure that external clients never connect to it. The use of a
private network (VPC), where the communication between servers can happen
privately is highly recommended.
In addition, all message queues support authentication and encryption.
Authentication ensures that only the Socket.IO servers and related processes
have access, while encryption prevents data to be collected by a third-party
listening on the network.
Access credentials can be included in the connection URLs that are passed to the
client managers.
interface, to ensure that external clients never connect to it. For a single
node deployment, the queue should only listen on `localhost`. For a multi-node
system the use of a private network (VPC), where the communication between
servers can happen privately is highly recommended.
In addition, all message queues support authentication and encryption, which
can strengthen the security of the deployment. Authentication ensures that only
the Socket.IO servers and related processes have access, while encryption
prevents data from being collected by a third-party that is listening on the
network. Access credentials can be included in the connection URLs that are
passed to the client managers.
Horizontal Scaling
~~~~~~~~~~~~~~~~~~

6
src/socketio/async_aiopika_manager.py

@ -1,6 +1,6 @@
import asyncio
import pickle
from engineio import json
from .async_pubsub_manager import AsyncPubSubManager
try:
@ -82,7 +82,7 @@ class AsyncAioPikaManager(AsyncPubSubManager): # pragma: no cover
try:
await self.publisher_exchange.publish(
aio_pika.Message(
body=pickle.dumps(data),
body=json.dumps(data),
delivery_mode=aio_pika.DeliveryMode.PERSISTENT
), routing_key='*',
)
@ -113,7 +113,7 @@ class AsyncAioPikaManager(AsyncPubSubManager): # pragma: no cover
async with queue.iterator() as queue_iter:
async for message in queue_iter:
async with message.process():
yield pickle.loads(message.body)
yield message.body
retry_sleep = 1
except aio_pika.AMQPException:
self._get_logger().error(

15
src/socketio/async_pubsub_manager.py

@ -3,7 +3,6 @@ from functools import partial
import uuid
from engineio import json
import pickle
from .async_manager import AsyncManager
@ -202,16 +201,10 @@ class AsyncPubSubManager(AsyncManager):
if isinstance(message, dict):
data = message
else:
if isinstance(message, bytes): # pragma: no cover
try:
data = pickle.loads(message)
except:
pass
if data is None:
try:
data = json.loads(message)
except:
pass
try:
data = json.loads(message)
except:
pass
if data and 'method' in data:
self._get_logger().debug('pubsub message: {}'.format(
data['method']))

4
src/socketio/async_redis_manager.py

@ -1,5 +1,4 @@
import asyncio
import pickle
from urllib.parse import urlparse
try: # pragma: no cover
@ -20,6 +19,7 @@ except ImportError: # pragma: no cover
valkey = None
ValkeyError = None
from engineio import json
from .async_pubsub_manager import AsyncPubSubManager
from .redis_manager import parse_redis_sentinel_url
@ -108,7 +108,7 @@ class AsyncRedisManager(AsyncPubSubManager): # pragma: no cover
if not retry:
self._redis_connect()
return await self.redis.publish(
self.channel, pickle.dumps(data))
self.channel, json.dumps(data))
except error as exc:
if retry:
self._get_logger().error(

6
src/socketio/kafka_manager.py

@ -1,11 +1,11 @@
import logging
import pickle
try:
import kafka
except ImportError:
kafka = None
from engineio import json
from .pubsub_manager import PubSubManager
logger = logging.getLogger('socketio')
@ -53,7 +53,7 @@ class KafkaManager(PubSubManager): # pragma: no cover
bootstrap_servers=self.kafka_urls)
def _publish(self, data):
self.producer.send(self.channel, value=pickle.dumps(data))
self.producer.send(self.channel, value=json.dumps(data))
self.producer.flush()
def _kafka_listen(self):
@ -62,4 +62,4 @@ class KafkaManager(PubSubManager): # pragma: no cover
def _listen(self):
for message in self._kafka_listen():
if message.topic == self.channel:
yield pickle.loads(message.value)
yield message.value

4
src/socketio/kombu_manager.py

@ -1,4 +1,3 @@
import pickle
import time
import uuid
@ -7,6 +6,7 @@ try:
except ImportError:
kombu = None
from engineio import json
from .pubsub_manager import PubSubManager
@ -102,7 +102,7 @@ class KombuManager(PubSubManager): # pragma: no cover
try:
producer_publish = self._producer_publish(
self.publisher_connection)
producer_publish(pickle.dumps(data))
producer_publish(json.dumps(data))
break
except (OSError, kombu.exceptions.KombuError):
if retry:

15
src/socketio/pubsub_manager.py

@ -2,7 +2,6 @@ from functools import partial
import uuid
from engineio import json
import pickle
from .manager import Manager
@ -196,16 +195,10 @@ class PubSubManager(Manager):
if isinstance(message, dict):
data = message
else:
if isinstance(message, bytes): # pragma: no cover
try:
data = pickle.loads(message)
except:
pass
if data is None:
try:
data = json.loads(message)
except:
pass
try:
data = json.loads(message)
except:
pass
if data and 'method' in data:
self._get_logger().debug('pubsub message: {}'.format(
data['method']))

4
src/socketio/redis_manager.py

@ -1,5 +1,4 @@
import logging
import pickle
import time
from urllib.parse import urlparse
@ -17,6 +16,7 @@ except ImportError:
valkey = None
ValkeyError = None
from engineio import json
from .pubsub_manager import PubSubManager
logger = logging.getLogger('socketio')
@ -145,7 +145,7 @@ class RedisManager(PubSubManager): # pragma: no cover
try:
if not retry:
self._redis_connect()
return self.redis.publish(self.channel, pickle.dumps(data))
return self.redis.publish(self.channel, json.dumps(data))
except error as exc:
if retry:
logger.error(

10
src/socketio/zmq_manager.py

@ -1,6 +1,6 @@
import pickle
import re
from engineio import json
from .pubsub_manager import PubSubManager
@ -75,14 +75,14 @@ class ZmqManager(PubSubManager): # pragma: no cover
self.channel = channel
def _publish(self, data):
pickled_data = pickle.dumps(
packed_data = json.dumps(
{
'type': 'message',
'channel': self.channel,
'data': data
}
)
return self.sink.send(pickled_data)
).encode()
return self.sink.send(packed_data)
def zmq_listen(self):
while True:
@ -94,7 +94,7 @@ class ZmqManager(PubSubManager): # pragma: no cover
for message in self.zmq_listen():
if isinstance(message, bytes):
try:
message = pickle.loads(message)
message = json.loads(message)
except Exception:
pass
if isinstance(message, dict) and \

13
tests/async/test_pubsub_manager.py

@ -1,5 +1,6 @@
import asyncio
import functools
import json
from unittest import mock
import pytest
@ -482,22 +483,20 @@ class TestAsyncPubSubManager:
host_id = self.pm.host_id
async def messages():
import pickle
yield {'method': 'emit', 'value': 'foo', 'host_id': 'x'}
yield {'missing': 'method', 'host_id': 'x'}
yield '{"method": "callback", "value": "bar", "host_id": "x"}'
yield {'method': 'disconnect', 'sid': '123', 'namespace': '/foo',
'host_id': 'x'}
yield {'method': 'bogus', 'host_id': 'x'}
yield pickle.dumps({'method': 'close_room', 'value': 'baz',
'host_id': 'x'})
yield json.dumps({'method': 'close_room', 'value': 'baz',
'host_id': 'x'})
yield {'method': 'enter_room', 'sid': '123', 'namespace': '/foo',
'room': 'room', 'host_id': 'x'}
yield {'method': 'leave_room', 'sid': '123', 'namespace': '/foo',
'room': 'room', 'host_id': 'x'}
yield 'bad json'
yield b'bad pickled'
yield b'bad data'
# these should not publish anything on the queue, as they come from
# the same host
@ -505,8 +504,8 @@ class TestAsyncPubSubManager:
yield {'method': 'callback', 'value': 'bar', 'host_id': host_id}
yield {'method': 'disconnect', 'sid': '123', 'namespace': '/foo',
'host_id': host_id}
yield pickle.dumps({'method': 'close_room', 'value': 'baz',
'host_id': host_id})
yield json.dumps({'method': 'close_room', 'value': 'baz',
'host_id': host_id})
self.pm._listen = messages
await self.pm._thread()

13
tests/common/test_pubsub_manager.py

@ -1,4 +1,5 @@
import functools
import json
import logging
from unittest import mock
@ -465,22 +466,20 @@ class TestPubSubManager:
host_id = self.pm.host_id
def messages():
import pickle
yield {'method': 'emit', 'value': 'foo', 'host_id': 'x'}
yield {'missing': 'method', 'host_id': 'x'}
yield '{"method": "callback", "value": "bar", "host_id": "x"}'
yield {'method': 'disconnect', 'sid': '123', 'namespace': '/foo',
'host_id': 'x'}
yield {'method': 'bogus', 'host_id': 'x'}
yield pickle.dumps({'method': 'close_room', 'value': 'baz',
'host_id': 'x'})
yield json.dumps({'method': 'close_room', 'value': 'baz',
'host_id': 'x'})
yield {'method': 'enter_room', 'sid': '123', 'namespace': '/foo',
'room': 'room', 'host_id': 'x'}
yield {'method': 'leave_room', 'sid': '123', 'namespace': '/foo',
'room': 'room', 'host_id': 'x'}
yield 'bad json'
yield b'bad pickled'
yield b'bad data'
# these should not publish anything on the queue, as they come from
# the same host
@ -488,8 +487,8 @@ class TestPubSubManager:
yield {'method': 'callback', 'value': 'bar', 'host_id': host_id}
yield {'method': 'disconnect', 'sid': '123', 'namespace': '/foo',
'host_id': host_id}
yield pickle.dumps({'method': 'close_room', 'value': 'baz',
'host_id': host_id})
yield json.dumps({'method': 'close_room', 'value': 'baz',
'host_id': host_id})
self.pm._listen = mock.MagicMock(side_effect=messages)
try:

Loading…
Cancel
Save