redis-py icon indicating copy to clipboard operation
redis-py copied to clipboard

ReacquiringLock: automatically reacquiring lock

Open alvassin opened this issue 2 years ago • 1 comments

I often use this library (previously aioredis) to implement reacquiring locks in redis for exclusive long-running processes. I would be glad to make a PR to share my experience, please see if ideas are ok.

Main idea: lock is acquired for a short period of time and then reacquired to prolong its ttl in background async task, that is created in acquire method.

class ReacquiringLock(Lock):
    def __init__(self, *args, **kwargs):
        super().__init__(*args, **kwargs)
        self._auto_reacquire_timeout = self.timeout * 0.5
        self._auto_reacquiring_task: Optional[asyncio.Task] = None

    async def _auto_reacquire(self):
        while True:
            await asyncio.sleep(self._auto_reacquire_timeout)
            await self.reacquire()

    async def acquire(self, *args, **kwargs):
        if acquired := await super().acquire(*args, **kwargs):
            self._auto_reacquiring_task = asyncio.create_task(
                self._auto_reacquire()
            )
        return acquired

    async def release(self):
        if self._auto_reacquiring_task:
            self._auto_reacquiring_task.cancel()
            with suppress(asyncio.CancelledError):
                await self._auto_reacquiring_task

        await super().release()

Second idea, if lock is used in async context manager and it was not able to reacquire for some reason, I would like to cancel long-running process in ctx manager:

lock = redis_client.lock(
    name='lock name',
    timeout=10,
    blocking_timeout=0,
    lock_class=ReacquiringLock
)
async with lock:
    ...
    # long running process here
    # i expect it should be cancelled 
    # if it is impossible to reacquire lock 

This can be achieved via asyncio.current_task, where ReacquiringLock.acquire() method was called:

class ReacquiringLockWithCancellableCtx(ReacquiringLock):
    def __init__(self, *args, **kwargs):
        super().__init__(*args, **kwargs)
        self._current_ctx_task: Optional[asyncio.Task] = None

    def __cancel_current_ctx_task(self):
        if self._current_ctx_task is None:
            return
        if self._current_ctx_task.done():
            return
        self._current_ctx_task.cancel()

    async def reacquire(self) -> bool:
        try:
            result = await super().reacquire()
            if not result:
                self.__cancel_current_ctx_task()
            return result
        except RedisError:
            self.__cancel_current_ctx_task()

    async def __aenter__(self):
        if self._current_ctx_task is not None:
            raise RuntimeError('Trying to acquire twice')
        self._current_ctx_task = asyncio.current_task()
        await super().__aenter__()

What do you think? Could it be useful as part of this library?

alvassin avatar Mar 06 '23 21:03 alvassin

This issue is marked stale. It will be closed in 30 days if it is not updated.

github-actions[bot] avatar Mar 14 '24 00:03 github-actions[bot]