scylla-cdc-rust
Support general stream changes for tablets
Initial support for CDC with tablets was introduced in https://github.com/scylladb/scylla-cdc-rust/pull/136. The implementation assumes that whenever the CDC streams of a table change, all the streams are replaced at once at a single timestamp. This is convenient because it works similarly to the concept of CDC generations for vnodes.
This is indeed how it currently works for tablets: on tablet split or tablet merge, all tablets are split or merged at once, and new CDC streams for each tablet are created in the same operation.
However, the CDC streams metadata in system.cdc_timestamps and system.cdc_streams supports more general stream changes, where only some of the streams are closed and replaced with newly opened streams. Scylla core may use this in the future to change streams gradually instead of all at once.
The current implementation should still work with general stream changes, but it may not be very efficient. Whenever there is a new CDC timestamp, it assumes that all streams are replaced: it waits for all existing tasks to complete and then creates new tasks for all streams active at the new timestamp. A more efficient implementation would stop only the tasks for the closed streams, create new tasks for the opened streams, and let the tasks for unchanged streams continue uninterrupted.
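A minimal sketch of that more efficient update, using only the standard library. The names `StreamId` and `apply_stream_changes`, and the generic task handle `T`, are hypothetical and do not come from the crate; the sketch only illustrates touching the closed and opened streams while leaving the other tasks alone.

```rust
use std::collections::HashMap;

/// Raw stream ID bytes (hypothetical alias, not the crate's own type).
type StreamId = Vec<u8>;

/// Applies one timestamp's stream changes to the set of running tasks.
///
/// Handles of tasks whose streams were closed are removed from `tasks` and
/// returned, so the caller decides how to let them finish and stop them.
/// Tasks for streams that are neither closed nor opened are left untouched.
fn apply_stream_changes<T>(
    tasks: &mut HashMap<StreamId, T>,
    closed: &[StreamId],
    opened: &[StreamId],
    mut spawn: impl FnMut(&StreamId) -> T,
) -> Vec<T> {
    // Detach the tasks of closed streams; IDs without a task are skipped.
    let stopped: Vec<T> = closed.iter().filter_map(|id| tasks.remove(id)).collect();

    // Start a task for every opened stream that does not have one yet.
    for id in opened {
        tasks.entry(id.clone()).or_insert_with(|| spawn(id));
    }

    stopped
}
```

Returning the detached handles instead of stopping them inside the function is a design choice of this sketch: it keeps the bookkeeping separate from whatever shutdown policy the reader tasks need.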
Information about CDC streams is retrieved from system.cdc_timestamps and system.cdc_streams as follows.
We first query system.cdc_timestamps to learn about new CDC timestamps. Whenever the CDC streams change, a new timestamp is created and appears in this table. We then query system.cdc_streams for this timestamp to get the stream changes and the stream IDs.
Example:
Find the first timestamp and its streams:
cqlsh> SELECT * FROM system.cdc_timestamps WHERE keyspace_name='ks' AND table_name='t' ORDER BY timestamp ASC limit 1;
keyspace_name | table_name | timestamp
---------------+------------+---------------------------------
ks | t | 2025-10-13 10:17:35.785000+0000
cqlsh> SELECT * FROM system.cdc_streams WHERE keyspace_name='ks' AND table_name='t' AND timestamp='2025-10-13 10:17:35.785+0000';
keyspace_name | table_name | timestamp | stream_state | stream_id
---------------+------------+---------------------------------+--------------+------------------------------------
ks | t | 2025-10-13 10:17:35.785000+0000 | 0 | 0xffffffffffffffff1af242b1c0000001
ks | t | 2025-10-13 10:17:35.785000+0000 | 0 | 0x7fffffffffffffffad4dd820ec000001
ks | t | 2025-10-13 10:17:35.785000+0000 | 2 | 0xffffffffffffffff1af242b1c0000001
ks | t | 2025-10-13 10:17:35.785000+0000 | 2 | 0x7fffffffffffffffad4dd820ec000001
Find the next timestamp and its streams, including the changes relative to the previous timestamp:
cqlsh> SELECT * FROM system.cdc_timestamps WHERE keyspace_name='ks' AND table_name='t' AND timestamp > '2025-10-13 10:17:35.785+0000' ORDER BY timestamp ASC limit 1;
keyspace_name | table_name | timestamp
---------------+------------+---------------------------------
ks | t | 2025-10-13 10:21:27.290000+0000
cqlsh> SELECT * FROM system.cdc_streams WHERE keyspace_name='ks' AND table_name='t' AND timestamp='2025-10-13 10:21:27.290+0000';
keyspace_name | table_name | timestamp | stream_state | stream_id
---------------+------------+---------------------------------+--------------+------------------------------------
ks | t | 2025-10-13 10:21:27.290000+0000 | 0 | 0xbfffffffffffffffb2cd10d45c000001
ks | t | 2025-10-13 10:21:27.290000+0000 | 0 | 0xffffffffffffffffe689d08904000001
ks | t | 2025-10-13 10:21:27.290000+0000 | 0 | 0x3fffffffffffffffae99839978000001
ks | t | 2025-10-13 10:21:27.290000+0000 | 0 | 0x7fffffffffffffffd59f710d68000001
ks | t | 2025-10-13 10:21:27.290000+0000 | 1 | 0xffffffffffffffff1af242b1c0000001
ks | t | 2025-10-13 10:21:27.290000+0000 | 1 | 0x7fffffffffffffffad4dd820ec000001
ks | t | 2025-10-13 10:21:27.290000+0000 | 2 | 0xbfffffffffffffffb2cd10d45c000001
ks | t | 2025-10-13 10:21:27.290000+0000 | 2 | 0xffffffffffffffffe689d08904000001
ks | t | 2025-10-13 10:21:27.290000+0000 | 2 | 0x3fffffffffffffffae99839978000001
ks | t | 2025-10-13 10:21:27.290000+0000 | 2 | 0x7fffffffffffffffd59f710d68000001
The values in stream_state are defined in the StreamState enum:
enum StreamState {
    Current = 0,
    Closed = 1,
    Opened = 2,
}
For each timestamp, system.cdc_streams lists the streams that are active at that timestamp (Current), the streams from the previous timestamp that are no longer active (Closed), and the streams newly introduced at this timestamp (Opened).
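As an illustration, the rows returned for one timestamp could be grouped by state roughly as follows. This is a sketch under assumptions: it treats stream_state as a signed 8-bit integer and uses `u128` as a stand-in for the 16-byte stream ID; `StreamsAtTimestamp` and `group_rows` are hypothetical names, not part of the crate.

```rust
use std::convert::TryFrom;

#[derive(Debug, Clone, Copy, PartialEq, Eq)]
enum StreamState {
    Current = 0,
    Closed = 1,
    Opened = 2,
}

impl TryFrom<i8> for StreamState {
    // An unknown raw value is returned as the error (assumption of this sketch).
    type Error = i8;

    fn try_from(raw: i8) -> Result<Self, Self::Error> {
        match raw {
            0 => Ok(StreamState::Current),
            1 => Ok(StreamState::Closed),
            2 => Ok(StreamState::Opened),
            other => Err(other),
        }
    }
}

/// Streams of a single timestamp, grouped by state (hypothetical type).
#[derive(Debug, Default, PartialEq)]
struct StreamsAtTimestamp {
    current: Vec<u128>,
    closed: Vec<u128>,
    opened: Vec<u128>,
}

/// Groups `(stream_state, stream_id)` rows of one timestamp by their state.
/// Fails with the raw value if a row carries an unknown state.
fn group_rows(rows: &[(i8, u128)]) -> Result<StreamsAtTimestamp, i8> {
    let mut out = StreamsAtTimestamp::default();
    for &(raw_state, stream_id) in rows {
        match StreamState::try_from(raw_state)? {
            StreamState::Current => out.current.push(stream_id),
            StreamState::Closed => out.closed.push(stream_id),
            StreamState::Opened => out.opened.push(stream_id),
        }
    }
    Ok(out)
}
```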
The current implementation simply fetches, for each timestamp, the streams with stream_state = StreamState::Current. Instead, we can query each timestamp for its Closed and Opened streams to find what changed, and update only the relevant tasks.
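One way to sanity-check this approach is to confirm that the previous active set, minus the Closed streams, plus the Opened streams, matches the Current set of the new timestamp. The sketch below assumes `u128` as a stand-in for the stream ID type, and `next_active` is a hypothetical helper name.

```rust
use std::collections::BTreeSet;

/// Derives the active stream set of the next timestamp from the previous
/// active set and the Closed / Opened streams reported for that timestamp.
fn next_active(previous: &BTreeSet<u128>, closed: &[u128], opened: &[u128]) -> BTreeSet<u128> {
    let mut active = previous.clone();
    // Streams closed at this timestamp stop being active.
    for id in closed {
        active.remove(id);
    }
    // Streams opened at this timestamp become active.
    active.extend(opened.iter().copied());
    active
}
```

With the data from the example above, starting from the two streams of the first timestamp and applying the two Closed and four Opened rows of the second timestamp should yield exactly the four streams listed there with stream_state 0.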