Allow `connection_pool` to be set in RedisSettings
@samuelcolvin Thank you for a great library! I would be really grateful if this PR could be reviewed, or if a workaround for this issue could be provided. Many thanks.
We are using arq in our production app, and it appears that the connection pool throws an error and the arq worker crashes when the connection limit (`max_connections`) is reached.
The `enqueue_job` method throws the same error under the same condition, so both the worker and the enqueuing side are affected.
This seems to be caused by the default `ConnectionPool` of redis-py, which raises an error when the limit is reached instead of waiting for a connection to be released:
https://github.com/redis/redis-py/issues/2517
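To make the difference between the two pool policies concrete, here is a toy model using only the standard library. It is not redis-py's implementation: `FailFastPool`, `WaitingPool`, `PoolExhausted`, and the helper functions are hypothetical names that only mimic the two behaviours (raise at the limit versus wait for a free slot).

```python
import asyncio


class PoolExhausted(Exception):
    """Raised by the fail-fast toy pool when no slot is free."""


class FailFastPool:
    """Toy pool that raises as soon as max_connections is exceeded."""

    def __init__(self, max_connections: int) -> None:
        self.max_connections = max_connections
        self.in_use = 0

    async def acquire(self) -> None:
        if self.in_use >= self.max_connections:
            raise PoolExhausted("Too many connections")
        self.in_use += 1

    def release(self) -> None:
        self.in_use -= 1


class WaitingPool:
    """Toy pool that makes callers wait until a slot is released."""

    def __init__(self, max_connections: int) -> None:
        self._slots = asyncio.Semaphore(max_connections)

    async def acquire(self) -> None:
        await self._slots.acquire()

    def release(self) -> None:
        self._slots.release()


async def use_connection(pool) -> str:
    # Acquire before the try block so a failed acquire never triggers release.
    await pool.acquire()
    try:
        await asyncio.sleep(0.01)  # pretend to talk to the server
        return "ok"
    finally:
        pool.release()


async def run_concurrently(pool_cls, n_tasks: int, max_connections: int = 1) -> list:
    # Build the pool inside the running event loop.
    pool = pool_cls(max_connections)
    # return_exceptions=True collects errors instead of propagating the first one.
    return await asyncio.gather(
        *(use_connection(pool) for _ in range(n_tasks)),
        return_exceptions=True,
    )


if __name__ == "__main__":
    print(asyncio.run(run_concurrently(FailFastPool, 3)))
    print(asyncio.run(run_concurrently(WaitingPool, 3)))
```

With a limit of one connection and three concurrent callers, the fail-fast pool serves the first caller and rejects the other two, while the waiting pool serves all three in turn.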
Code to reproduce:
```python
import asyncio

from arq import ArqRedis, create_pool
from arq.connections import RedisSettings

REDIS_SETTINGS = RedisSettings(
    max_connections=1
)


async def foo(ctx) -> None:
    print('ok')


async def main() -> None:
    redis: ArqRedis = await create_pool(REDIS_SETTINGS)
    for _ in range(2):
        await redis.enqueue_job('foo')


class Worker:
    functions = [foo]
    redis_settings = REDIS_SETTINGS


if __name__ == '__main__':
    asyncio.run(main())
```
When `connection_pool` can be set in `RedisSettings`, as this PR allows, the same scenario seems to work as expected:
```python
import asyncio

from arq import ArqRedis, create_pool
from arq.connections import RedisSettings
from redis.asyncio import BlockingConnectionPool

REDIS_SETTINGS = RedisSettings(
    connection_pool=BlockingConnectionPool(max_connections=1)
)


async def foo(ctx) -> None:
    print("ok")


async def main() -> None:
    redis: ArqRedis = await create_pool(REDIS_SETTINGS)
    for _ in range(10):
        await redis.enqueue_job("foo")


class Worker:
    functions = [foo]
    redis_settings = REDIS_SETTINGS


if __name__ == "__main__":
    asyncio.run(main())
```
It appears that `Worker` can take `redis_pool` as an argument, but I guess we can handle this case more concisely this way for both `Worker` and `Enqueuer`, provided it does not break anything else.