DictDataBase
Better locking
Problems
1. Currently, a lock is removed after a certain threshold of time, so that dead locks are cleaned up automatically. But if a session takes longer than the timeout, its still-active lock gets removed as well.
2. Locking is relatively slow under heavy multithreaded work.
Solution for 1
- The timeout problem could be fixed if the active lock periodically (at an interval shorter than the timeout duration) rewrites its lock with a newer timestamp, so no other process will remove it. The difficulty is deciding how this should work; having an extra thread or process do the refresh work is an option (see the sketch below).
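A minimal sketch of the refresh idea, assuming the lock is a plain file whose modification time serves as its timestamp, and that other processes only delete locks whose mtime is older than the timeout. `RefreshingLock` and `REFRESH_INTERVAL` are invented names, not DDB's actual lock format or API:

```python
import os
import threading

REFRESH_INTERVAL = 5.0  # must be clearly shorter than the lock timeout


class RefreshingLock:
    """Keeps its lock file's mtime fresh so other processes never
    mistake a long-running session's lock for a dead one."""

    def __init__(self, path: str):
        self.path = path
        self._stop = threading.Event()
        self._thread = threading.Thread(target=self._refresh, daemon=True)

    def _refresh(self):
        # Wake up every REFRESH_INTERVAL seconds until release() is called
        while not self._stop.wait(REFRESH_INTERVAL):
            os.utime(self.path)  # bump mtime to "now"

    def acquire(self):
        # Create the lock file exclusively; a real implementation would
        # retry or wait if another process already holds the lock
        fd = os.open(self.path, os.O_CREAT | os.O_EXCL | os.O_WRONLY)
        os.close(fd)
        self._thread.start()

    def release(self):
        self._stop.set()
        self._thread.join()
        os.remove(self.path)
```

A daemon thread keeps the heartbeat cheap and ensures it dies with the process, so a crashed session stops refreshing and its lock ages out normally.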
Solution for 1 and 2
Using a different locking mechanism could fix both problems at once:
- os.O_EXLOCK, but not available on Linux
- fcntl, but not available on Windows
- portalocker
- Applies to fcntl and portalocker: http://0pointer.de/blog/projects/locking.html (fcntl locks are process-bound and cannot be used in multithreaded environments; see the demo below)
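A small demo of that pitfall (Unix only; the filename `demo.lock` is arbitrary). Both threads obtain LOCK_EX at the same time, because POSIX record locks are owned by the process, not the thread:

```python
import fcntl
import threading
import time


def grab_lock(name: str) -> None:
    with open("demo.lock", "w") as f:
        fcntl.lockf(f, fcntl.LOCK_EX)  # does not block the sibling thread
        print(f"{name}: acquired exclusive lock")
        time.sleep(1)
        fcntl.lockf(f, fcntl.LOCK_UN)


t1 = threading.Thread(target=grab_lock, args=("thread-1",))
t2 = threading.Thread(target=grab_lock, args=("thread-2",))
t1.start(); t2.start()
t1.join(); t2.join()
# Both messages appear immediately; with real mutual exclusion the
# second thread would have waited about a second.
```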
Proof of concept with fcntl:
```python
from multiprocessing.pool import Pool
import os
import json
import contextlib
import fcntl
import time

RUNS_PER_WORKER = 30_000
WORKERS = 10


def writer():
    # Exclusive lock: read, increment, write back, truncate leftovers
    for _ in range(RUNS_PER_WORKER):
        with open("test.json", "r+b") as f:
            fcntl.lockf(f, fcntl.LOCK_EX)
            counter = json.loads(f.read())
            counter["counter"] += 1
            f.seek(0)
            f.write(json.dumps(counter).encode())
            f.truncate()
            fcntl.lockf(f, fcntl.LOCK_UN)


def reader():
    # Shared lock: concurrent readers do not block each other
    for _ in range(RUNS_PER_WORKER):
        with open("test.json", "r+") as f:
            fcntl.lockf(f, fcntl.LOCK_SH)
            counter = json.loads(f.read())
            fcntl.lockf(f, fcntl.LOCK_UN)


if __name__ == "__main__":
    t1 = time.time()
    # Create the file with an initial counter, unless it already exists
    with contextlib.suppress(FileExistsError):
        fd = os.open("test.json", os.O_CREAT | os.O_RDWR | os.O_EXCL)
        os.write(fd, json.dumps({"counter": 0}).encode())
        os.close(fd)
    pool = Pool(WORKERS)
    for _ in range(WORKERS):
        pool.apply_async(writer)
        pool.apply_async(reader)
    pool.close()
    pool.join()
    td = time.time() - t1
    print(f"Time: {td:.2f} seconds, per second: {WORKERS * RUNS_PER_WORKER / td:.2f}")
```
-> 19076 op/s
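One possible way around the process-bound limitation noted above, not part of the proof of concept itself, would be to pair fcntl with an in-process threading.Lock: the thread lock handles thread-vs-thread exclusion, fcntl handles process-vs-process. A hypothetical sketch (`HybridLock` is an invented name):

```python
import fcntl
import threading

# A real implementation would keep one lock per file path,
# not a single module-level lock.
_thread_lock = threading.Lock()


class HybridLock:
    def __init__(self, path: str):
        self.path = path
        self.f = None

    def __enter__(self):
        _thread_lock.acquire()               # exclude sibling threads first
        self.f = open(self.path, "a+b")
        fcntl.lockf(self.f, fcntl.LOCK_EX)   # then exclude other processes
        return self.f

    def __exit__(self, *exc):
        fcntl.lockf(self.f, fcntl.LOCK_UN)
        self.f.close()
        _thread_lock.release()
```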
Current solution:
```python
from multiprocessing.pool import Pool
import os
import json
import contextlib
import time
import dictdatabase as DDB

RUNS_PER_WORKER = 3_000
WORKERS = 10


def writer():
    DDB.config.storage_directory = "."
    for _ in range(RUNS_PER_WORKER):
        # Exclusive write session on the "test" file
        with DDB.at("test").session() as (session, t):
            t["counter"] = t["counter"] + 1
            session.write()


def reader():
    DDB.config.storage_directory = "."
    for _ in range(RUNS_PER_WORKER):
        DDB.at("test").read()


if __name__ == "__main__":
    t1 = time.time()
    # Create the file with an initial counter, unless it already exists
    with contextlib.suppress(FileExistsError):
        fd = os.open("test.json", os.O_CREAT | os.O_RDWR | os.O_EXCL)
        os.write(fd, json.dumps({"counter": 0}).encode())
        os.close(fd)
    pool = Pool(WORKERS)
    for _ in range(WORKERS):
        pool.apply_async(writer)
        pool.apply_async(reader)
    pool.close()
    pool.join()
    td = time.time() - t1
    print(f"Time: {td:.2f} seconds, per second: {WORKERS * RUNS_PER_WORKER / td:.2f}")
```
-> 538 op/s (roughly 35x slower than the fcntl proof of concept)