
Transactions Version 2

Opened by semihsalihoglu-uw on Nov 30, 2023

High-level Goals

Our transaction support has two fundamental limitations: (i) we do not allow concurrent write transactions; and (ii) we checkpoint immediately after each write, which includes a phase that pauses the entire system ("stop the world"). This issue describes a proposal to improve our transaction support: it solves the first issue and mitigates the second by checkpointing less frequently.

We differentiate between large transactions, which perform large changes to the database (e.g., a bulk insert of 1 million records), and small transactions, which perform a small set of updates (e.g., a few insertions). We need a way to tell these types of transactions apart and employ different techniques for each. In particular, for bulk updates, we should stop all other concurrent writes and perform the updates in a fashion similar to how the initial loading of tables is done in COPY FROM statements. This proposal is concerned with the execution of small transactions. I suggest that, for now and for simplicity, we assume that any command issued on the system that uses a COPY FROM statement is a bulk update and cannot be mixed with other statements in a transaction. In contrast, any transaction that issues CREATE, DELETE, DETACH DELETE, or SET statements (but not COPY FROM) is a small transaction.

The core techniques in the proposal are the following:

  • An MVCC protocol for transaction isolation with Snapshot Isolation guarantee with the following features:
    • Transactions run single-threaded: This does not seem like a very important constraint but should simplify implementation.
    • Version Chains: Node and rel property values have version chains that are timestamped using the timestamps of the transactions that performed the updates. Version chains should ideally be ordered newest-to-oldest, but because we have a LocalStore for RelTable and a HashIndex, and because we may not be able to directly overwrite the previous value, we will have to make an adjustment to this, which I call "Oldest-Then-Newest-To-Second-Oldest" order, in which we do not overwrite the checkpointed stable version directly on the page. Oldest-Then-Newest-To-Second-Oldest chains ensure we do not overwrite pages in the buffer manager except during checkpointing, which means we can keep our current checkpointing logic. We also maintain, in column chunks, the auxiliary data structures necessary to facilitate fast scans.
    • Transaction Timestamps: Transactions get several timestamps to identify both the value version they can read and to mark the values they update with their own timestamps.
    • No W-W Conflicts to Ensure Snapshot Isolation: Ensuring snapshot isolation is simple. Any transaction Ti that writes to a value v ensures that no other transaction, committed or uncommitted, has written to v since Ti started.
    • Continuous Garbage Collection: Unused versions are garbage collected as part of every commit and checkpointing protocols.
  • Logging and Recovery
    • Distributed Logging Using Redo-only Local Record-level WAL: Each trx/thread has its own local "record-level WAL". Record-level WAL contains logical update records, such as (type: update/insert/delete, tableName, columnName, new-value) (we should not need prev-value). These WALs are always kept only in memory.
    • Redo-only Global Record-level WAL: A transaction T that commits appends its own local record-level WAL to a global record-level WAL and flushes the global record-level WAL to disk. This ensures durability so we can commit.
    • Periodic Checkpointing and Recovery Using a "Redo-only Page WAL": Checkpointing is triggered as the global record-level WAL reaches a size limit (e.g., 10MB) and/or a timeout (e.g., 10s). This is a multi-step phase that involves generating shadow page WAL records as we currently do.

Readings

The core ideas related to transaction isolation here are adaptations from this paper:

  • Fast Serializable Multi-Version Concurrency Control for Main-Memory Database Systems, SIGMOD 2015, Neumann et al.: This paper describes Hyper's MVCC protocol. There is another paper that describes Umbra's MVCC protocol here (VLDB 2022), but it is less relevant. The Umbra paper solves the problem of what to do if dirty pages are evicted to disk arbitrarily; we will avoid this problem by not creating dirty pages. Otherwise the Umbra protocol is identical to Hyper's. The Umbra paper also proposes a rather complicated mechanism to handle large transactions. We are not concerned with large transactions here.

There is no direct reading I can suggest for the checkpointing and recovery mechanism, but the idea of distributed logging is inspired by the technique described here.

MVCC-based Transaction Isolation Protocol

Values stored in the database, specifically node properties, rels, and rel properties, can have multiple versions that are identified by versionIDs. The versionID of a value version is the timestamp/ID (henceforth ID for simplicity) of the transaction that created that version.

Transaction IDs and Active Transactions Table

Each transaction Ti is assigned three IDs:

  1. startID: Given to every transaction (both read and write) when it starts, from a "commitID sequence" that starts from 0 and goes up to $2^{63}-1$. This sequence increases by 1 each time some trx commits. Therefore multiple concurrent read or write transactions can be given the same startID. The purpose of this ID is to identify the versions of the values that the transaction should read.
  2. trxID: This is an ID given to write transactions when they start, from a "trxID sequence" that starts from $2^{63}$ and goes up to $2^{64}-1$. These are used to mark value versions that are created by Ti before Ti has committed. The disjointness of trxIDs from startIDs and commitIDs (see below) enables transactions to identify that a value version was generated by an uncommitted transaction.
  3. commitID: This is given to write transactions when they commit from the commitID sequence that was used to give the startID. trxID can be thought of as an "uncommittedID" given to write transactions and the commitID replaces the trxID.

Active Transactions Table: The system maintains the start and trxIDs of active, i.e., uncommitted, transactions at any point in time.
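As a rough illustration of the ID scheme and the Active Transactions Table, here is a minimal C++ sketch. `TransactionManager`, its members, and the choice to hand a trxID to every transaction (the running example below also shows one for the read transaction TR1) are assumptions made for the sketch, not Kuzu's actual API.

```cpp
#include <cstdint>
#include <mutex>
#include <unordered_map>

static constexpr uint64_t TRX_ID_START = 1ULL << 63; // trxID sequence starts at 2^63

struct TransactionIDs {
    uint64_t startID;  // taken from the commitID sequence when the trx starts
    uint64_t trxID;    // from the disjoint upper half of the ID space
    uint64_t commitID; // assigned from the commitID sequence at commit time
};

class TransactionManager {
public:
    TransactionIDs begin(bool isWrite) {
        std::lock_guard<std::mutex> lck{mtx};
        TransactionIDs ids{};
        ids.startID = lastCommitID;  // snapshot as of the last commit; concurrent trxs
                                     // can therefore share the same startID
        ids.trxID = nextTrxID++;     // the proposal assigns these to write trxs; the sketch
                                     // gives one to every trx so the active table has a key
        (void)isWrite;
        activeTransactions[ids.trxID] = ids.startID; // track active (uncommitted) trxs
        return ids;
    }

    uint64_t commit(uint64_t trxID) {
        std::lock_guard<std::mutex> lck{mtx};
        uint64_t commitID = ++lastCommitID; // commitID sequence increases by 1 per commit
        activeTransactions.erase(trxID);    // Ti is no longer active
        return commitID;
    }

private:
    std::mutex mtx;
    uint64_t lastCommitID = 0;         // commitID sequence: 0 .. 2^63 - 1
    uint64_t nextTrxID = TRX_ID_START; // trxID sequence:    2^63 .. 2^64 - 1
    std::unordered_map<uint64_t, uint64_t> activeTransactions; // trxID -> startID
};
```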

Oldest-Then-Newest-To-Second-Oldest Version Chains, Undo Buffers, and Active Transactions Table:

Creating new versions: When a trx Ti needs to update a value v, e.g., the age property of a node in the User table, it first acquires a lock that blocks other readers and writers. I believe this lock should be the page lock, which we already use when accessing pages exclusively. We should be able to acquire these locks in shared mode, so multiple concurrent read transactions can read the page, as well as in exclusive write mode. Then, using an auxiliary version chain data structure, Ti creates a new version for the value and marks its versionID with Ti's trxID. The new version is put in a local trx/thread-level undo buffer and linked with the other versions that have been created. We have a design choice to make here about whether the versions are linked oldest-to-newest or newest-to-oldest. The Hyper paper uses newest-to-oldest, probably because it leads to shorter version traversals (assuming most transactions want to read newer versions), and because it overwrites the value on the page directly, which probably makes checkpointing faster and is something their checkpointing protocol handles. I propose to avoid overwriting the values in pages directly except during checkpointing.

Write-Write Conflicts: We avoid any write-write conflicts. So, a Ti with start time si aborts immediately if it notices either: (i) an uncommitted transaction has already created a version for v; or (ii) another transaction with commitID > si has committed another version of v (this is also a write-write conflict). Write-write conflicts are the only conflicts we need to detect to ensure snapshot isolation.
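A hedged sketch of version creation together with the write-write conflict rule above; `VersionNode`, `tryCreateVersion`, and the exact fields are hypothetical, and the page locking described above is omitted.

```cpp
#include <cstdint>

static constexpr uint64_t TRX_ID_START = 1ULL << 63;

struct VersionNode {
    uint64_t versionID;  // trxID while uncommitted, replaced by the commitID at commit
    int64_t value;       // the new value for this version (e.g., the updated age)
    VersionNode* next;   // next (older) version; nullptr once only the stable version remains
};

// Returns the new version on success, or nullptr if Ti must abort due to a write-write
// conflict. `head` is the newest non-stable version (nullptr if v has no version chain).
VersionNode* tryCreateVersion(VersionNode* head, uint64_t tiStartID, uint64_t tiTrxID,
                              int64_t newValue) {
    if (head != nullptr) {
        bool writtenByOtherUncommitted =
            head->versionID >= TRX_ID_START && head->versionID != tiTrxID;
        bool committedAfterTiStarted =
            head->versionID < TRX_ID_START && head->versionID > tiStartID;
        if (writtenByOtherUncommitted || committedAfterTiStarted) {
            return nullptr; // abort Ti: another trx wrote v after Ti started or is still active
        }
    }
    // The new version goes into Ti's undo buffer and is linked newest-to-oldest in front of
    // the existing versions; the stable version stays untouched on the page.
    return new VersionNode{tiTrxID, newValue, head};
}
```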

[Figure: vanilla newest-to-oldest version chains]

Vanilla Newest-to-Oldest Version Chains Option: Above are the version chains one would obtain with a vanilla newest-to-oldest design. In the example, there are 5 write transactions, TW0 to TW4, and a read transaction, TR1. The startID, trxID, and commitID of each write transaction are shown either on the right, next to its undo buffer, or in the Active Transactions Table (which is shown right before TW0 aborts, so TW0 still appears as active). Here is a summary of the example. The system has checkpointed all committed transactions up to ID 1007, shown as the "last checkpointID" field above the column. The actions of the transactions are summarized below, and the timeline of events is shown in the following figure.

[Figure: timeline of events]

  • TW0: starts at startID 1007 with trxID $2^{63}+4$. Eventually it tries to edit the age property of the node with offset 4 to 250 and aborts due to a write-write conflict (TW4 has already written to this value).
  • TW1: starts at startID 1007 with trxID $2^{63}+5$. Changes the age of node 3 from 40 to 140 and commits with commitID 1008.
  • TW2: starts at startID 1008 with trxID $2^{63}+6$. Changes the age of node 3 from 140 to 240 and commits with commitID 1009.
  • TW3: starts at startID 1008 with trxID $2^{63}+7$. Tries to change the age of node 3's latest value but aborts because the value 240 was written by a transaction with commitID 1009, which is > 1008.
  • TW4: starts at startID 1009 with trxID $2^{63}+8$. Changes the age of node 3 from 240 to 340 (suppose this happens after TW3 tried to write to the same value) and node 4 from 50 to 150. Has not yet committed.
  • TR1: starts with startID 1009 with trxID $2^{63}+9$. Has not yet committed.

In the figure, the versionIDs of values are shown as the first field of the tuples representing the versions. For values that do not have any version chain, the versionID is the last checkpointID of the node group. Finally, in vanilla newest-to-oldest chains, the newest version is written directly to the page and its versionID needs to be recorded somewhere. In the figure, this is done in an auxiliary data structure whose first column is the lastVersionID (called versionVector in the Hyper paper).

The problem with the vanilla design for us is that directly overwriting the stable version may not be possible if the new value triggers recompression (e.g., because the column is bitpacked and the number of bits used cannot accommodate the new value). In addition, there may be complications due to writing dirty pages to disk outside the checkpointing protocol. Specifically, writing dirty pages of uncommitted transactions to disk requires undo operations during recovery and possibly during aborts. I find a redo-only WAL simpler to implement. Therefore we should slightly change this design and always store the stable, i.e., last checkpointed, version in the actual pages, and then order the rest of the version chain newest-to-oldest, as shown in the figure below. I am referring to this as the Oldest-Then-Newest-To-Second-Oldest chain. Note that the stable version is the oldest version and forms the beginning of the chain. The rest of the records are ordered newest-to-oldest. Therefore the tail of the chain is the second-oldest version.

[Figure: stable-then-newest-to-oldest version chains]

Reading the correct version of value v: Consider a trx Ti with startID sID and trxID tID. Ti first checks if v has a version chain. If not, it reads the stable/oldest version directly. Otherwise it traverses the version chain to find a version v' that satisfies one of two conditions: (1) v'.versionID = Ti.trxID, i.e., Ti has written this version; or (2) v'.versionID $\le$ Ti.startID. If a v' that satisfies one of these conditions is found, v' is the version Ti reads. Otherwise, Ti reads the stable version.
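A minimal sketch of this visibility rule, reusing the hypothetical `VersionNode` from the earlier sketch; the in-memory chain is assumed to hold only the non-stable versions, newest first, with the stable value passed in separately.

```cpp
#include <cstdint>

struct VersionNode {
    uint64_t versionID;
    int64_t value;
    VersionNode* next; // ordered newest-to-oldest (the stable version stays on the page)
};

// `head` points to the newest non-stable version (nullptr if v has no version chain);
// `stableValue` is the last checkpointed value stored directly on the page.
int64_t readVersion(const VersionNode* head, int64_t stableValue, uint64_t tiStartID,
                    uint64_t tiTrxID) {
    for (const VersionNode* cur = head; cur != nullptr; cur = cur->next) {
        if (cur->versionID == tiTrxID) {
            return cur->value; // (1) Ti wrote this version itself
        }
        if (cur->versionID <= tiStartID) {
            return cur->value; // (2) committed at or before Ti's startID
        }
        // otherwise: written by another uncommitted trx, or committed after Ti started;
        // keep walking toward older versions
    }
    return stableValue; // no visible version in the chain: read the stable/oldest version
}
```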

Here are two examples from our running example. If TR1, with startID 1009, reads node 3's age value, it should read 240, because that version has versionID 1009. If instead TR1 reads node 4's age value, it starts with (1007, 50), which is the stable version, then moves to ($2^{63} + 8$, 150). That versionID is larger than 1009 and is not TR1's trxID (which is $2^{63} + 9$), so TR1 reads the stable version 50. If, on the other hand, TW4, with startID 1009 and trxID $2^{63} + 8$, reads node 4's value, it reads 150 because TW4 is the trx that wrote this version.

Ensuring Fast Scans: We need a mechanism so that large parts of the data that have no updates can be scanned directly from the pages. We need to pick some granularity of "partitions" for which we keep metadata about whether the partition has any version chains. The figure logically partitions a NodeGroup's column into two partitions, part0 and part1. This is not a suggestion for how to implement this mechanism, but in the figure we enable fast scans by maintaining a "Version Chains" map per partition that maps node offsets that have versions (or edge IDs or local edge offsets for relationship properties) to the heads of their version chains. With this map, we can do fast scans if the version chain map of the partition is empty (shown as null in the figure). In the absence of any version chains we know that every transaction, regardless of its startID or trxID, has to read the stable version of all values in the partition, so it can scan directly.

This idea can be generalized. For example, suppose in our running example a transaction with commitID 1010 writes to node 0 and creates an entry in the version chain map of part0. Now no transaction can fast-scan part0, because every transaction first needs to check whether the new version of node 0 is relevant to it. Instead, we can keep an additional field per partition called "lowestCommitID", set to the lowest commitID in any of the partition's version chains. In our continued example, we would set part0's lowestCommitID to 1010 (and part1's lowestCommitID to 1008). This allows a read-only transaction with startID < lowestCommitID to still benefit from fast scans. For example, TR1 has startID 1009 and can directly scan part0's values.
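A small sketch of the per-partition metadata this would require; `PartitionVersionInfo` and `canFastScan` are illustrative names only, not a proposed implementation.

```cpp
#include <cstdint>
#include <unordered_map>

struct VersionNode; // as in the earlier sketches

struct PartitionVersionInfo {
    // Maps node offsets (or local rel offsets) that have versions to their chain heads.
    std::unordered_map<uint64_t, VersionNode*> versionChains;
    // Smallest commitID appearing in any chain of this partition; only meaningful
    // when versionChains is non-empty.
    uint64_t lowestCommitID = 0;
};

// A read-only transaction can scan the partition's pages directly if either the partition
// has no version chains at all, or every versioned value was committed after the
// transaction started (so the stable versions are exactly the ones it must read).
bool canFastScan(const PartitionVersionInfo& part, uint64_t startID) {
    return part.versionChains.empty() || startID < part.lowestCommitID;
}
```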

Distributed Logging: There are 3 separate WALs in the system:

  • Per-trx (local) record-level WAL (L-Record-WAL)
  • Global record-level WAL (G-Record-WAL; also written G-RL-WAL below)
  • Global page-level WAL (G-Page-WAL; also written G-PL-WAL below)

We discuss the L-Record-WAL here; the latter two will be explained momentarily. L-Record-WALs are maintained per-thread/trx and kept only in memory. Each trx Ti logs the writes that it performs logically in its L-Record-WAL, e.g., as (update, nodeOff:3, age, 150), (insert, (age: 150, name: Alice)), (delete, nodeOff:4). This type of mechanism does not currently exist in the system. Instead, our logs consist of physical log records that indicate which page has changed and where the shadow/write version of the page is. I propose we keep the latter mechanism as well, to simplify our checkpointing logic.
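A rough sketch of what the logical, record-level log entries and the per-transaction L-Record-WAL might look like; the record layout is purely illustrative, not a proposed wire format.

```cpp
#include <cstdint>
#include <string>
#include <vector>

enum class LogType : uint8_t { UPDATE, INSERT, DELETE, COMMIT };

struct LogicalLogRecord {
    LogType type;
    std::string tableName;         // e.g., "User"
    std::string columnName;        // e.g., "age"; empty for INSERT/DELETE of a whole record
    uint64_t offset;               // node offset or rel ID the record applies to
    std::vector<uint8_t> newValue; // serialized new value; no prev-value needed (redo-only)
};

// Per-transaction and in-memory only; appended to the global record-level WAL at commit.
struct LocalRecordWAL {
    std::vector<LogicalLogRecord> records;

    void logUpdate(std::string table, std::string column, uint64_t offset,
                   std::vector<uint8_t> value) {
        records.push_back(
            {LogType::UPDATE, std::move(table), std::move(column), offset, std::move(value)});
    }
};
```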

Commit Protocol: The commit protocol involves Ti entering a critical section and doing the following steps:

  1. Obtain a commitID from the commitID sequence.
  2. Add a COMMIT record to Ti's L-Record-WAL and flush the log records in the L-Record-WAL to the G-Record-WAL. Note that the order of the logs in the G-Record-WAL is the serialization order, even though we only guarantee snapshot isolation. That is, a serial replay of the transactions in this order in the G-RL-WAL from the previous checkpoint may not be equivalent to the state the database would have reached had these transactions actually executed serially.
  3. Replace the versionIDs of every version Ti created, which were marked with Ti.trxID, with Ti.commitID.
  4. Garbage collect any versions that may not be needed (momentarily described in more detail below).

Abort Protocol: The abort protocol simply discards all versions that Ti generated along with Ti's undo buffer.

Garbage Collecting: We garbage collect after every commit as well as during checkpointing.

(i) Commits: During commits, inside a critical section, transactions can traverse the version chains that their undo buffers link to and, given the earliest startID of any active transaction in the system, garbage collect unused versions, except the stable versions, which we keep on the pages for simplicity. Let t_{ls} be this earliest startID among active transactions; it does not include the start time of the committing Ti, which we no longer consider active for this purpose. Then, a committing Ti goes through the versions in its undo buffer and, for each version v, goes to the oldest version in v's chain (grabbing any necessary locks on the page v is part of) and removes any version older than t_{ls}. If this removal makes any undo buffers (possibly of other transactions) empty, Ti also removes those undo buffers. For example, suppose that in our running example, after TW0 aborts, TW4 commits with commitID 1010. Then the only active transaction in the system is TR1 with startID 1009. Therefore the version (1008, 140) of node 3's value created by TW1 is no longer needed. Since TW4 also wrote to this value, it garbage collects this version. Removing this version also makes TW1's undo buffer empty, so TW4 garbage collects TW1's undo buffer as well.

(ii) Checkpointing: After checkpointing, every undo buffer and every value version, except the stable ones directly on the pages, is garbage collected.
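As a rough illustration of the commit-time case, the sketch below prunes a single chain given t_{ls}. Note that it conservatively keeps the newest version whose versionID is at most t_{ls} (the version the oldest active transaction would read) and removes everything older; names and fields are hypothetical.

```cpp
#include <cstdint>

struct VersionNode {
    uint64_t versionID;
    VersionNode* next; // newest-to-oldest; the stable version lives on the page, not here
};

// Removes all versions strictly older than the newest version with versionID <= tLs.
void garbageCollectChain(VersionNode*& head, uint64_t tLs) {
    VersionNode* cur = head;
    while (cur != nullptr && cur->versionID > tLs) {
        cur = cur->next; // skip versions newer than the oldest active transaction (or uncommitted)
    }
    if (cur == nullptr) {
        return; // every version is newer than tLs (or uncommitted); all may still be needed
    }
    // cur is the newest version visible to the oldest active transaction: keep it,
    // free everything older than it.
    VersionNode* victim = cur->next;
    cur->next = nullptr;
    while (victim != nullptr) {
        VersionNode* next = victim->next;
        delete victim;
        victim = next;
    }
}
```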

Checkpointing and Recovery

We checkpoint in three cases:

  1. When starting the database.
  2. When shutting down the database.
  3. Continuously, at the earlier of two events: (i) the G-RL-WAL gets large enough (e.g., 10MB); or (ii) a fixed amount of time has elapsed since the last checkpoint (e.g., 10s).
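The continuous trigger in case 3 could look like the following small sketch, where the 10MB and 10s values are just the example thresholds from above, not fixed constants.

```cpp
#include <chrono>
#include <cstdint>

// Returns true if a checkpoint should be triggered, whichever condition fires first.
bool shouldCheckpoint(uint64_t gRecordWALSizeBytes,
                      std::chrono::steady_clock::time_point lastCheckpointTime,
                      uint64_t sizeLimitBytes = 10ull * 1024 * 1024,
                      std::chrono::seconds timeout = std::chrono::seconds{10}) {
    bool walTooLarge = gRecordWALSizeBytes >= sizeLimitBytes;
    bool timedOut = std::chrono::steady_clock::now() - lastCheckpointTime >= timeout;
    return walTooLarge || timedOut;
}
```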

Note that the G-RL-WAL only contains log records from committed transactions, in sequence. If a COMMIT record is missing, then we know there was a crash in the middle of flushing the records of a transaction, so that transaction is not committed, and we only need to replay from the last CHECKPOINT record until the last COMMIT record. The checkpointing protocol is as follows:

  1. Stop write trxs.
  2. Starting from the last CHECKPOINT record in the G-RL-WAL (or from the beginning if none exists), replay the log records and perform the updates, this time changing the actual pages by computing new shadow pages, and log these shadow pages to the G-PL-WAL (the global page-level WAL). That is, at this point we repeat our current mechanism for achieving atomicity and durability and create a new set of page-level WAL records that log which pages changed and where the shadow pages reside (what we currently call the WAL version of a page in the system). Note that if the checkpoint is being taken during normal execution, we could instead checkpoint as follows: rather than replaying log records, we go through the version chains up to the last commitID we are checkpointing and use those. I am not sure if this would have any performance benefits, but it is worth keeping in mind; for now, since we need the mechanism to replay from WAL log records anyway, I propose not to implement it. Once all the replays are done, add a SHADOW-PAGES-FINISHED record to the G-PL-WAL (logging the commitID of the last transaction that has been replayed), then flush the G-PL-WAL and all shadow pages to disk.
  3. Stop read transactions.
  4. Replay G-PL-WAL and if any of the updated pages were in memory, "checkpoint the updated pages in memory".
  5. If checkpointing during normal execution, garbage collect all version chains and the auxiliary data structures that we maintain for fast scans.
  6. Remove G-PL-WAL and truncate G-RL-WAL.

Note that because we stop write transactions, there cannot be any value versions after the checkpoint. That is, we will checkpoint the updates of every transaction that committed before the end of the checkpoint (and we are guaranteed not to have any active write transactions). Therefore after the checkpoint, irrespective of whether it happened during normal execution or during recovery, the system state looks as if the database has just been loaded.

Crash during recovery: In recovery mode, we should first check the G-PL-WAL and see if it contains the SHADOW-PAGES-FINISHED record. If so, we skip Steps 1-3 of checkpointing and start with Step 4, i.e., replaying the G-PL-WAL. During development I would double-check by reading the G-RL-WAL and ensuring that the commitIDs of the transactions replayed are the same in the G-RL-WAL and the G-PL-WAL. Otherwise, we can remove the G-PL-WAL and start with Step 1.
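A hedged sketch of this startup decision; the helper functions are stand-ins for logic that is not specified in this issue.

```cpp
#include <filesystem>

namespace fs = std::filesystem;

bool hasShadowPagesFinishedRecord(const fs::path& gPageWAL) {
    // Scan the G-PL-WAL for a SHADOW-PAGES-FINISHED record (details elided).
    (void)gPageWAL;
    return false;
}
void replayGPageWAL(const fs::path& gPageWAL) { /* Step 4 onwards */ (void)gPageWAL; }
void checkpointFromRecordWAL(const fs::path& gRecordWAL) { /* Steps 1-6 */ (void)gRecordWAL; }

void recover(const fs::path& gPageWAL, const fs::path& gRecordWAL) {
    if (fs::exists(gPageWAL) && hasShadowPagesFinishedRecord(gPageWAL)) {
        // Shadow pages were fully produced before the crash: skip Steps 1-3 and
        // replay the page-level WAL directly.
        replayGPageWAL(gPageWAL);
    } else {
        // The page-level WAL is incomplete: discard it and checkpoint from the
        // record-level WAL, which only contains committed transactions.
        fs::remove(gPageWAL);
        checkpointFromRecordWAL(gRecordWAL);
    }
}
```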

Serializability

We should also support a Serializable isolation level and provide clauses in Cypher for users to explicitly choose this level. The implementation should be managed in the transaction manager, which should ensure that a new Serializable transaction is first put on a queue until all other write transactions leave the system. It then runs alone, and other write transactions are blocked from starting.

Sequence of PRs to Implement

The changes are a bit too large to break into individual PRs before starting the implementation, but a rough natural order of progress seems to be:

  1. In-memory Version Chains without record-level logging: timestamp logic, auxiliary data structures for fast scans, undo buffers, commit and abort protocols. All of these can first be implemented without any logging (so commits are not really commits).
  2. Record-level logging: Implement the infrastructure for record-level logging (what the log records look like, how to log variable-length data structures etc).
  3. Checkpointing and recovery: Implement the checkpointing and recovery algorithms.

It is of course more involved than this, because implementing these steps will likely require changes to many components, e.g., LocalTable or BufferManager.

Further Considerations

I think we should discuss whether we want the Catalog, deleted IDs, and statistics to all be tables in the system (specifically node tables). If we model all operations on these objects as operations on tables, then, since we already have a way to transactionally maintain node table records, we would get a homogeneous way to transactionally update both these objects and the actual data. Currently we employ a different mechanism to transactionally update these objects: each has two versions, a Read and a Write version, used respectively by read transactions and the write transaction.
