filelock
Request for Asyncio Support
Acquiring a lock currently blocks other coroutines. I hope it can wait for the lock in a non-blocking way, such as with asyncio.sleep.
By the way, in the meantime, I'm using this workaround to keep other coroutines running:

import asyncio

from filelock import Timeout

# BEFORE_NET_START_LOCK is a FileLock instance defined elsewhere.
acquired_lock = False
while not acquired_lock:
    try:
        BEFORE_NET_START_LOCK.acquire(timeout=0)
        acquired_lock = True
    except Timeout:
        await asyncio.sleep(1)
Hope this code helps someone facing the same problem.
This ought to be implemented with async with: https://www.python.org/dev/peps/pep-0492/#asynchronous-context-managers-and-async-with
This ought to improve performance as well, since it looks like acquire currently uses a busy-wait: https://github.com/benediktschmitt/py-filelock/blob/master/filelock.py#L284
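As a rough sketch of what an async-with API could look like (this is not part of filelock today; the AsyncFileLock name and the polling details are my assumptions):

import asyncio

from filelock import FileLock, Timeout


class AsyncFileLock:
    """Hypothetical async-with wrapper around a regular FileLock."""

    def __init__(self, lock: FileLock, poll_interval: float = 0.05) -> None:
        self._lock = lock
        self._poll_interval = poll_interval

    async def __aenter__(self) -> "AsyncFileLock":
        # Try a zero-timeout acquire and yield to the event loop between
        # attempts instead of blocking the whole loop in time.sleep().
        while True:
            try:
                self._lock.acquire(timeout=0)
                return self
            except Timeout:
                await asyncio.sleep(self._poll_interval)

    async def __aexit__(self, exc_type, exc, tb) -> None:
        self._lock.release()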
> This ought to be implemented with async with: https://www.python.org/dev/peps/pep-0492/#asynchronous-context-managers-and-async-with
That would imply an event loop is running, which is often not the case, so we cannot add asyncio support straight up. However, if someone were to make this library sans-io style, we'd be happy to review that PR :+1:
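To illustrate what a sans-io split could mean here (an assumption about the design, not anything agreed on): the core exposes a single non-blocking attempt, and the sync and async front ends each bring their own waiting strategy.

import asyncio
import time

from filelock import FileLock, Timeout


def try_acquire(lock: FileLock) -> bool:
    """One non-blocking attempt; the only part that touches the lock."""
    try:
        lock.acquire(timeout=0)
        return True
    except Timeout:
        return False


def acquire_sync(lock: FileLock, poll_interval: float = 0.05) -> None:
    while not try_acquire(lock):
        time.sleep(poll_interval)


async def acquire_async(lock: FileLock, poll_interval: float = 0.05) -> None:
    while not try_acquire(lock):
        await asyncio.sleep(poll_interval)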
I've gone over the code and as far as I can tell there is no way to make things nicely async.
Fundamentally, the locking via _acquire requires polling and all that asyncio/trio/... can do is provide an await sleep(poll_interval) instead of the current time.sleep(poll_interval). That is really bad for the event loop at the default poll_interval=0.05; some exponential backoff might help here.
Alternatively, one could reimplement a blocking acquire and use a thread (as suggested on SO). That is ideal if we definitely want to acquire a few locks; it is fatal with timeouts/cancellation and many locks, since the threads will keep on living "forever" in the background in the worst case.
If you are fine with having a busy loop, I would suggest (and can provide) an async with context manager that basically uses the while True: + await sleep to poll the lock. There is no way to make it truly sansio, but the sleep function could be parametrized easily.
This could either be a separate class (say AsyncFileLock) that wraps a regular FileLock or merely adding the required magic methods for async with directly to FileLock. Would that be simple enough to add to the library?
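For concreteness, a minimal sketch of that idea, with the sleep function parametrized (so trio/anyio users can pass their own) and the exponential backoff mentioned above; the names are mine, not proposed API:

import asyncio
from contextlib import asynccontextmanager

from filelock import BaseFileLock, Timeout


@asynccontextmanager
async def async_hold(lock: BaseFileLock, sleep=asyncio.sleep,
                     initial_interval: float = 0.05, max_interval: float = 1.0):
    # Busy-poll with exponential backoff; `sleep` can be swapped for
    # trio.sleep, anyio.sleep, etc.
    interval = initial_interval
    while True:
        try:
            lock.acquire(timeout=0)
            break
        except Timeout:
            await sleep(interval)
            interval = min(interval * 2, max_interval)
    try:
        yield lock
    finally:
        lock.release()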
My 2c is that we should do this totally in parallel with the existing sync classes 😊 So yeah, adding an AsyncFileLock class is the way ahead: no busy-wait, just a full event loop design.
Busy polling (asyncio.wait_for) to acquire the lock is inevitable. But, to lessen the burden, could we watch the lock file before entering the busy loop? With true async I/O it would be possible to use inotify with epoll on Linux, and the Windows equivalent. I do not know of any library that does this; it is just an idea.
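As a sketch of that watching idea (purely illustrative; it assumes the third-party watchfiles library and watches the lock file's parent directory, since the lock file itself may not exist):

from pathlib import Path

from filelock import FileLock, Timeout
from watchfiles import awatch


async def acquire_on_change(lock: FileLock) -> None:
    lock_dir = Path(lock.lock_file).parent
    while True:
        try:
            lock.acquire(timeout=0)
            return
        except Timeout:
            # Sleep until something changes in the directory holding the
            # lock file (e.g. the holder removes it), then try again.
            async for _changes in awatch(lock_dir):
                break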
I have implemented a semi-efficient asynchronous filelock using anyio: https://github.com/dolamroth/starlette-web/blob/main/starlette_web/common/files/filelock.py
It passes some basic tests: https://github.com/dolamroth/starlette-web/blob/main/starlette_web/tests/core/helpers/base_cache_tester.py
If anyone is interested, feel free to pick it up.
We had implemented async support using:

import asyncio
from contextlib import asynccontextmanager

from filelock import BaseFileLock

@asynccontextmanager
async def async_lock(lock: BaseFileLock):
    loop = asyncio.get_event_loop()
    await loop.run_in_executor(None, lock.acquire)
    try:
        yield
    finally:
        # Release even if the body raises.
        await loop.run_in_executor(None, lock.release)
However, the recent change to make locks thread-local broke this and resulted in our code deadlocking: the default executor may run acquire and release on different threads, and a thread-local lock must be released by the thread that acquired it. We have moved to a static ThreadPoolExecutor with a single thread, but this is definitely a gotcha.
from concurrent.futures import ThreadPoolExecutor

LOCK_POOL = ThreadPoolExecutor(max_workers=1)

@asynccontextmanager
async def async_lock(lock: BaseFileLock):
    loop = asyncio.get_event_loop()
    # A single-threaded pool guarantees acquire and release run on the
    # same thread, which thread-local locks require.
    await loop.run_in_executor(LOCK_POOL, lock.acquire)
    try:
        yield
    finally:
        await loop.run_in_executor(LOCK_POOL, lock.release)
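Usage then looks like this (the lock path is a placeholder):

from filelock import FileLock


async def main() -> None:
    lock = FileLock("/tmp/app.lock")  # placeholder path
    async with async_lock(lock):
        ...  # critical section; the event loop stays responsive meanwhile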
I ended up with the following implementation:

import asyncio
import contextlib
from collections.abc import AsyncGenerator
from contextlib import asynccontextmanager

from filelock import BaseFileLock


@asynccontextmanager
async def _acquire_lock(fl: BaseFileLock) -> AsyncGenerator[None, None]:
    """
    Acquire a filelock, async implementation.
    """
    for _ in range(int(10 / 0.05)):  # up to 10 s in 0.05 s steps
        # filelock.Timeout subclasses TimeoutError, so suppress() catches
        # the failed non-blocking acquire.
        with contextlib.suppress(TimeoutError):
            try:
                fl.acquire(blocking=False)
                # We want to hold the exclusive lock.
                if fl.lock_counter <= 1:
                    yield
                    break
            finally:
                fl.release()
        await asyncio.sleep(0.05)
    else:
        raise TimeoutError("Could not obtain lock after 10 seconds")
For me the main point is that the sleep is non-blocking. The release and acquire calls are fast enough not to block the rest of the event loop. Note also the try: … finally:, which ensures that the file lock is released on exceptions.
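For reference, usage would look like this (the file names are placeholders):

from filelock import FileLock


async def append_line(path: str) -> None:
    fl = FileLock(path + ".lock")  # placeholder lock-file naming
    async with _acquire_lock(fl):
        with open(path, "a") as f:
            f.write("one line, written while holding the lock\n")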
For me it was also important that only one lock is active: in the documentation, the following sync code was given as an example of why recursive locks should be allowed.
def cite1():
    with lock:
        with open(file_path, "a") as f:
            f.write("I hate it when he does that.")


def cite2():
    with lock:
        with open(file_path, "a") as f:
            f.write("You don't want to sell me death sticks.")


# The lock is acquired here.
with lock:
    cite1()
    cite2()
# And released.
However, transferring this example to an async world shows that it is not valid for async:
# cite1 and cite2 are implemented async and with multiple writes
async def run():
    async with asyncio.TaskGroup() as tg:
        tg.create_task(cite1())
        # Potentially boom: the same file is opened/written to at the same time.
        tg.create_task(cite2())
Of course, also wrapping the TaskGroup in an async with _acquire_lock(...): block would deadlock, since the inner acquisitions could then never be exclusive. However, using recursive locks here instead would be unsafe.
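What does work with an exclusive lock is to let each coroutine take the lock around its own writes (a sketch, reusing lock and file_path from the example above):

async def cite1() -> None:
    async with _acquire_lock(lock):
        with open(file_path, "a") as f:
            f.write("I hate it when he does that.\n")


async def cite2() -> None:
    async with _acquire_lock(lock):
        with open(file_path, "a") as f:
            f.write("You don't want to sell me death sticks.\n")


async def run() -> None:
    # No outer lock around the TaskGroup: the writes serialize through
    # the per-coroutine acquisitions instead.
    async with asyncio.TaskGroup() as tg:
        tg.create_task(cite1())
        tg.create_task(cite2())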
I also wrote a little test that shows my implementation works as expected for me:
import asyncio
import time
from pathlib import Path

import pytest

import pkg_project_mapping  # the author's package; _file_lock is its
                            # project-internal async file-lock helper
                            # (presumably wrapping _acquire_lock above)


@pytest.mark.asyncio
async def test_async_file_lock(tmp_path: Path) -> None:
    """
    Acquiring the lock is non-blocking.
    """
    test_file = tmp_path / "test_file.txt"
    start = time.perf_counter_ns()
    regular_calls: list[float] = []

    def get_elapsed() -> int:
        return time.perf_counter_ns() - start

    def debug(msg: str) -> None:
        print(f"{get_elapsed() / 10**9:0.5f}: {msg}")

    async def reader(delay: float) -> tuple[bytes, float]:
        """
        Read the file after *delay* seconds.
        """
        await asyncio.sleep(delay)
        debug(f"Obtaining lock for reading after {delay}s delay")
        async with pkg_project_mapping._file_lock(test_file):
            debug(f"Got lock for reading after {delay}s delay")
            result = test_file.read_bytes(), get_elapsed() / 1e9
        debug(f"Released lock for reading after {delay}s delay")
        return result

    async def writer() -> None:
        """
        Write ten entries at 0.1 s intervals, release the lock, then append
        a second batch after one second of waiting.
        """
        for repeat in range(2):
            debug(f"Obtain write lock {repeat}")
            async with pkg_project_mapping._file_lock(test_file):
                debug(f"Got write lock {repeat}")
                with test_file.open("a") as f:
                    for i in range(10):
                        f.write(f"{i};")
                        debug(f"Wrote {i} ({repeat})")
                        await asyncio.sleep(0.1)
                    f.write("\n")
            debug(f"Release lock for write {repeat}")
            await asyncio.sleep(1)

    async def regular_caller() -> None:
        """
        Simulate something that runs regularly; this lets us check that
        writing/reading/locking does not block other async tasks.
        """
        for i in range(int(5 / 0.05)):  # 5 sec
            await asyncio.sleep(0.05)
            elapsed = get_elapsed() / 1e9
            if regular_calls:
                debug(f"Called regular {i:03} after {elapsed - regular_calls[-1]:0.4f}s")
            else:
                debug(f"Called regular {i:03}")
            regular_calls.append(elapsed)

    async with asyncio.TaskGroup() as tg:
        debug("Start tasks")
        tg.create_task(regular_caller())
        tg.create_task(writer())
        reader1 = tg.create_task(reader(0.02))
        reader2 = tg.create_task(reader(2.5))
        reader3 = tg.create_task(reader(2.6))
        reader4 = tg.create_task(reader(2.7))

    # Check that the regular task kept its ~0.05 s cadence, i.e. nothing
    # blocked the event loop.
    for n in range(len(regular_calls) - 1):
        diff = regular_calls[n + 1] - regular_calls[n]
        assert diff == pytest.approx(0.05, abs=0.001)

    expected_str = b"0;1;2;3;4;5;6;7;8;9;\n"
    assert reader1.result()[0] == expected_str
    # expected runtime: ~1.0 s waiting for the writer plus ~0.05 s for the read
    assert reader1.result()[1] == pytest.approx(1.05, abs=0.1)
    assert reader2.result()[0] == expected_str * 2
    # expected runtime: ~3 s waiting plus ~0.1 s for the read
    assert reader2.result()[1] == pytest.approx(3.1, abs=0.2)
    assert reader3.result()[0] == expected_str * 2
    # expected runtime: ~3 s waiting plus ~0.1 s for the read
    assert reader3.result()[1] == pytest.approx(3.105, abs=0.2)
    assert reader4.result()[0] == expected_str * 2
    # expected runtime: ~3 s waiting plus ~0.1 s for the read
    assert reader4.result()[1] == pytest.approx(3.11, abs=0.2)