
[Data Loss / Corruption] Loading Errors when Using Multiple Connections Sequentially

[Open] mharradon opened this issue on Sep 11, 2024

Loading data from multiple connections sequentially can cause various corrupted-load errors. I believe this is due to the logic in decoder_dict_from_ctx not differentiating between connections when loading decoding dictionaries: https://github.com/phiresky/sqlite-zstd/blob/3a820f34f326a2d177292071af42104d2634316c/src/dict_management.rs#L56-L65

Here is a reproducing snippet. There is a fair amount of boilerplate, but essentially the code creates two SQLite databases, writes and compresses data in each, and then reads both back to validate the data.

import random
import sqlite3
import platform
from time import sleep
from math import ceil
from os import remove
import json
from os import environ
from multiprocessing import Pool, set_start_method
import logging

def gen_data():
    NUM_RUNS = 1024
    MAX_DUPS = 128
    NUM_ROWS = 1024

    full_dat = [xi 
                for xs in [[random.randint(0, 255)] * random.randint(1, MAX_DUPS) for _ in range(NUM_RUNS)]
                for xi in xs]
    split_inds = [0] + sorted([random.choice(range(len(full_dat))) + 1 for _ in range(NUM_ROWS)])
    dat = [str(full_dat[start_ind:end_ind]) for start_ind, end_ind in zip(split_inds[:-1], split_inds[1:])]
    return dat

def load_zstd(conn):
    with conn:
        conn.execute("PRAGMA journal_mode=WAL")
        conn.execute("PRAGMA auto_vacuum=1")
        conn.commit()
        conn.execute("VACUUM")
        conn.commit()

    with conn:
        conn.enable_load_extension(True)
    with conn:
        if platform.system().lower() != "darwin":
            conn.load_extension('libsqlite_zstd.so')
        else:
            conn.load_extension('libsqlite_zstd.dylib')
    with conn:
        conn.enable_load_extension(False)

def create_table_with_compression(connection):
    column_specs = [('entry_index', 'INTEGER', 'PRIMARY KEY'), 
                    ('entry_data', 'BLOB')]
    column_spec_strs = [' '.join(column_spec) for column_spec in column_specs]
    create_statement = f"CREATE TABLE t({', '.join(column_spec_strs)})"

    with connection:
        connection.execute(create_statement)
        connection.commit()

    compress_config = {
        'table': 't',
        'column': 'entry_data',
        'compression_level': 15,
        'dict_chooser': "'a'",
        'min_dict_size_bytes_for_training': 256,
        'dict_size_ratio': 0.1,
        'train_dict_samples_ratio': 100.0,
    }

    min_compress_size = ceil(1 / (compress_config['dict_size_ratio'] * compress_config['train_dict_samples_ratio']))

    with connection:
        connection.execute("SELECT zstd_enable_transparent(?)",
                           (json.dumps(compress_config),))
        connection.commit()

def insert_data_into_table(connection, data):
    with connection:
        connection.executemany(f"INSERT OR REPLACE INTO t VALUES(?, ?)",
                               enumerate(data))
        connection.commit()

def incremental_compress(connection):
    with connection:
        compression_result = connection.execute(f"SELECT zstd_incremental_maintenance(60, 1.0)")
        connection.commit()

def gen_db(filename, data):
    connection = sqlite3.connect(filename)

    load_zstd(connection)

    create_table_with_compression(connection)
    insert_data_into_table(connection, data)
    incremental_compress(connection)

    connection.close()

def load_db(filename):
    logging.warning(f"Loading {filename}")

    connection = sqlite3.connect(filename)
    load_zstd(connection)
    result = [xi[0].decode('ascii') for xi in connection.execute("SELECT entry_data FROM t")]
    connection.close()

    logging.warning(f"Loaded {filename}")
    return result

GEN_MULTIPROC = True
LOAD_MULTIPROC = False
MAX_TRIES = 64

if __name__=="__main__":
    set_start_method('spawn')
    environ["SQLITE_ZSTD_LOG"] = "warn"
    a_dat = gen_data()
    b_dat = gen_data()

    for i in range(MAX_TRIES):
        try:
            remove('a.sqlite')
        except FileNotFoundError:
            pass
        try:
            remove('b.sqlite')
        except FileNotFoundError:
            pass

        if GEN_MULTIPROC:
            with Pool(2) as p:
                p.starmap(gen_db, 
                          (('a.sqlite', a_dat),
                           ('b.sqlite', b_dat))) 
        else:
            gen_db('a.sqlite', a_dat)
            gen_db('b.sqlite', b_dat)

        environ["SQLITE_ZSTD_LOG"] = "debug"
        if LOAD_MULTIPROC:
            with Pool(2) as p:
                a_loaded, b_loaded = p.map(load_db, ['a.sqlite',
                                                     'b.sqlite'])
        else:
            a_loaded = load_db('a.sqlite')
            b_loaded = load_db('b.sqlite')

        environ["SQLITE_ZSTD_LOG"] = "warn"

        assert a_dat == a_loaded
        assert b_dat == b_loaded

This generates output like the following (the SQLITE_ZSTD_LOG level is adjusted in the code to highlight the key indicator):

WARNING:root:Loading a.sqlite
[2024-09-11T14:58:24Z INFO  sqlite_zstd::create_extension] [sqlite-zstd] initialized
[2024-09-11T14:58:24Z DEBUG sqlite_zstd::dict_management] loading decoder dictionary 1 (should only happen once per 10s)
WARNING:root:Loaded a.sqlite
WARNING:root:Loading b.sqlite
[2024-09-11T14:58:24Z INFO  sqlite_zstd::create_extension] [sqlite-zstd] initialized
[2024-09-11T14:58:24Z DEBUG sqlite_zstd::dict_management] loading decoder dictionary 1 (should only happen once per 10s)
WARNING:root:Loaded b.sqlite
WARNING:root:Loading a.sqlite
[2024-09-11T14:58:25Z INFO  sqlite_zstd::create_extension] [sqlite-zstd] initialized
[2024-09-11T14:58:25Z DEBUG sqlite_zstd::dict_management] loading decoder dictionary 1 (should only happen once per 10s)
WARNING:root:Loaded a.sqlite
WARNING:root:Loading b.sqlite
[2024-09-11T14:58:25Z INFO  sqlite_zstd::create_extension] [sqlite-zstd] initialized
[2024-09-11T14:58:25Z DEBUG sqlite_zstd::dict_management] loading decoder dictionary 1 (should only happen once per 10s)
WARNING:root:Loaded b.sqlite
WARNING:root:Loading a.sqlite
[2024-09-11T14:58:25Z INFO  sqlite_zstd::create_extension] [sqlite-zstd] initialized
[2024-09-11T14:58:25Z DEBUG sqlite_zstd::dict_management] loading decoder dictionary 1 (should only happen once per 10s)
WARNING:root:Loaded a.sqlite
WARNING:root:Loading b.sqlite
[2024-09-11T14:58:25Z INFO  sqlite_zstd::create_extension] [sqlite-zstd] initialized
[2024-09-11T14:58:25Z DEBUG sqlite_zstd::dict_management] loading decoder dictionary 1 (should only happen once per 10s)
WARNING:root:Loaded b.sqlite
WARNING:root:Loading a.sqlite
[2024-09-11T14:58:25Z INFO  sqlite_zstd::create_extension] [sqlite-zstd] initialized
[2024-09-11T14:58:25Z DEBUG sqlite_zstd::dict_management] loading decoder dictionary 1 (should only happen once per 10s)
WARNING:root:Loaded a.sqlite
WARNING:root:Loading b.sqlite
[2024-09-11T14:58:25Z INFO  sqlite_zstd::create_extension] [sqlite-zstd] initialized
Traceback (most recent call last):
  File "/Users/mharradon/tmp/sqlite-ztd-error/error.py", line 138, in <module>
    b_loaded = load_db('b.sqlite')
  File "/Users/mharradon/tmp/sqlite-ztd-error/error.py", line 96, in load_db
    result = [xi[0].decode('ascii') for xi in connection.execute("SELECT entry_data FROM t")]
  File "/Users/mharradon/tmp/sqlite-ztd-error/error.py", line 96, in <listcomp>
    result = [xi[0].decode('ascii') for xi in connection.execute("SELECT entry_data FROM t")]
sqlite3.OperationalError: decoding

Caused by:
    Corrupted block detected

I suspect the code I referred to above is the culprit, since each decoding error is missing a preceding "DEBUG sqlite_zstd::dict_management] loading decoder dictionary 1 (should only happen once per 10s)" line. Also, as far as I've seen, the issue can be worked around in two ways:

  1. Load the databases in fully independent processes (LOAD_MULTIPROC = True in the demo script)
  2. Call sleep(10) between accesses to different connections (sketched below)
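For example, the second workaround applied to the demo's load step would look roughly like this (a hypothetical tweak; the 10 seconds corresponds to the window mentioned in the debug message above):

a_loaded = load_db('a.sqlite')
sleep(10)  # wait out the ~10s decoder-dictionary window before touching the next database
b_loaded = load_db('b.sqlite')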

I originally observed this problem in a setting where the databases were written and read by separate systems at different times, so I don't suspect an interaction between compression and decompression. I can't rule out that compression has a similar version of this issue on its own, though.

mharradon • Sep 11 '24

I have attempted a (very sloppy) fix in #44. Apologies, I'm no Rust author 😆. I can confirm that it fixes the above issue with both GEN_MULTIPROC and LOAD_MULTIPROC set to False, and that changes to both the encoder and the decoder parts were necessary to achieve that. Hopefully, if nothing else, that can point in the direction of a nicer fix.

Thanks for the great project!

mharradon • Sep 11 '24

Thank you for the issue. I'm a bit confused as to how this happens. I see that in your PR you add the path of the db to the cache key, but the cache key already includes the connection handle, which should definitely be different for two separate connections. Do you have any idea why that isn't the case?

phiresky • Apr 30 '25

Hi @phiresky, thanks so much for responding! I don't have a good understanding of why that would happen. I vaguely recall checking this and finding that the keys did collide as-is, but that was a while back.

In the example here I open and close one connection before opening the next. Could the first connection be deallocated such that the second connection gets the same address?
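To sketch what I mean in Python (a hypothetical cache and loader, not the extension's actual Rust code, and address reuse may or may not occur on any given run): if the cache is keyed only by the connection's address and the allocator hands that same address to the next connection, the stale dictionary gets served.

import sqlite3

# Hypothetical stand-in for a cache keyed by the raw connection pointer.
decoder_dict_cache = {}

def get_decoder_dict(conn, load_dict):
    key = id(conn)  # plays the role of the sqlite3* pointer value
    if key not in decoder_dict_cache:
        decoder_dict_cache[key] = load_dict()
    return decoder_dict_cache[key]

conn_a = sqlite3.connect('a.sqlite')
key_a = id(conn_a)
print(get_decoder_dict(conn_a, lambda: 'dict trained on a.sqlite'))
conn_a.close()
del conn_a  # freeing the object lets its address be reused

conn_b = sqlite3.connect('b.sqlite')
if id(conn_b) == key_a:
    # Same "pointer" as the closed connection, so the stale entry is returned.
    print(get_decoder_dict(conn_b, lambda: 'dict trained on b.sqlite'))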

mharradon • May 1 '25

Thanks for the info. That could indeed be the case: if you close one connection, the new one may well get the same pointer location. I didn't consider that case.

If that is the reason the wrong dicts are used, then it should be fixable - SQLite extensions are unloaded when the connection is closed, by calling the xDlClose function on the sqlite3_vfs object ( https://sqlite.org/loadext.html ). It should be possible to hook that function and clear the cache, or to initialize the cache per extension/VFS object in the first place. Sadly, I haven't touched this code in a while, so I'm not entirely sure how we could tie this to the VFS deinitialization when we don't even use the VFS feature.

Thank you for your PR, but I'm hesitant about it because that pragma query happens very frequently, which I think may have a pretty bad impact on performance... that's why I added that in-memory cache in those places in the first place.

phiresky • May 10 '25

Another (a bit hacky) workaround might be to call sqlite3_auto_extension() with a callback that clears the cache. The cache would then be cleared whenever a new connection is created, which should cover every point at which the old cache keys can become invalid.

phiresky • May 10 '25

Actually, I found an even easier method: since we already have a hook that runs when a new connection is opened, I can simply invalidate the cache there! I tested it with the Python example you posted above and it works, so thank you for that!
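In terms of the Python sketch above, the idea is roughly the following (conceptual only, not the actual Rust change):

def on_connection_init(conn):
    # Hypothetical per-connection init hook: drop cached dictionaries from
    # previously closed connections so a recycled pointer can never resolve
    # to another database's dictionary.
    decoder_dict_cache.clear()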

phiresky • May 10 '25

Thank you so much!

mharradon • May 10 '25