Browse Source
channel was not properly initialized in several pubsub client managers (#1476)
main
Eugnee
5 days ago
committed by
GitHub
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
4 changed files with
4 additions and
4 deletions
-
src/socketio/async_aiopika_manager.py
-
src/socketio/async_redis_manager.py
-
src/socketio/redis_manager.py
-
src/socketio/zmq_manager.py
|
@ -43,12 +43,12 @@ class AsyncAioPikaManager(AsyncPubSubManager): # pragma: no cover |
|
|
raise RuntimeError('aio_pika package is not installed ' |
|
|
raise RuntimeError('aio_pika package is not installed ' |
|
|
'(Run "pip install aio_pika" in your ' |
|
|
'(Run "pip install aio_pika" in your ' |
|
|
'virtualenv).') |
|
|
'virtualenv).') |
|
|
|
|
|
super().__init__(channel=channel, write_only=write_only, logger=logger) |
|
|
self.url = url |
|
|
self.url = url |
|
|
self._lock = asyncio.Lock() |
|
|
self._lock = asyncio.Lock() |
|
|
self.publisher_connection = None |
|
|
self.publisher_connection = None |
|
|
self.publisher_channel = None |
|
|
self.publisher_channel = None |
|
|
self.publisher_exchange = None |
|
|
self.publisher_exchange = None |
|
|
super().__init__(channel=channel, write_only=write_only, logger=logger) |
|
|
|
|
|
|
|
|
|
|
|
async def _connection(self): |
|
|
async def _connection(self): |
|
|
return await aio_pika.connect_robust(self.url) |
|
|
return await aio_pika.connect_robust(self.url) |
|
|
|
@ -52,10 +52,10 @@ class AsyncRedisManager(AsyncPubSubManager): # pragma: no cover |
|
|
'(Run "pip install redis" in your virtualenv).') |
|
|
'(Run "pip install redis" in your virtualenv).') |
|
|
if not hasattr(aioredis.Redis, 'from_url'): |
|
|
if not hasattr(aioredis.Redis, 'from_url'): |
|
|
raise RuntimeError('Version 2 of aioredis package is required.') |
|
|
raise RuntimeError('Version 2 of aioredis package is required.') |
|
|
|
|
|
super().__init__(channel=channel, write_only=write_only, logger=logger) |
|
|
self.redis_url = url |
|
|
self.redis_url = url |
|
|
self.redis_options = redis_options or {} |
|
|
self.redis_options = redis_options or {} |
|
|
self._redis_connect() |
|
|
self._redis_connect() |
|
|
super().__init__(channel=channel, write_only=write_only, logger=logger) |
|
|
|
|
|
|
|
|
|
|
|
def _redis_connect(self): |
|
|
def _redis_connect(self): |
|
|
if not self.redis_url.startswith('redis+sentinel://'): |
|
|
if not self.redis_url.startswith('redis+sentinel://'): |
|
|
|
@ -75,10 +75,10 @@ class RedisManager(PubSubManager): # pragma: no cover |
|
|
raise RuntimeError('Redis package is not installed ' |
|
|
raise RuntimeError('Redis package is not installed ' |
|
|
'(Run "pip install redis" in your ' |
|
|
'(Run "pip install redis" in your ' |
|
|
'virtualenv).') |
|
|
'virtualenv).') |
|
|
|
|
|
super().__init__(channel=channel, write_only=write_only, logger=logger) |
|
|
self.redis_url = url |
|
|
self.redis_url = url |
|
|
self.redis_options = redis_options or {} |
|
|
self.redis_options = redis_options or {} |
|
|
self._redis_connect() |
|
|
self._redis_connect() |
|
|
super().__init__(channel=channel, write_only=write_only, logger=logger) |
|
|
|
|
|
|
|
|
|
|
|
def initialize(self): |
|
|
def initialize(self): |
|
|
super().initialize() |
|
|
super().initialize() |
|
|
|
@ -57,6 +57,7 @@ class ZmqManager(PubSubManager): # pragma: no cover |
|
|
if not (url.startswith('zmq+tcp://') and r.search(url)): |
|
|
if not (url.startswith('zmq+tcp://') and r.search(url)): |
|
|
raise RuntimeError('unexpected connection string: ' + url) |
|
|
raise RuntimeError('unexpected connection string: ' + url) |
|
|
|
|
|
|
|
|
|
|
|
super().__init__(channel=channel, write_only=write_only, logger=logger) |
|
|
url = url.replace('zmq+', '') |
|
|
url = url.replace('zmq+', '') |
|
|
(sink_url, sub_port) = url.split('+') |
|
|
(sink_url, sub_port) = url.split('+') |
|
|
sink_port = sink_url.split(':')[-1] |
|
|
sink_port = sink_url.split(':')[-1] |
|
@ -72,7 +73,6 @@ class ZmqManager(PubSubManager): # pragma: no cover |
|
|
self.sink = sink |
|
|
self.sink = sink |
|
|
self.sub = sub |
|
|
self.sub = sub |
|
|
self.channel = channel |
|
|
self.channel = channel |
|
|
super().__init__(channel=channel, write_only=write_only, logger=logger) |
|
|
|
|
|
|
|
|
|
|
|
def _publish(self, data): |
|
|
def _publish(self, data): |
|
|
pickled_data = pickle.dumps( |
|
|
pickled_data = pickle.dumps( |
|
|