|
|
@ -95,16 +95,23 @@ class AsyncRedisManager(AsyncPubSubManager): |
|
|
def _redis_connect(self): |
|
|
def _redis_connect(self): |
|
|
module, _ = self._get_redis_module_and_error() |
|
|
module, _ = self._get_redis_module_and_error() |
|
|
parsed_url = urlparse(self.redis_url) |
|
|
parsed_url = urlparse(self.redis_url) |
|
|
|
|
|
pubsub_defaults = { |
|
|
|
|
|
"decode_responses": False, # keep channels as bytes |
|
|
|
|
|
"socket_timeout": None, # block indefinitely on pub/sub |
|
|
|
|
|
"health_check_interval": 0, # no periodic PINGs on read socket |
|
|
|
|
|
"retry_on_timeout": False, # Pub/Sub loop handles reconnects |
|
|
|
|
|
} |
|
|
|
|
|
kwargs = {**pubsub_defaults, **(self.redis_options or {})} |
|
|
|
|
|
|
|
|
if parsed_url.scheme in {"redis+sentinel", "valkey+sentinel"}: |
|
|
if parsed_url.scheme in {"redis+sentinel", "valkey+sentinel"}: |
|
|
sentinels, service_name, connection_kwargs = \ |
|
|
sentinels, service_name, connection_kwargs = \ |
|
|
parse_redis_sentinel_url(self.redis_url) |
|
|
parse_redis_sentinel_url(self.redis_url) |
|
|
kwargs = self.redis_options |
|
|
connection_kwargs.update(kwargs) |
|
|
kwargs.update(connection_kwargs) |
|
|
sentinel = module.sentinel.Sentinel(sentinels, **connection_kwargs) |
|
|
sentinel = module.sentinel.Sentinel(sentinels, **kwargs) |
|
|
|
|
|
self.redis = sentinel.master_for(service_name or self.channel) |
|
|
self.redis = sentinel.master_for(service_name or self.channel) |
|
|
else: |
|
|
else: |
|
|
self.redis = module.Redis.from_url(self.redis_url, |
|
|
self.redis = module.Redis.from_url(self.redis_url, |
|
|
**self.redis_options) |
|
|
**kwargs) |
|
|
self.pubsub = self.redis.pubsub(ignore_subscribe_messages=True) |
|
|
self.pubsub = self.redis.pubsub(ignore_subscribe_messages=True) |
|
|
|
|
|
|
|
|
async def _publish(self, data): # pragma: no cover |
|
|
async def _publish(self, data): # pragma: no cover |
|
|
|