Browse Source

Replace pickle with json

pull/1502/head
Miguel Grinberg 9 months ago
parent
commit
333349c14a
Failed to extract signature
  1. 6
      src/socketio/async_aiopika_manager.py
  2. 15
      src/socketio/async_pubsub_manager.py
  3. 4
      src/socketio/async_redis_manager.py
  4. 6
      src/socketio/kafka_manager.py
  5. 4
      src/socketio/kombu_manager.py
  6. 15
      src/socketio/pubsub_manager.py
  7. 4
      src/socketio/redis_manager.py
  8. 10
      src/socketio/zmq_manager.py
  9. 13
      tests/async/test_pubsub_manager.py
  10. 13
      tests/common/test_pubsub_manager.py

6
src/socketio/async_aiopika_manager.py

@ -1,6 +1,6 @@
import asyncio import asyncio
import pickle
from engineio import json
from .async_pubsub_manager import AsyncPubSubManager from .async_pubsub_manager import AsyncPubSubManager
try: try:
@ -82,7 +82,7 @@ class AsyncAioPikaManager(AsyncPubSubManager): # pragma: no cover
try: try:
await self.publisher_exchange.publish( await self.publisher_exchange.publish(
aio_pika.Message( aio_pika.Message(
body=pickle.dumps(data), body=json.dumps(data),
delivery_mode=aio_pika.DeliveryMode.PERSISTENT delivery_mode=aio_pika.DeliveryMode.PERSISTENT
), routing_key='*', ), routing_key='*',
) )
@ -113,7 +113,7 @@ class AsyncAioPikaManager(AsyncPubSubManager): # pragma: no cover
async with queue.iterator() as queue_iter: async with queue.iterator() as queue_iter:
async for message in queue_iter: async for message in queue_iter:
async with message.process(): async with message.process():
yield pickle.loads(message.body) yield message.body
retry_sleep = 1 retry_sleep = 1
except aio_pika.AMQPException: except aio_pika.AMQPException:
self._get_logger().error( self._get_logger().error(

15
src/socketio/async_pubsub_manager.py

@ -3,7 +3,6 @@ from functools import partial
import uuid import uuid
from engineio import json from engineio import json
import pickle
from .async_manager import AsyncManager from .async_manager import AsyncManager
@ -202,16 +201,10 @@ class AsyncPubSubManager(AsyncManager):
if isinstance(message, dict): if isinstance(message, dict):
data = message data = message
else: else:
if isinstance(message, bytes): # pragma: no cover try:
try: data = json.loads(message)
data = pickle.loads(message) except:
except: pass
pass
if data is None:
try:
data = json.loads(message)
except:
pass
if data and 'method' in data: if data and 'method' in data:
self._get_logger().debug('pubsub message: {}'.format( self._get_logger().debug('pubsub message: {}'.format(
data['method'])) data['method']))

4
src/socketio/async_redis_manager.py

@ -1,5 +1,4 @@
import asyncio import asyncio
import pickle
from urllib.parse import urlparse from urllib.parse import urlparse
try: # pragma: no cover try: # pragma: no cover
@ -20,6 +19,7 @@ except ImportError: # pragma: no cover
valkey = None valkey = None
ValkeyError = None ValkeyError = None
from engineio import json
from .async_pubsub_manager import AsyncPubSubManager from .async_pubsub_manager import AsyncPubSubManager
from .redis_manager import parse_redis_sentinel_url from .redis_manager import parse_redis_sentinel_url
@ -108,7 +108,7 @@ class AsyncRedisManager(AsyncPubSubManager): # pragma: no cover
if not retry: if not retry:
self._redis_connect() self._redis_connect()
return await self.redis.publish( return await self.redis.publish(
self.channel, pickle.dumps(data)) self.channel, json.dumps(data))
except error as exc: except error as exc:
if retry: if retry:
self._get_logger().error( self._get_logger().error(

6
src/socketio/kafka_manager.py

@ -1,11 +1,11 @@
import logging import logging
import pickle
try: try:
import kafka import kafka
except ImportError: except ImportError:
kafka = None kafka = None
from engineio import json
from .pubsub_manager import PubSubManager from .pubsub_manager import PubSubManager
logger = logging.getLogger('socketio') logger = logging.getLogger('socketio')
@ -53,7 +53,7 @@ class KafkaManager(PubSubManager): # pragma: no cover
bootstrap_servers=self.kafka_urls) bootstrap_servers=self.kafka_urls)
def _publish(self, data): def _publish(self, data):
self.producer.send(self.channel, value=pickle.dumps(data)) self.producer.send(self.channel, value=json.dumps(data))
self.producer.flush() self.producer.flush()
def _kafka_listen(self): def _kafka_listen(self):
@ -62,4 +62,4 @@ class KafkaManager(PubSubManager): # pragma: no cover
def _listen(self): def _listen(self):
for message in self._kafka_listen(): for message in self._kafka_listen():
if message.topic == self.channel: if message.topic == self.channel:
yield pickle.loads(message.value) yield message.value

4
src/socketio/kombu_manager.py

@ -1,4 +1,3 @@
import pickle
import time import time
import uuid import uuid
@ -7,6 +6,7 @@ try:
except ImportError: except ImportError:
kombu = None kombu = None
from engineio import json
from .pubsub_manager import PubSubManager from .pubsub_manager import PubSubManager
@ -102,7 +102,7 @@ class KombuManager(PubSubManager): # pragma: no cover
try: try:
producer_publish = self._producer_publish( producer_publish = self._producer_publish(
self.publisher_connection) self.publisher_connection)
producer_publish(pickle.dumps(data)) producer_publish(json.dumps(data))
break break
except (OSError, kombu.exceptions.KombuError): except (OSError, kombu.exceptions.KombuError):
if retry: if retry:

15
src/socketio/pubsub_manager.py

@ -2,7 +2,6 @@ from functools import partial
import uuid import uuid
from engineio import json from engineio import json
import pickle
from .manager import Manager from .manager import Manager
@ -196,16 +195,10 @@ class PubSubManager(Manager):
if isinstance(message, dict): if isinstance(message, dict):
data = message data = message
else: else:
if isinstance(message, bytes): # pragma: no cover try:
try: data = json.loads(message)
data = pickle.loads(message) except:
except: pass
pass
if data is None:
try:
data = json.loads(message)
except:
pass
if data and 'method' in data: if data and 'method' in data:
self._get_logger().debug('pubsub message: {}'.format( self._get_logger().debug('pubsub message: {}'.format(
data['method'])) data['method']))

4
src/socketio/redis_manager.py

@ -1,5 +1,4 @@
import logging import logging
import pickle
import time import time
from urllib.parse import urlparse from urllib.parse import urlparse
@ -17,6 +16,7 @@ except ImportError:
valkey = None valkey = None
ValkeyError = None ValkeyError = None
from engineio import json
from .pubsub_manager import PubSubManager from .pubsub_manager import PubSubManager
logger = logging.getLogger('socketio') logger = logging.getLogger('socketio')
@ -145,7 +145,7 @@ class RedisManager(PubSubManager): # pragma: no cover
try: try:
if not retry: if not retry:
self._redis_connect() self._redis_connect()
return self.redis.publish(self.channel, pickle.dumps(data)) return self.redis.publish(self.channel, json.dumps(data))
except error as exc: except error as exc:
if retry: if retry:
logger.error( logger.error(

10
src/socketio/zmq_manager.py

@ -1,6 +1,6 @@
import pickle
import re import re
from engineio import json
from .pubsub_manager import PubSubManager from .pubsub_manager import PubSubManager
@ -75,14 +75,14 @@ class ZmqManager(PubSubManager): # pragma: no cover
self.channel = channel self.channel = channel
def _publish(self, data): def _publish(self, data):
pickled_data = pickle.dumps( packed_data = json.dumps(
{ {
'type': 'message', 'type': 'message',
'channel': self.channel, 'channel': self.channel,
'data': data 'data': data
} }
) ).encode()
return self.sink.send(pickled_data) return self.sink.send(packed_data)
def zmq_listen(self): def zmq_listen(self):
while True: while True:
@ -94,7 +94,7 @@ class ZmqManager(PubSubManager): # pragma: no cover
for message in self.zmq_listen(): for message in self.zmq_listen():
if isinstance(message, bytes): if isinstance(message, bytes):
try: try:
message = pickle.loads(message) message = json.loads(message)
except Exception: except Exception:
pass pass
if isinstance(message, dict) and \ if isinstance(message, dict) and \

13
tests/async/test_pubsub_manager.py

@ -1,5 +1,6 @@
import asyncio import asyncio
import functools import functools
import json
from unittest import mock from unittest import mock
import pytest import pytest
@ -482,22 +483,20 @@ class TestAsyncPubSubManager:
host_id = self.pm.host_id host_id = self.pm.host_id
async def messages(): async def messages():
import pickle
yield {'method': 'emit', 'value': 'foo', 'host_id': 'x'} yield {'method': 'emit', 'value': 'foo', 'host_id': 'x'}
yield {'missing': 'method', 'host_id': 'x'} yield {'missing': 'method', 'host_id': 'x'}
yield '{"method": "callback", "value": "bar", "host_id": "x"}' yield '{"method": "callback", "value": "bar", "host_id": "x"}'
yield {'method': 'disconnect', 'sid': '123', 'namespace': '/foo', yield {'method': 'disconnect', 'sid': '123', 'namespace': '/foo',
'host_id': 'x'} 'host_id': 'x'}
yield {'method': 'bogus', 'host_id': 'x'} yield {'method': 'bogus', 'host_id': 'x'}
yield pickle.dumps({'method': 'close_room', 'value': 'baz', yield json.dumps({'method': 'close_room', 'value': 'baz',
'host_id': 'x'}) 'host_id': 'x'})
yield {'method': 'enter_room', 'sid': '123', 'namespace': '/foo', yield {'method': 'enter_room', 'sid': '123', 'namespace': '/foo',
'room': 'room', 'host_id': 'x'} 'room': 'room', 'host_id': 'x'}
yield {'method': 'leave_room', 'sid': '123', 'namespace': '/foo', yield {'method': 'leave_room', 'sid': '123', 'namespace': '/foo',
'room': 'room', 'host_id': 'x'} 'room': 'room', 'host_id': 'x'}
yield 'bad json' yield 'bad json'
yield b'bad pickled' yield b'bad data'
# these should not publish anything on the queue, as they come from # these should not publish anything on the queue, as they come from
# the same host # the same host
@ -505,8 +504,8 @@ class TestAsyncPubSubManager:
yield {'method': 'callback', 'value': 'bar', 'host_id': host_id} yield {'method': 'callback', 'value': 'bar', 'host_id': host_id}
yield {'method': 'disconnect', 'sid': '123', 'namespace': '/foo', yield {'method': 'disconnect', 'sid': '123', 'namespace': '/foo',
'host_id': host_id} 'host_id': host_id}
yield pickle.dumps({'method': 'close_room', 'value': 'baz', yield json.dumps({'method': 'close_room', 'value': 'baz',
'host_id': host_id}) 'host_id': host_id})
self.pm._listen = messages self.pm._listen = messages
await self.pm._thread() await self.pm._thread()

13
tests/common/test_pubsub_manager.py

@ -1,4 +1,5 @@
import functools import functools
import json
import logging import logging
from unittest import mock from unittest import mock
@ -465,22 +466,20 @@ class TestPubSubManager:
host_id = self.pm.host_id host_id = self.pm.host_id
def messages(): def messages():
import pickle
yield {'method': 'emit', 'value': 'foo', 'host_id': 'x'} yield {'method': 'emit', 'value': 'foo', 'host_id': 'x'}
yield {'missing': 'method', 'host_id': 'x'} yield {'missing': 'method', 'host_id': 'x'}
yield '{"method": "callback", "value": "bar", "host_id": "x"}' yield '{"method": "callback", "value": "bar", "host_id": "x"}'
yield {'method': 'disconnect', 'sid': '123', 'namespace': '/foo', yield {'method': 'disconnect', 'sid': '123', 'namespace': '/foo',
'host_id': 'x'} 'host_id': 'x'}
yield {'method': 'bogus', 'host_id': 'x'} yield {'method': 'bogus', 'host_id': 'x'}
yield pickle.dumps({'method': 'close_room', 'value': 'baz', yield json.dumps({'method': 'close_room', 'value': 'baz',
'host_id': 'x'}) 'host_id': 'x'})
yield {'method': 'enter_room', 'sid': '123', 'namespace': '/foo', yield {'method': 'enter_room', 'sid': '123', 'namespace': '/foo',
'room': 'room', 'host_id': 'x'} 'room': 'room', 'host_id': 'x'}
yield {'method': 'leave_room', 'sid': '123', 'namespace': '/foo', yield {'method': 'leave_room', 'sid': '123', 'namespace': '/foo',
'room': 'room', 'host_id': 'x'} 'room': 'room', 'host_id': 'x'}
yield 'bad json' yield 'bad json'
yield b'bad pickled' yield b'bad data'
# these should not publish anything on the queue, as they come from # these should not publish anything on the queue, as they come from
# the same host # the same host
@ -488,8 +487,8 @@ class TestPubSubManager:
yield {'method': 'callback', 'value': 'bar', 'host_id': host_id} yield {'method': 'callback', 'value': 'bar', 'host_id': host_id}
yield {'method': 'disconnect', 'sid': '123', 'namespace': '/foo', yield {'method': 'disconnect', 'sid': '123', 'namespace': '/foo',
'host_id': host_id} 'host_id': host_id}
yield pickle.dumps({'method': 'close_room', 'value': 'baz', yield json.dumps({'method': 'close_room', 'value': 'baz',
'host_id': host_id}) 'host_id': host_id})
self.pm._listen = mock.MagicMock(side_effect=messages) self.pm._listen = mock.MagicMock(side_effect=messages)
try: try:

Loading…
Cancel
Save