ReacquiringLock: automatically reacquiring lock
I often use this library (previously aioredis) to implement reacquiring locks in redis for exclusive long-running processes. I would be glad to make a PR to share my experience, please see if ideas are ok.
Main idea: lock is acquired for a short period of time and then reacquired to prolong its ttl in background async task, that is created in acquire method.
class ReacquiringLock(Lock):
def __init__(self, *args, **kwargs):
super().__init__(*args, **kwargs)
self._auto_reacquire_timeout = self.timeout * 0.5
self._auto_reacquiring_task: Optional[asyncio.Task] = None
async def _auto_reacquire(self):
while True:
await asyncio.sleep(self._auto_reacquire_timeout)
await self.reacquire()
async def acquire(self, *args, **kwargs):
if acquired := await super().acquire(*args, **kwargs):
self._auto_reacquiring_task = asyncio.create_task(
self._auto_reacquire()
)
return acquired
async def release(self):
if self._auto_reacquiring_task:
self._auto_reacquiring_task.cancel()
with suppress(asyncio.CancelledError):
await self._auto_reacquiring_task
await super().release()
Second idea, if lock is used in async context manager and it was not able to reacquire for some reason, I would like to cancel long-running process in ctx manager:
lock = redis_client.lock(
name='lock name',
timeout=10,
blocking_timeout=0,
lock_class=ReacquiringLock
)
async with lock:
...
# long running process here
# i expect it should be cancelled
# if it is impossible to reacquire lock
This can be achieved via asyncio.current_task, where ReacquiringLock.acquire() method was called:
class ReacquiringLockWithCancellableCtx(ReacquiringLock):
def __init__(self, *args, **kwargs):
super().__init__(*args, **kwargs)
self._current_ctx_task: Optional[asyncio.Task] = None
def __cancel_current_ctx_task(self):
if self._current_ctx_task is None:
return
if self._current_ctx_task.done():
return
self._current_ctx_task.cancel()
async def reacquire(self) -> bool:
try:
result = await super().reacquire()
if not result:
self.__cancel_current_ctx_task()
return result
except RedisError:
self.__cancel_current_ctx_task()
async def __aenter__(self):
if self._current_ctx_task is not None:
raise RuntimeError('Trying to acquire twice')
self._current_ctx_task = asyncio.current_task()
await super().__aenter__()
What do you think? Could it be useful as part of this library?
This issue is marked stale. It will be closed in 30 days if it is not updated.