A better pattern for handling requests to the local database
PR #1713 fixes specific issues (#1698, #1699, #1700 and partially #1701) but a general solution would be much better.
A possible pattern for all methods that use the local database (i.e. methods in `ManifestStorage`, `BlockStorage` and `ChunkStorage`) would be:
```python
async def read_and_slash_or_write_method_without_cache(self, key, value):
    localdb = self.localdb

    # Declare serde and database logic in a sync function
    def _thread_target():
        serialized = serialize(value)
        with localdb.threading_lock_and_get_cursor() as cursor:
            cursor.execute(some_statement, (key, serialized))
            rows = cursor.execute(some_other_statement, (key,)).fetchall()
        return deserialize(rows)

    # Should not fetch or change any attribute of `self`
    # (`self` would appear as a closure variable if it were captured)
    assert "self" not in _thread_target.__code__.co_freevars

    # Run the sync function in a dedicated thread (no capacity limit)
    return await self.localdb.run_in_thread(_thread_target)
```
Note:
- `threading_lock_and_get_cursor()` prevents concurrent access to the local database
- the `threading_lock_and_get_cursor()` context takes care of handling commit, database connection and errors
- serde operations are kept outside the `threading_lock_and_get_cursor()` context for potential parallelization
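As an illustration of the notes above, here is a minimal standalone sketch of what `threading_lock_and_get_cursor()` could look like: a single `threading.Lock` serializes access, and the context manager owns commit/rollback so callers only deal with the cursor. The `LocalDB` class, its constructor arguments and the schema are hypothetical, not the actual parsec-cloud implementation.

```python
import sqlite3
import threading
from contextlib import contextmanager

class LocalDB:
    """Hypothetical sketch of the localdb helper described above."""

    def __init__(self, path=":memory:"):
        self._lock = threading.Lock()
        # check_same_thread=False since cursors are used from worker threads
        self._conn = sqlite3.connect(path, check_same_thread=False)

    @contextmanager
    def threading_lock_and_get_cursor(self):
        # The lock prevents concurrent access to the local database;
        # the context handles commit, rollback and cursor cleanup.
        with self._lock:
            cursor = self._conn.cursor()
            try:
                yield cursor
                self._conn.commit()
            except Exception:
                self._conn.rollback()
                raise
            finally:
                cursor.close()

db = LocalDB()
with db.threading_lock_and_get_cursor() as cursor:
    cursor.execute("CREATE TABLE kv (key TEXT PRIMARY KEY, value BLOB)")
    cursor.execute("INSERT INTO kv VALUES (?, ?)", ("k", b"v"))
with db.threading_lock_and_get_cursor() as cursor:
    rows = cursor.execute("SELECT value FROM kv WHERE key = ?", ("k",)).fetchall()
```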
Result:
- all operations require a single hop to a worker thread (without creating a new thread, since threads are now cached in trio)
- the method doesn't use or affect the object state so it is safe against race conditions
- only the actual access to the localdb is locked by a threading lock
Pros:
- More consistency since all database access fully runs in a single thread call
- The overhead for thread communication is only paid once
Cons:
- Unnecessary thread call for pure read access to the database with small deserialization cost
- If marshmallow doesn't release the GIL (which it probably doesn't), no speedup is gained for parallelizing serde
Extra note:
It is not trivial to manage those accesses to the local database while maintaining a cache in a consistent way, like we do for the manifests. Luckily, the tricky cache handling logic and this proposal can be handled separately. Here's a possible solution.
Let's use both existing _cache and _cache_ahead_of_localdb data structures, along with a new dictionary of locks:
```python
def __init__(self):
    self._cache = {}
    self._cache_ahead_of_localdb = {}
    self._cache_locks = defaultdict(trio.Lock)
```
Here's the implementation of the synchronous cache methods:
```python
def _read_from_cache(self, key):
    return self._cache.get(key, None)

def _write_to_cache(self, key, value, extras=()):
    self._cache[key] = value
    self._cache_ahead_of_localdb.setdefault(key, []).extend(extras)

def _is_cache_ahead_of_localdb(self, key):
    return key in self._cache_ahead_of_localdb

def _tag_as_up_to_date_with_localdb(self, key):
    return self._cache_ahead_of_localdb.pop(key, None)

def _get_extras(self, key):
    return self._cache_ahead_of_localdb.get(key, None)
```
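A quick standalone run of these helpers shows the intended semantics: successive writes accumulate their `extras` until a flush acknowledges them, at which point the entry is no longer "ahead" of the localdb. The key and extras values below are illustration-only.

```python
class CacheDemo:
    """Minimal host for the synchronous cache helpers above."""

    def __init__(self):
        self._cache = {}
        self._cache_ahead_of_localdb = {}

    def _write_to_cache(self, key, value, extras=()):
        self._cache[key] = value
        # setdefault + extend: extras pile up across writes until a flush
        self._cache_ahead_of_localdb.setdefault(key, []).extend(extras)

    def _is_cache_ahead_of_localdb(self, key):
        return key in self._cache_ahead_of_localdb

    def _tag_as_up_to_date_with_localdb(self, key):
        return self._cache_ahead_of_localdb.pop(key, None)

demo = CacheDemo()
demo._write_to_cache("manifest", "v1", extras=["chunk-1"])
demo._write_to_cache("manifest", "v2", extras=["chunk-2"])
ahead_before = demo._is_cache_ahead_of_localdb("manifest")  # True
extras = demo._tag_as_up_to_date_with_localdb("manifest")   # both extras
ahead_after = demo._is_cache_ahead_of_localdb("manifest")   # False
```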
The two read/write methods that access the local database implement the proposed pattern explained earlier:
```python
async def _read_from_localdb(self, key):
    # See the #1715 proposal
    ...

async def _write_to_localdb(self, key, value, extras):
    # See the #1715 proposal
    ...
```
The tricky cache+localdb logic is locked within two async methods, `load_to_cache` and `flush_to_localdb`:
```python
async def load_to_cache(self, key):
    # Make sure the cache hasn't been populated already
    cache_result = self._read_from_cache(key)
    if cache_result is not None:
        return
    # Lock against simultaneous cache and localdb access
    async with self._cache_locks[key]:
        # The cache might have been written in the meantime
        cache_result = self._read_from_cache(key)
        if cache_result is not None:
            return
        # Access the local database
        localdb_result = await self._read_from_localdb(key)
        # The cache might have been written in the meantime
        cache_result = self._read_from_cache(key)
        if cache_result is not None:
            return
        # Otherwise, safely write the cache
        self._write_to_cache(key, localdb_result)
```
```python
async def flush_to_localdb(self, key):
    # Make sure there is something to flush
    if not self._is_cache_ahead_of_localdb(key):
        return
    # Lock against simultaneous cache and localdb access
    async with self._cache_locks[key]:
        # Make sure there is still something to flush, as it might have changed
        if not self._is_cache_ahead_of_localdb(key):
            return
        # Get value and extras
        value = self._read_from_cache(key)
        extras = self._get_extras(key)
        # Write the value to the local database
        await self._write_to_localdb(key, value, extras)
        # Make sure the cache hasn't changed in the meantime
        if value != self._read_from_cache(key):
            return
        # Otherwise safely acknowledge the successful write
        self._tag_as_up_to_date_with_localdb(key)
```
Finally, the read and write async methods can be implemented as such:
```python
async def read_with_cache(self, key):
    # Load cache entry if not available
    await self.load_to_cache(key)
    # Then read from cache
    return self._read_from_cache(key)

async def write_with_cache(self, key, value, extras=(), flush=False):
    # Write to cache
    self._write_to_cache(key, value, extras)
    # Flush if necessary
    if flush:
        await self.flush_to_localdb(key)
```
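To check that the pieces fit together, the whole proposal can be simulated end-to-end. This is a toy sketch, not the parsec-cloud code: `asyncio.Lock` stands in for `trio.Lock` (same semantics for this purpose), and a plain `_simulated_db` dict with `asyncio.sleep(0)` yield points stands in for the threaded localdb access from the #1715 pattern.

```python
import asyncio
from collections import defaultdict

class CachedStorage:
    """Toy end-to-end simulation of the cache+localdb proposal."""

    def __init__(self):
        self._cache = {}
        self._cache_ahead_of_localdb = {}
        self._cache_locks = defaultdict(asyncio.Lock)
        self._simulated_db = {}  # illustration-only stand-in for sqlite

    # Synchronous cache helpers
    def _read_from_cache(self, key):
        return self._cache.get(key, None)

    def _write_to_cache(self, key, value, extras=()):
        self._cache[key] = value
        self._cache_ahead_of_localdb.setdefault(key, []).extend(extras)

    def _is_cache_ahead_of_localdb(self, key):
        return key in self._cache_ahead_of_localdb

    def _tag_as_up_to_date_with_localdb(self, key):
        return self._cache_ahead_of_localdb.pop(key, None)

    def _get_extras(self, key):
        return self._cache_ahead_of_localdb.get(key, None)

    # Localdb accessors (the real ones would use the #1715 thread pattern)
    async def _read_from_localdb(self, key):
        await asyncio.sleep(0)  # simulate the await point of a thread hop
        return self._simulated_db.get(key)

    async def _write_to_localdb(self, key, value, extras):
        await asyncio.sleep(0)
        self._simulated_db[key] = value

    # Locked cache+localdb logic
    async def load_to_cache(self, key):
        if self._read_from_cache(key) is not None:
            return
        async with self._cache_locks[key]:
            if self._read_from_cache(key) is not None:
                return
            localdb_result = await self._read_from_localdb(key)
            if self._read_from_cache(key) is not None:
                return
            self._write_to_cache(key, localdb_result)

    async def flush_to_localdb(self, key):
        if not self._is_cache_ahead_of_localdb(key):
            return
        async with self._cache_locks[key]:
            if not self._is_cache_ahead_of_localdb(key):
                return
            value = self._read_from_cache(key)
            extras = self._get_extras(key)
            await self._write_to_localdb(key, value, extras)
            if value != self._read_from_cache(key):
                return
            self._tag_as_up_to_date_with_localdb(key)

    # Public API
    async def read_with_cache(self, key):
        await self.load_to_cache(key)
        return self._read_from_cache(key)

    async def write_with_cache(self, key, value, extras=(), flush=False):
        self._write_to_cache(key, value, extras)
        if flush:
            await self.flush_to_localdb(key)

async def scenario():
    storage = CachedStorage()
    await storage.write_with_cache("manifest", "v1", extras=["chunk"], flush=True)
    value = await storage.read_with_cache("manifest")
    return (
        value,
        storage._simulated_db["manifest"],
        storage._is_cache_ahead_of_localdb("manifest"),
    )

result = asyncio.run(scenario())
```

After the flushed write, the value is readable from the cache, the simulated localdb holds it, and the entry is no longer tagged as ahead of the localdb.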