[1.x/2.x] Deletes are racy

Open zeebo opened this issue 6 years ago • 5 comments

Delete design

Problem statement

Right now, deletes can cause the database to become inconsistent in the presence of concurrent writes, compactions or snapshots. This is avoided in the case of compactions and snapshots by stopping and aborting any that are ongoing. This won't work in a world where retention is constantly issuing deletes.

We want the database to remain correct and to allow as much concurrency as possible while handling deletes. Where locks are required, they should, as far as possible, not be held while accessing the disk.

Proposed solution

The following design has these properties:

  1. Arbitrary writes are allowed while writing tombstones for TSM files.
  2. Compactions and snapshots are allowed while writing tombstones for TSM files.
  3. Only writes containing keys that may be fully deleted are blocked while deleting them from the cache and index.
  4. Compactions and snapshots are blocked only at the final rename of an output TSM file from its temporary name to its final name, and only while a delete is removing keys from the cache and index.
  5. The protocol has been formally verified in TLA+ for a single concurrent write, delete, compaction, and snapshot.

In order to deliver on these properties, deletes require:

  1. A lock on creating new TSM files (note that this does not include the time writing the TSM files, just the final rename step).
  2. A lock on writes to keys that may be removed from the index.
  3. Signaling to compaction and cache snapshots to apply a delete to a TSM file they are about to write.
  4. The ability to capture the set of existing TSM files at any moment in time.
  5. A lock on the ability to begin a snapshot/compaction process.
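
As a rough illustration of requirement 4, the set of TSM files could be kept as an immutable value that is swapped under a short mutex, so a delete can capture it at any moment and then work on it without holding anything. This is only a sketch in Python; `FileSet` and its methods are hypothetical names, not existing engine code.

```python
import threading


class FileSet:
    """Hypothetical holder for the names of the current TSM files."""

    def __init__(self):
        self._mu = threading.Lock()
        self._names = frozenset()

    def snapshot(self):
        # The returned frozenset never changes, so callers can keep using it
        # after the lock is released.
        with self._mu:
            return self._names

    def add(self, name):
        # Called when a cache snapshot renames its output into place.
        with self._mu:
            self._names = self._names | {name}

    def replace(self, old, new):
        # Called when a compaction replaces several files with one.
        with self._mu:
            self._names = (self._names - frozenset(old)) | {new}
```

The subset test in step 4 of the delete procedure then becomes `files.snapshot() <= applied`, where `applied` is the set of file names the delete has already been run against.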

Overview

Writes

  1. Acquire a read lock on any keys being written. Locks are allowed to be less granular: you may lock an entire measurement or even a whole bucket.
  2. Add any series to the index.
  3. Add the data to the cache.
  4. Unlock any locks.
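
A minimal sketch of the write path above, in Python for brevity. It assumes an in-memory lock table in which a write takes a shared lock per key and a delete would take the exclusive one; `KeyLocks`, `write`, and the `(series_key, timestamp, value)` point shape are all made up for illustration.

```python
import threading
from contextlib import contextmanager


class KeyLocks:
    """Hypothetical lock table: many writers or one delete per key."""

    def __init__(self):
        self._cond = threading.Condition()
        self._readers = {}        # key -> number of writes holding it shared
        self._exclusive = set()   # keys held exclusively by a delete

    @contextmanager
    def shared(self, keys):
        keys = set(keys)
        with self._cond:
            # Step 1: wait until no delete holds any of these keys.
            self._cond.wait_for(lambda: not (keys & self._exclusive))
            for k in keys:
                self._readers[k] = self._readers.get(k, 0) + 1
        try:
            yield
        finally:
            with self._cond:
                # Step 4: release and wake anything waiting on these keys.
                for k in keys:
                    self._readers[k] -= 1
                    if self._readers[k] == 0:
                        del self._readers[k]
                self._cond.notify_all()


def write(locks, index, cache, points):
    """points is a list of (series_key, timestamp, value) tuples."""
    with locks.shared(k for k, _, _ in points):
        for key, ts, value in points:
            index.add(key)                          # step 2
            cache.setdefault(key, {})[ts] = value   # step 3
```

The exclusive side used by deletes is not shown; it would add keys to `_exclusive` once their reader count drops to zero.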

Deletes

  1. First acquire the current set of TSM files.
  2. Perform deletes on those TSM files, keeping track of keys that are possibly dead.
  3. Acquire a lock on creating new TSM files.
  4. If the set of TSM files that now exist is a subset of the TSM files that the deletes have been applied to, goto 5. Otherwise, go back to 1.
  5. Acquire exclusive locks on any keys that may be deleted. Again, it is allowed to be larger in scope than required.
  6. Acquire the lock to begin snapshots/compactions and inform any ongoing snapshots or compactions to perform this delete against their TSM file before renaming it.
  7. Perform the delete on the cache, removing any live keys from the set of possibly dead keys. They are now definitely dead. Start keeping track of a new set of possibly dead keys.
  8. Unlock the lock for snapshots/compactions.
  9. Check the cache possibly dead keys against any TSM files. The remaining are now definitely dead.
  10. Remove all dead keys from the index.
  11. Unlock TSM files and the keys.
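
The delete steps can be sketched against a toy in-memory engine. This is an assumption-heavy illustration in Python, not the real engine: `Engine`, `delete_range`, and `delete` are hypothetical, a delete targets a single key with a maximum timestamp (as in the TLA+ model below), and the exclusive key lock of step 5 is left out.

```python
import threading


class Engine:
    """Toy in-memory stand-in for the storage engine; every name is hypothetical."""

    def __init__(self):
        self.tsm_files = {}    # file name -> {key: {ts: value}}
        self.cache = {}        # key -> {ts: value}
        self.index = set()     # series keys
        self.tsm_lock = threading.Lock()     # guards creating new TSM files
        self.start_lock = threading.Lock()   # guards starting snapshots/compactions
        self.in_flight = 0                   # running snapshots/compactions
        self.pending_deletes = []            # deletes those jobs must apply


def delete_range(series, max_ts):
    """Remove every point with ts <= max_ts from one series, in place."""
    for ts in [t for t in series if t <= max_ts]:
        del series[ts]


def delete(engine, key, max_ts):
    applied = set()
    while True:
        # Steps 1-2: tombstone files not handled yet, holding no lock.
        for name in set(engine.tsm_files) - applied:
            delete_range(engine.tsm_files[name].get(key, {}), max_ts)
            applied.add(name)
        # Step 3: block the creation of new TSM files.
        engine.tsm_lock.acquire()
        # Step 4: proceed only if every existing file has seen the delete.
        if set(engine.tsm_files) <= applied:
            break
        engine.tsm_lock.release()
    try:
        # Step 5 (exclusive key locks) is left out of this sketch.
        with engine.start_lock:
            # Step 6: in-flight jobs must apply the delete before renaming.
            if engine.in_flight:
                engine.pending_deletes.append((key, max_ts))
            # Step 7: clean the cache.
            delete_range(engine.cache.get(key, {}), max_ts)
        # Step 8 happened on leaving the with block. Step 9: the key is dead
        # only if neither the cache nor any TSM file still has data for it.
        live = bool(engine.cache.get(key)) or any(
            f.get(key) for f in engine.tsm_files.values())
        # Step 10: remove dead keys from the index.
        if not live:
            engine.index.discard(key)
    finally:
        engine.tsm_lock.release()   # step 11
```

Note that the expensive part, rewriting TSM data in steps 1-2, runs with no lock held; the locked section only touches the cache and the index.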

Cache snapshots & Compactions

Cache snapshots and compactions proceed as they always have, with two exceptions. They must hold the snapshot/compaction lock while determining what work needs to be done (i.e. grabbing the actual snapshot, or discovering the set of TSM files to merge). They must also hold the TSM lock to do the rename, and after acquiring it, they apply any deletes that were registered in step 6 of the delete procedure before renaming.
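
A small sketch of the snapshot side, again in Python with hypothetical names (`start_lock`, `tsm_lock`, `begin_snapshot`, `finish_snapshot`). It assumes deletes are `(key, max_ts)` pairs and stands in for the rename by adding an entry to a dict; retiring the snapshotted data from the cache is not shown.

```python
import threading

start_lock = threading.Lock()   # hypothetical: guards starting a snapshot/compaction
tsm_lock = threading.Lock()     # hypothetical: guards the final rename


def begin_snapshot(cache):
    """Grab the data to write while holding the snapshot/compaction lock."""
    with start_lock:
        return {key: dict(series) for key, series in cache.items()}


def finish_snapshot(data, pending_deletes, tsm_files, name):
    """Apply deletes registered since begin_snapshot, then publish the file."""
    # The temporary file would have been written before this point, unlocked.
    with tsm_lock:
        for key, max_ts in pending_deletes:
            series = data.get(key, {})
            for ts in [t for t in series if t <= max_ts]:
                del series[ts]
        pending_deletes.clear()
        # Stand-in for renaming the temporary file to its final name.
        tsm_files[name] = {k: v for k, v in data.items() if v}
```

A compaction would follow the same shape, with `begin_snapshot` replaced by choosing the set of TSM files to merge.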

TLA+ model
EXTENDS TLC, Integers, Sequences, FiniteSets

PossibleKeys ==
    {"a", "b"}

PossibleTimestamps ==
    1..2

PossibleData ==
    [key: PossibleKeys, ts: PossibleTimestamps]

Range(f) ==
    {f[x]: x \in DOMAIN f}

Keys(data) ==
    {v.key: v \in data}
    
KeyMatches(d, v) ==
    d.key = v.key
    
KeyFilter(d, data) ==
    {v \in data: ~KeyMatches(d, v)}

KeyMatch(d, data) ==
    {v \in data: KeyMatches(d, v)}

DeleteMatches(d, v) ==
    d.key = v.key /\ v.ts <= d.ts

DeleteFilter(d, data) ==
    {v \in data: ~DeleteMatches(d, v)}
    
DeleteMatch(d, data) ==
    {v \in data: DeleteMatches(d, v)}

Prefix(x, n) ==
    [i \in 1..n |-> x[i]]

(*
--algorithm deletes

variables
    order \in {"write first", "delete first", "concurrent"},
    \* order \in {"concurrent"},

    tsm_files \in [1..2 -> {x \in SUBSET PossibleData: Cardinality(x) > 0}],

    cache \in [
        existing: SUBSET PossibleData,
        incoming: {{}},
        snapshot: {{}}
    ],

    starting_data = cache.existing \union UNION Range(tsm_files),
    
    index = Keys(starting_data),

    write \in PossibleData,
    delete \in PossibleData,    

    state = [
        delete_started  |-> FALSE,
        delete_finished |-> FALSE,
        write_finished  |-> FALSE,
        write_started   |-> FALSE,
        
        tsm_locked            |-> FALSE,
        locked_keys           |-> {},
        compaction_start_lock |-> FALSE,
        
        snapshotting     |-> FALSE,
        snapshot_deletes |-> {},
        
        compacting         |-> FALSE,
        compaction_deletes |-> {}
    ],

define
TSMDeleteFilter(d, files) ==
    [i \in DOMAIN files |-> DeleteFilter(d, files[i])]

TSMData(files) ==
    UNION Range(files)
    
Cache ==
    cache.existing \union cache.incoming
    
CacheWithSnapshot ==
    Cache \union cache.snapshot
    
DataWithWrite ==
    starting_data \union {write}
    
DataWithDelete ==
    DeleteFilter(delete, starting_data)
    
DataWithWriteDelete ==
    DeleteFilter(delete, DataWithWrite)
    
DataWithDeleteWrite ==
    DataWithDelete \union {write}
    
CanSnapshot ==
    Cardinality(Cache) > 0
    
CanCompact ==
    Len(tsm_files) > 1
    
WriterDone ==
    pc["writer"] = "Done"
    
CacheDone ==
    pc["cache"] = "Done"
    
ReadableData ==
    Cache \union TSMData(tsm_files)

ReadableKeys ==
    Keys(ReadableData)
    
Consistency ==
    /\ ( state.delete_finished /\ ~state.write_started  ) =>
        /\ ReadableKeys = index
        /\ ReadableData = DataWithDelete
    /\ ( state.write_finished  /\ ~state.delete_started ) =>
        /\ ReadableKeys = index
        /\ ReadableData = DataWithWrite
    /\ ( state.delete_finished /\  state.write_finished ) =>
        /\ ReadableKeys = index
        /\ order = "write first"  => ReadableData = DataWithWriteDelete
        /\ order = "delete first" => ReadableData = DataWithDeleteWrite
        /\ order = "concurrent"   =>
            \/ ReadableData = DataWithDeleteWrite
            \/ ReadableData = DataWithWriteDelete

end define;

\*
\* macros
\*

macro Update(fn, data)
begin
    fn := data @@ fn;
end macro;

macro ApplyDeletes(deletes, data)
begin
    while Cardinality(deletes) > 0 do
        _local_delete := CHOOSE x \in deletes: TRUE;
        data := DeleteFilter(_local_delete, data);
        deletes := deletes \ {_local_delete};
    end while;
end macro;

\*
\* writer process: as simple and lockless as possible.
\*

fair process writer = "writer"
begin
    WriterAcquire:
    await order = "delete first" => state.delete_finished;
    await ~(write.key \in state.locked_keys);
    Update(state, [
        locked_keys   |-> state.locked_keys \union {write.key},
        write_started |-> TRUE
    ]);
    
    WriterIndex:
    index := index \union {write.key};
    
    WriterCache:
    cache.incoming := cache.incoming \union {write};

    WriterRelease:
    Update(state, [
        locked_keys    |-> state.locked_keys \ {write.key},
        write_finished |-> TRUE
    ]);

end process; 

\*
\* deletes: attempts to do as much work that touches disk as possible without
\* acquiring any locks.
\*

fair process delete = "delete"
variable
    _local_tsm_files = <<>>,
    _local_dead_keys = [
        tsm   |-> {},
        cache |-> {}
    ],
begin
    DeleteAcquire:
    await order = "write first" => state.write_finished;
    Update(state, [ delete_started |-> TRUE ]);
    
    DeleteSnapshotTSM:
    _local_tsm_files := tsm_files;

    DeleteClearTSM:
    _local_tsm_files := TSMDeleteFilter(delete, _local_tsm_files);
    Update(_local_dead_keys, [
        tsm |-> LET tsm_keys     == Keys(KeyMatch(delete, TSMData(Prefix(tsm_files, Len(_local_tsm_files)))))
                    local_keys   == Keys(KeyMatch(delete, TSMData(_local_tsm_files)))
                IN  (_local_dead_keys.tsm \union tsm_keys) \ local_keys
    ]);  
    tsm_files := _local_tsm_files @@ tsm_files;
    
    DeleteLockAndCheckTSM:
    await ~state.tsm_locked;
    if Len(_local_tsm_files) = Len(tsm_files) then
        Update(state, [tsm_locked |-> TRUE]);
        goto DeleteFinalize;
    else
        goto DeleteSnapshotTSM;
    end if;
    
    \* Acquire the lock on any affected keys. Uses the epoch system.    
    DeleteFinalize:
    await ~(delete.key \in state.locked_keys);
    Update(state, [ locked_keys |-> state.locked_keys \union {delete.key} ]);
    
    \* Inform any TSM writers that they need to apply this delete before the final rename.
    DeleteInformWriters:
    await ~state.compaction_start_lock;
    Update(state, [
        snapshot_deletes   |-> state.snapshot_deletes   \union IF state.snapshotting THEN {delete} ELSE {},
        compaction_deletes |-> state.compaction_deletes \union IF state.compacting   THEN {delete} ELSE {},
        compaction_start_lock |-> TRUE
    ]);

    \* Atomically clean out the cache and update the set of keys  
    DeleteCache:
    Update(_local_dead_keys, [
        cache |-> Keys(Cache) \ Keys(DeleteFilter(delete, Cache)),
        tsm   |-> _local_dead_keys.tsm \ Keys(DeleteFilter(delete, Cache))
    ]); 
    Update(cache, [
        existing |-> DeleteFilter(delete, cache.existing),
        incoming |-> DeleteFilter(delete, cache.incoming)
    ]);
    Update(state, [ compaction_start_lock |-> FALSE ]);
    
    \* Check the cache keys against TSM files on disk.
    DeleteCacheCheck:
    Update(_local_dead_keys, [
        cache |-> _local_dead_keys.cache \ Keys(KeyMatch(delete, TSMData(tsm_files)))
    ]);
    
    \* Update the index.
    DeleteIndex:
    index := (index \ _local_dead_keys.tsm) \ _local_dead_keys.cache;
    
    \* Release the lock.
    DeleteRelease:
    Update(state, [
        locked_keys     |-> state.locked_keys \ {delete.key},
        tsm_locked      |-> FALSE,
        delete_finished |-> TRUE
    ]);
    
    \* Reset the local variables to reduce the number of distinct states
    _local_tsm_files := <<>>;
    _local_dead_keys := [tsm |-> {}, cache |-> {}];
end process;

\*
\* cache: just needs to acquire tsm lock and apply deletes during snapshots.
\*

fair process cache = "cache"
variable
    _local_delete   = [key |-> "", ts |-> 0];
    _local_snapshot = {},
begin
    CacheWaiting:
    await CanSnapshot \/ WriterDone;
        
    if CanSnapshot then
        CacheBegin:
        await ~state.compaction_start_lock;
        Update(state, [snapshotting  |-> TRUE]);
        Update(cache, [
            snapshot |-> Cache,
            existing |-> Cache,
            incoming |-> {}
        ]);
        _local_snapshot := cache.snapshot;
        
        CacheLockTSM:
        await ~state.tsm_locked;
        Update(state, [tsm_locked |-> TRUE]);
        
        CacheFinish:
        ApplyDeletes(state.snapshot_deletes, _local_snapshot);
        tsm_files := Append(tsm_files, _local_snapshot);

        Update(cache, [
            existing |-> cache.incoming,
            incoming |-> {},
            snapshot |-> {}
        ]);
        Update(state, [
            snapshotting |-> FALSE,
            tsm_locked   |-> FALSE
        ]);
        
        \* Reset the local variables to reduce the number of distinct states
        _local_delete := [key |-> "", ts |-> 0];
        _local_snapshot := {};
    end if;
end process;

\*
\* compaction: just needs to acquire tsm lock and apply deletes during compactions.
\*

fair process compaction = "compaction"
variable
    _local_delete = [key |-> "", ts |-> 0],
    _local_files  = <<>>,
    _local_data   = {},
begin
    CompactionWaiting:
    await CanCompact \/ CacheDone;
        
    if CanCompact then
        CompactionBegin:
        await ~state.compaction_start_lock;
        Update(state, [compacting |-> TRUE]);
        _local_files := tsm_files;
        _local_data := UNION Range(_local_files);

        CompactionLockTSM:
        await ~state.tsm_locked;
        Update(state, [tsm_locked |-> TRUE]);
        
        CompactionFinish:
        ApplyDeletes(state.compaction_deletes, _local_data);
        tsm_files :=
            LET Cleared == [n \in DOMAIN _local_files |-> {}]
                Merged  == Cleared @@ tsm_files
            IN  Append(Merged, _local_data);

        Update(state, [
            compacting |-> FALSE,
            tsm_locked |-> FALSE
        ]);
        
        \* Reset the local variables to reduce the number of distinct states
        _local_delete := [key |-> "", ts |-> 0];
        _local_files := <<>>;
        _local_data := {};
    end if;
end process;

end algorithm;
*)
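
For readers who do not use TLA+, the data operators and the outcomes that the Consistency invariant accepts can be mirrored in a few lines of Python. This is an informal translation, not part of the model: points and deletes are `(key, ts)` tuples, and the function names are invented here.

```python
def delete_matches(d, v):
    # Mirrors DeleteMatches: same key, timestamp at or before the delete's.
    return d[0] == v[0] and v[1] <= d[1]


def delete_filter(d, data):
    # Mirrors DeleteFilter: keep only the points the delete does not match.
    return {v for v in data if not delete_matches(d, v)}


def allowed_outcomes(starting_data, write, delete, order):
    """Readable data sets accepted once both the write and delete finish."""
    write_then_delete = delete_filter(delete, starting_data | {write})
    delete_then_write = delete_filter(delete, starting_data) | {write}
    if order == "write first":
        return [write_then_delete]
    if order == "delete first":
        return [delete_then_write]
    # "concurrent": either serialization is acceptable.
    return [write_then_delete, delete_then_write]
```

For example, with starting data `{("a", 1), ("a", 2), ("b", 1)}`, a write of `("a", 1)` and a delete of `("a", 1)`, the concurrent case accepts both the set with `("a", 1)` present and the set without it.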

zeebo, Jan 25 '19 02:01

note that this does not include the time writing the TSM files, just the final rename step

Maybe we should list the required fsync ordering here, too.

Acquire a read lock on any keys being written. Locks are allowed to be more granular: you may lock an entire measurement or even a whole bucket.

A lock on writes to keys that may be removed from the index.

Where are the key-locks maintained? Is the entire key-set in memory? Are the active locks in memory? What happens if a write causes series creation? And does series key prefixing work with series creation (when writing a bucket for the first time, for example, what locks are required)?

If the set of TSM files that now exist is a subset of the TSM files that the deletes have been applied to, goto 5. Otherwise, go back to 1.

I don't think I quite understand this - when snapshotting (writing the level-0 TSM file), is the lock on creating new TSM files required to be held? If so, does that mean that cache can't be cleared until a delete completes?

rbetts, Jan 25 '19 15:01

Maybe we should list the required fsync ordering here, too.

fsync ordering is typical. Fsync the file, rename, fsync the directory.
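
Spelled out, that ordering looks roughly like the following. This is a Python sketch assuming POSIX semantics (opening a directory to fsync it does not work on Windows); `publish_file` is a hypothetical helper name.

```python
import os


def publish_file(tmp_path, final_path):
    """Durably rename tmp_path to final_path (POSIX semantics assumed)."""
    # 1. Flush the file's contents to stable storage.
    fd = os.open(tmp_path, os.O_RDONLY)
    try:
        os.fsync(fd)
    finally:
        os.close(fd)
    # 2. Atomically move the file into place.
    os.replace(tmp_path, final_path)
    # 3. Flush the directory so the rename itself survives a crash.
    dir_fd = os.open(os.path.dirname(os.path.abspath(final_path)), os.O_RDONLY)
    try:
        os.fsync(dir_fd)
    finally:
        os.close(dir_fd)
```

Only step 2 needs to happen under the TSM lock from the design; writing and fsyncing the temporary file can be done beforehand.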

Where are the key-locks maintained? Is the entire key-set in memory? Are the active locks in memory? What happens if a write causes series creation? And does series key prefixing work with series creation (when writing a bucket for the first time, for example, what locks are required)?

The key locks will be maintained in memory. On the delete side, it would be for every key that is being removed from the index (so hopefully small, but could be large). This is why it is pointed out that the locks may be less (oops I wrote more at first) granular. So maybe we lock any measurement that has a series being deleted, or maybe any series that matches the measurement + the first two tags in lexicographic order.
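
To make the "measurement + first two tags" idea concrete, here is a small Python sketch that maps a series key to a coarser lock scope. It assumes keys look like `measurement,tag=value,...` and ignores escaped commas; `lock_scope` is a hypothetical name.

```python
def lock_scope(series_key, max_tags=2):
    """Map a series key to the coarser key that would actually be locked."""
    measurement, *tags = series_key.split(",")
    # Keep only the first max_tags tags in lexicographic order.
    return ",".join([measurement] + sorted(tags)[:max_tags])
```

Every series that shares the measurement and those first two tags maps to the same scope, so one lock entry covers all of them at the cost of blocking some unrelated writes.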

On the create side, you would want it to be as granular as possible, and since we already have the full set of points in memory in order to write them, it shouldn't require too much extra memory.

I expect to implement this with some sort of a trie because that shares prefixes across keys.
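
Something along these lines is what I have in mind, sketched in Python. It is exclusive-only and non-blocking to keep it short, keys are tuples of path segments (for example measurement, then tags), and `LockTrie` is a hypothetical name. A lock on a prefix conflicts with any lock at or below that prefix, and the other way around.

```python
class LockTrie:
    """Hypothetical prefix trie of held locks; paths are tuples of segments."""

    def __init__(self):
        self.children = {}
        self.held = False   # a lock is held exactly at this node
        self.below = 0      # number of locks held at or below this node

    def try_lock(self, path):
        # Conflict if any ancestor is locked, or if the target node itself
        # or anything beneath it is locked.
        node = self
        for seg in path:
            if node.held:
                return False
            node = node.children.get(seg)
            if node is None:
                break   # nothing exists under this prefix, so no conflict
        else:
            if node.held or node.below:
                return False
        # No conflict: record the lock, creating nodes as needed.
        node = self
        node.below += 1
        for seg in path:
            node = node.children.setdefault(seg, LockTrie())
            node.below += 1
        node.held = True
        return True

    def unlock(self, path):
        # Assumes path was previously locked with try_lock.
        node = self
        node.below -= 1
        for seg in path:
            node = node.children[seg]
            node.below -= 1
        node.held = False
```

Shared prefixes are stored once, which is the memory benefit; empty nodes are never pruned here, which a real implementation would want to do.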

I don't think I quite understand this - when snapshotting (writing the level-0 TSM file), is the lock on creating new TSM files required to be held? If so, does that mean that cache can't be cleared until a delete completes?

Indeed, the cache cannot be cleared while the delete is in that last critical step of cleaning up the index. It is imperative that this completes as fast as possible. It should be only as expensive as appending the set of deleted keys to some log files, though, so I don't expect that much of a problem. Compactions and cache snapshots can proceed during the phase where it deletes data out of the existing TSM files, which scales with the amount of stored data.

This is already the case with deletes in 1.x and current 2.0 for everything but snapshots. In fact, my understanding is that a delete will actively cancel any ongoing compactions and block them until it is finished.

  • Index compactions are disabled and we wait (would still have to do this, but can do it before we acquire the TSM lock): https://github.com/influxdata/influxdb/blob/1.8/tsdb/engine/tsm1/engine.go#L1372-L1384
  • Level compactions and series file compactions are disabled and canceled: https://github.com/influxdata/influxdb/blob/1.8/tsdb/engine/tsm1/engine.go#L1432-L1447
  • The previous lines note that snapshot compactions are NOT disabled, but it also later notes that it is inherently racy: https://github.com/influxdata/influxdb/blob/1.8/tsdb/engine/tsm1/engine.go#L1571-L1578

This conversation made me realize that instead of a lock, we can allow staging temporary TSM files, and delay adding them fully. In other words, instead of waiting for the TSM lock, the process just schedules the delete + rename of the TSM file to happen upon unlock, and returns. This would allow snapshots (and some limited compactions) to proceed, but we would not be able to truncate the WAL. I don't think we would have been able to truncate the WAL anyway, since presumably the running delete is part of it.

Conceptually, this doesn't change the model at all, since it's equivalent to a single snapshot just written out in multiple TSM files.
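
A rough Python sketch of that staging idea, with hypothetical names throughout (`TSMGate`, `publish`). A snapshot or compaction hands over two callbacks, one that applies pending deletes to its temporary file and one that renames it; if a delete currently holds the gate, the pair is queued and run on unlock instead of the caller waiting.

```python
import threading


class TSMGate:
    """Hypothetical gate that stages publishes while a delete holds it."""

    def __init__(self):
        self._mu = threading.Lock()
        self._held = False
        self._staged = []   # (apply_deletes, rename) pairs awaiting unlock

    def lock(self):
        # Taken by a delete before it cleans up the cache and index.
        with self._mu:
            self._held = True

    def publish(self, apply_deletes, rename):
        """Returns True if published now, False if staged for later."""
        with self._mu:
            if self._held:
                self._staged.append((apply_deletes, rename))
                return False
            apply_deletes()
            rename()
            return True

    def unlock(self):
        # Run everything staged during the delete, then open the gate.
        with self._mu:
            for apply_deletes, rename in self._staged:
                apply_deletes()
                rename()
            self._staged.clear()
            self._held = False
```

A caller that gets `False` back must treat its output as not yet visible, which is why the WAL could not be truncated at that point.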

I don't think this is necessary, but it's good that it should be possible.

zeebo, Jan 25 '19 17:01

This issue has been automatically marked as stale because it has not had recent activity. It will be closed if no further activity occurs. Thank you for your contributions.

stale[bot], Jul 23 '19 09:07

This issue has been automatically closed because it has not had recent activity. Please reopen if this issue is still important to you. Thank you for your contributions.

stale[bot], Jul 30 '19 09:07

To various degrees, aspects of the report here still apply.

philjb, Oct 01 '25 21:10