[Data Loss / Corruption] Loading Errors when Using Multiple Connections Sequentially
Loading data from multiple connections sequentially can cause "Corrupted block detected" load errors. I believe this is because the logic in decoder_dict_from_ctx does not differentiate connections properly when loading decoding dictionaries: https://github.com/phiresky/sqlite-zstd/blob/3a820f34f326a2d177292071af42104d2634316c/src/dict_management.rs#L56-L65
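For intuition, here is a minimal Python model of the failure mode I suspect: a cache keyed only by the connection's memory address serves a stale dictionary once an address is reused. All names here are hypothetical; this is a sketch, not the actual Rust code.

```python
class FakeAllocator:
    """Hands out 'addresses' and reuses freed ones, like a real allocator."""
    def __init__(self):
        self.next_addr = 0x1000
        self.free_list = []

    def alloc(self):
        if self.free_list:
            return self.free_list.pop()
        addr, self.next_addr = self.next_addr, self.next_addr + 1
        return addr

    def free(self, addr):
        self.free_list.append(addr)


dict_cache = {}  # (connection_address, dict_id) -> dictionary bytes

def decoder_dict(conn_addr, dict_id, load_from_db):
    # Toy version of a cache keyed by connection address + dict id.
    key = (conn_addr, dict_id)
    if key not in dict_cache:
        dict_cache[key] = load_from_db()
    return dict_cache[key]


alloc = FakeAllocator()

# Open a connection to a.sqlite, decode with its dictionary, then close it.
a_conn = alloc.alloc()
decoder_dict(a_conn, 1, lambda: b"dict-for-a")
alloc.free(a_conn)

# Open a connection to b.sqlite: the allocator reuses the freed address, so
# the stale cache entry for a.sqlite's dictionary is served instead of
# b.sqlite's -- mirroring the missing "loading decoder dictionary" log line.
b_conn = alloc.alloc()
served = decoder_dict(b_conn, 1, lambda: b"dict-for-b")
assert b_conn == a_conn          # address was reused
assert served == b"dict-for-a"   # wrong dictionary served
```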
Here is a reproducing snippet. There is a fair amount of boilerplate, but essentially the code creates two sqlite DBs, writes and compresses data in each, and then loads from both to validate the data.
```python
import random
import sqlite3
import platform
from time import sleep
from math import ceil
from os import remove
import json
from os import environ
from multiprocessing import Pool, set_start_method
import logging


def gen_data():
    NUM_RUNS = 1024
    MAX_DUPS = 128
    NUM_ROWS = 1024
    full_dat = [xi
                for xs in [[random.randint(0, 255)] * random.randint(1, MAX_DUPS) for _ in range(NUM_RUNS)]
                for xi in xs]
    split_inds = [0] + sorted([random.choice(range(len(full_dat))) + 1 for _ in range(NUM_ROWS)])
    dat = [str(full_dat[start_ind:end_ind]) for start_ind, end_ind in zip(split_inds[:-1], split_inds[1:])]
    return dat


def load_zstd(conn):
    with conn:
        conn.execute("PRAGMA journal_mode=WAL")
        conn.execute("PRAGMA auto_vacuum=1")
        conn.commit()
        conn.execute("VACUUM")
        conn.commit()
    with conn:
        conn.enable_load_extension(True)
    with conn:
        if platform.system().lower() != "darwin":
            conn.load_extension('libsqlite_zstd.so')
        else:
            conn.load_extension('libsqlite_zstd.dylib')
    with conn:
        conn.enable_load_extension(False)


def create_table_with_compression(connection):
    column_specs = [('entry_index', 'INTEGER', 'PRIMARY KEY'),
                    ('entry_data', 'BLOB')]
    column_spec_strs = [' '.join(column_spec) for column_spec in column_specs]
    create_statement = f"CREATE TABLE t({', '.join(column_spec_strs)})"
    with connection:
        connection.execute(create_statement)
        connection.commit()
    compress_config = {
        'table': 't',
        'column': 'entry_data',
        'compression_level': 15,
        'dict_chooser': "'a'",
        'min_dict_size_bytes_for_training': 256,
        'dict_size_ratio': 0.1,
        'train_dict_samples_ratio': 100.0,
    }
    min_compress_size = ceil(1 / (compress_config['dict_size_ratio'] * compress_config['train_dict_samples_ratio']))
    with connection:
        connection.execute("SELECT zstd_enable_transparent(?)",
                           (json.dumps(compress_config),))
        connection.commit()


def insert_data_into_table(connection, data):
    with connection:
        connection.executemany("INSERT OR REPLACE INTO t VALUES(?, ?)",
                               enumerate(data))
        connection.commit()


def incremental_compress(connection):
    with connection:
        compression_result = connection.execute("SELECT zstd_incremental_maintenance(60, 1.0)")
        connection.commit()


def gen_db(filename, data):
    connection = sqlite3.connect(filename)
    load_zstd(connection)
    create_table_with_compression(connection)
    insert_data_into_table(connection, data)
    incremental_compress(connection)
    connection.close()


def load_db(filename):
    logging.warning(f"Loading {filename}")
    connection = sqlite3.connect(filename)
    load_zstd(connection)
    result = [xi[0].decode('ascii') for xi in connection.execute("SELECT entry_data FROM t")]
    connection.close()
    logging.warning(f"Loaded {filename}")
    return result


GEN_MULTIPROC = True
LOAD_MULTIPROC = False
MAX_TRIES = 64

if __name__ == "__main__":
    set_start_method('spawn')
    environ["SQLITE_ZSTD_LOG"] = "warn"
    a_dat = gen_data()
    b_dat = gen_data()
    for i in range(MAX_TRIES):
        try:
            remove('a.sqlite')
        except FileNotFoundError:
            pass
        try:
            remove('b.sqlite')
        except FileNotFoundError:
            pass
        if GEN_MULTIPROC:
            with Pool(2) as p:
                p.starmap(gen_db,
                          (('a.sqlite', a_dat),
                           ('b.sqlite', b_dat)))
        else:
            gen_db('a.sqlite', a_dat)
            gen_db('b.sqlite', b_dat)
        environ["SQLITE_ZSTD_LOG"] = "debug"
        if LOAD_MULTIPROC:
            with Pool(2) as p:
                a_loaded, b_loaded = p.map(load_db, ['a.sqlite',
                                                     'b.sqlite'])
        else:
            a_loaded = load_db('a.sqlite')
            b_loaded = load_db('b.sqlite')
        environ["SQLITE_ZSTD_LOG"] = "warn"
        assert all(ai == aii for ai, aii in zip(a_dat, a_loaded))
        assert all(bi == bii for bi, bii in zip(b_dat, b_loaded))
```
This generates output like the following (SQLITE_ZSTD logging adjusted in the code to clarify the key indicator):
```
WARNING:root:Loading a.sqlite
[2024-09-11T14:58:24Z INFO sqlite_zstd::create_extension] [sqlite-zstd] initialized
[2024-09-11T14:58:24Z DEBUG sqlite_zstd::dict_management] loading decoder dictionary 1 (should only happen once per 10s)
WARNING:root:Loaded a.sqlite
WARNING:root:Loading b.sqlite
[2024-09-11T14:58:24Z INFO sqlite_zstd::create_extension] [sqlite-zstd] initialized
[2024-09-11T14:58:24Z DEBUG sqlite_zstd::dict_management] loading decoder dictionary 1 (should only happen once per 10s)
WARNING:root:Loaded b.sqlite
WARNING:root:Loading a.sqlite
[2024-09-11T14:58:25Z INFO sqlite_zstd::create_extension] [sqlite-zstd] initialized
[2024-09-11T14:58:25Z DEBUG sqlite_zstd::dict_management] loading decoder dictionary 1 (should only happen once per 10s)
WARNING:root:Loaded a.sqlite
WARNING:root:Loading b.sqlite
[2024-09-11T14:58:25Z INFO sqlite_zstd::create_extension] [sqlite-zstd] initialized
[2024-09-11T14:58:25Z DEBUG sqlite_zstd::dict_management] loading decoder dictionary 1 (should only happen once per 10s)
WARNING:root:Loaded b.sqlite
WARNING:root:Loading a.sqlite
[2024-09-11T14:58:25Z INFO sqlite_zstd::create_extension] [sqlite-zstd] initialized
[2024-09-11T14:58:25Z DEBUG sqlite_zstd::dict_management] loading decoder dictionary 1 (should only happen once per 10s)
WARNING:root:Loaded a.sqlite
WARNING:root:Loading b.sqlite
[2024-09-11T14:58:25Z INFO sqlite_zstd::create_extension] [sqlite-zstd] initialized
[2024-09-11T14:58:25Z DEBUG sqlite_zstd::dict_management] loading decoder dictionary 1 (should only happen once per 10s)
WARNING:root:Loaded b.sqlite
WARNING:root:Loading a.sqlite
[2024-09-11T14:58:25Z INFO sqlite_zstd::create_extension] [sqlite-zstd] initialized
[2024-09-11T14:58:25Z DEBUG sqlite_zstd::dict_management] loading decoder dictionary 1 (should only happen once per 10s)
WARNING:root:Loaded a.sqlite
WARNING:root:Loading b.sqlite
[2024-09-11T14:58:25Z INFO sqlite_zstd::create_extension] [sqlite-zstd] initialized
Traceback (most recent call last):
  File "/Users/mharradon/tmp/sqlite-ztd-error/error.py", line 138, in <module>
    b_loaded = load_db('b.sqlite')
  File "/Users/mharradon/tmp/sqlite-ztd-error/error.py", line 96, in load_db
    result = [xi[0].decode('ascii') for xi in connection.execute("SELECT entry_data FROM t")]
  File "/Users/mharradon/tmp/sqlite-ztd-error/error.py", line 96, in <listcomp>
    result = [xi[0].decode('ascii') for xi in connection.execute("SELECT entry_data FROM t")]
sqlite3.OperationalError: decoding
Caused by:
    Corrupted block detected
```
I suspect the code I referenced is the issue, since each decoding error is missing the preceding `DEBUG sqlite_zstd::dict_management] loading decoder dictionary 1 (should only happen once per 10s)` line. Also, as far as I've seen, the issue can be worked around in two ways:
- Load databases in fully independent processes (`LOAD_MULTIPROC = True` in the demo script)
- Call `sleep(10)` between accessing different connections
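The first workaround can be applied generically: run each load in a brand-new process, so any per-process extension cache starts empty and cannot hold stale entries from a previously closed connection. `run_isolated` below is a hypothetical helper, not part of the demo script or of sqlite-zstd.

```python
from multiprocessing import get_context

def run_isolated(fn, *args):
    """Call fn(*args) in a freshly spawned process and return its result.

    A spawned (not forked) worker starts with a clean interpreter, so any
    in-process caches held by loaded extensions start empty.
    """
    ctx = get_context('spawn')
    with ctx.Pool(1) as pool:
        return pool.apply(fn, args)

# Usage with the demo script's loader would look like:
#   a_loaded = run_isolated(load_db, 'a.sqlite')
#   b_loaded = run_isolated(load_db, 'b.sqlite')
```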
I originally observed this problem in a setting where the databases were written and read by separate systems at different times, so I don't suspect an interaction between compression and decompression. I can't rule out that compression has a similar version of this issue on its own.
I have attempted a (very sloppy) fix in #44. Apologies, I'm no Rust author 😆. I can confirm that it fixes the above issue with both `GEN_MULTIPROC` and `LOAD_MULTIPROC` set to `False`, and that changes to both the encoder and decoder paths were necessary to achieve that. Hopefully, if nothing else, that can point in the direction of a nicer fix.
Thanks for the great project!
Thank you for the issue. I'm a bit confused as to how this happens. I see that in your PR you add the path of the db to the cache key, but the cache key already includes the connection handle, which should definitely differ between two separate connections. Do you have any idea why it does not?
Hi @phiresky, thanks so much for responding! I don't have a good understanding of why that would happen. I vaguely recall checking that and finding that the keys did collide as-is, but that was a while back.
In the example here I open and close one connection before opening the next. Could the first connection be deallocated such that the second connection gets the same address?
Thanks for the info. That could indeed be the case: if you close one connection, the new one may well get the same pointer location. I didn't consider that case.
If that is the reason the wrong dicts are used, then it should be fixable. SQLite extensions are unloaded when the connection is closed, by calling the xDlClose function on sqlite3_vfs (https://sqlite.org/loadext.html). It should be possible to hook that function and clear the cache, or to key the cache on the extension/VFS object in the first place. Sadly I haven't touched this code in a while, so I'm not entirely sure how we could tie this to VFS deinitialization when we don't even use the VFS feature.
Thank you for your PR, but I'm hesitant to merge it as-is because that pragma query happens very frequently, which I think could have a pretty bad impact on performance; that's why I added the in-memory cache in those places in the first place.
Another (somewhat hacky) workaround might be to call sqlite3_auto_extension() with a callback that clears the cache. The cache would then be cleared whenever a new connection is created, which covers all the times when old cache keys may become invalid.
Actually, I found an even easier method: since we already have a hook that runs when a new connection is opened, I can simply invalidate the cache there! I tested it with the python example you posted above and it works, so thank you for that!
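For readers following along, the shape of that fix can be modeled in a few lines of Python: clearing the dictionary cache whenever a new connection is initialized means a reused connection address can never be paired with a stale entry. All names here are hypothetical; the real change lives in sqlite-zstd's Rust connection-init hook.

```python
dict_cache = {}  # (connection_address, dict_id) -> dictionary bytes

def on_connection_init():
    # Runs each time the extension is loaded into a new connection --
    # exactly the moments when old address-keyed cache entries may
    # have become invalid due to pointer reuse.
    dict_cache.clear()

def decoder_dict(conn_addr, dict_id, load_from_db):
    # Toy version of an address-keyed dictionary cache.
    key = (conn_addr, dict_id)
    if key not in dict_cache:
        dict_cache[key] = load_from_db()
    return dict_cache[key]

# Connection 1 (address 0x1000) caches a.sqlite's dictionary, then closes.
on_connection_init()
decoder_dict(0x1000, 1, lambda: b"dict-for-a")

# Connection 2 happens to reuse address 0x1000, but the init hook cleared
# the cache, so the correct dictionary is loaded from b.sqlite.
on_connection_init()
served = decoder_dict(0x1000, 1, lambda: b"dict-for-b")
assert served == b"dict-for-b"
```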
Thank you so much!