|
|
|
@ -12,7 +12,7 @@ class TestAsyncRedisManager: |
|
|
|
async_redis_manager.aioredis = None |
|
|
|
|
|
|
|
with pytest.raises(RuntimeError): |
|
|
|
AsyncRedisManager('redis://') |
|
|
|
AsyncRedisManager('redis://')._redis_connect() |
|
|
|
assert AsyncRedisManager('unix:///var/sock/redis.sock') is not None |
|
|
|
|
|
|
|
async_redis_manager.aioredis = saved_redis |
|
|
|
@ -22,7 +22,7 @@ class TestAsyncRedisManager: |
|
|
|
async_redis_manager.aiovalkey = None |
|
|
|
|
|
|
|
with pytest.raises(RuntimeError): |
|
|
|
AsyncRedisManager('valkey://') |
|
|
|
AsyncRedisManager('valkey://')._redis_connect() |
|
|
|
assert AsyncRedisManager('unix:///var/sock/redis.sock') is not None |
|
|
|
|
|
|
|
async_redis_manager.aiovalkey = saved_valkey |
|
|
|
@ -34,18 +34,18 @@ class TestAsyncRedisManager: |
|
|
|
async_redis_manager.aiovalkey = None |
|
|
|
|
|
|
|
with pytest.raises(RuntimeError): |
|
|
|
AsyncRedisManager('redis://') |
|
|
|
AsyncRedisManager('redis://')._redis_connect() |
|
|
|
with pytest.raises(RuntimeError): |
|
|
|
AsyncRedisManager('valkey://') |
|
|
|
AsyncRedisManager('valkey://')._redis_connect() |
|
|
|
with pytest.raises(RuntimeError): |
|
|
|
AsyncRedisManager('unix:///var/sock/redis.sock') |
|
|
|
AsyncRedisManager('unix:///var/sock/redis.sock')._redis_connect() |
|
|
|
|
|
|
|
async_redis_manager.aioredis = saved_redis |
|
|
|
async_redis_manager.aiovalkey = saved_valkey |
|
|
|
|
|
|
|
def test_bad_url(self): |
|
|
|
with pytest.raises(ValueError): |
|
|
|
AsyncRedisManager('http://localhost:6379') |
|
|
|
AsyncRedisManager('http://localhost:6379')._redis_connect() |
|
|
|
|
|
|
|
def test_redis_connect(self): |
|
|
|
urls = [ |
|
|
|
@ -72,6 +72,8 @@ class TestAsyncRedisManager: |
|
|
|
] |
|
|
|
for url in urls: |
|
|
|
c = AsyncRedisManager(url) |
|
|
|
assert c.redis is None |
|
|
|
c._redis_connect() |
|
|
|
assert isinstance(c.redis, redis.asyncio.Redis) |
|
|
|
|
|
|
|
def test_valkey_connect(self): |
|
|
|
@ -102,6 +104,8 @@ class TestAsyncRedisManager: |
|
|
|
] |
|
|
|
for url in urls: |
|
|
|
c = AsyncRedisManager(url) |
|
|
|
assert c.redis is None |
|
|
|
c._redis_connect() |
|
|
|
assert isinstance(c.redis, valkey.asyncio.Valkey) |
|
|
|
|
|
|
|
async_redis_manager.aioredis = saved_redis |
|
|
|
|