Support (incremental) changelog scan for Change Data Capture use-cases
Is your feature request related to a problem or challenge?
Currently, iceberg-rust doesn't provide a way to see the changes between two snapshots. In Spark, through the Iceberg Java implementation, this is done with the create_changelog_view procedure. This is very useful for doing change data capture on top of Iceberg tables.
Describe the solution you'd like
The output of Spark's create_changelog_view, in the default mode, shows each row's user-defined columns plus three metadata columns (_change_type, _change_ordinal, _commit_snapshot_id).
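A purely illustrative example of such a view (the id/data columns, all values, and the snapshot IDs below are made up, not actual output):

```
+----+------+--------------+-----------------+---------------------+
| id | data | _change_type | _change_ordinal | _commit_snapshot_id |
+----+------+--------------+-----------------+---------------------+
| 1  | a    | INSERT       | 0               | 1111111111111111111 |
| 2  | b    | INSERT       | 0               | 1111111111111111111 |
| 1  | a    | DELETE       | 1               | 2222222222222222222 |
+----+------+--------------+-----------------+---------------------+
```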
The Java code does this incrementally, meaning only the data between the optional timestamps (or snapshot IDs) is processed. Here are some references:
openChangelogScanTask in https://github.com/apache/iceberg/blob/efbfb7ef9addeb33e72208c927936e50b92d3357/spark/v4.0/spark/src/main/java/org/apache/iceberg/spark/source/ChangelogRowReader.java
doPlanFiles in https://github.com/apache/iceberg/blob/6ec3de390d3fa6e797c6975b1eaaea41719db0fe/core/src/main/java/org/apache/iceberg/BaseIncrementalChangelogScan.java
BaseAddedRowsScanTask and BaseDeletedDataFileScanTask. BaseDeletedRowsScanTask is unused, which means that, for the changelog scan, Spark doesn't support row-level deletes, only copy-on-write style deletes. It would be good if Rust supported row-level deletes as well; I see no particular reason why this isn't supported in Spark.
The create_changelog_view procedure has several options; we don't have to support them all in Rust immediately, but can add them over time.
Willingness to contribute
I would be willing to contribute to this feature with guidance from the Iceberg Rust community
I realize this may not be possible to support in iceberg-rust alone, without a query processing engine. The examples I gave above use Spark to iterate through rows (and, for example, deduplicate them if net_changes is enabled). However, it'd be useful to open up extension points so that a query processing engine, together with this library, can achieve what Spark does with the Java library.
For example, it'd be useful to make it possible to retrieve only the data that was added or removed between two snapshots.
One could modify plan_files slightly to allow filtering out manifests whose added_snapshot_id is not among the desired snapshot IDs, as sketched below.
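A minimal sketch of that filtering idea (the struct is a stand-in for the relevant fields of iceberg::spec::ManifestFile, and the function name is hypothetical, not part of the current API):

```rust
use std::collections::HashSet;

/// Stand-in for the manifest metadata plan_files already has available
/// (the real type is `iceberg::spec::ManifestFile`); fields are illustrative.
struct ManifestInfo {
    manifest_path: String,
    added_snapshot_id: i64,
}

/// Keep only manifests added by one of the snapshots in the desired range;
/// manifests added outside that range cannot contribute changes to the scan.
fn filter_changelog_manifests(
    manifests: Vec<ManifestInfo>,
    snapshots_in_range: &HashSet<i64>,
) -> Vec<ManifestInfo> {
    manifests
        .into_iter()
        .filter(|m| snapshots_in_range.contains(&m.added_snapshot_id))
        .collect()
}
```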
In our fork, we've worked out what shape this changelog scan could take to satisfy our needs. Here's a rough outline.
We need to return two streams (with the possibility of combining them into a single stream). One stream is for appends (aka inserts), and the other is for deletes. These streams represent the changes between snapshots from_snapshot_id and to_snapshot_id; in what follows, let's call these snapshots A (from_snapshot_id) and B (to_snapshot_id).
The insert stream would contain (_file, _pos, user_cols...), where user_cols are the values of the actual columns for a particular row. Note that adding the _file and _pos metadata columns is an orthogonal feature, tracked in https://github.com/apache/iceberg-rust/issues/1766 and https://github.com/apache/iceberg-rust/issues/1765.
The delete stream should only contain (file_path, pos) pairs, i.e. entries in the same format as positional delete files. However, this stream should also contain entries derived from deleted data files, and equality deletes should be resolved and translated into the same format. A rough sketch of the two stream shapes follows below.
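To make the shape concrete, here is a minimal sketch of what the two streams could carry (all names are hypothetical, error handling is elided, and this is not a proposal for the final API):

```rust
use futures::stream::BoxStream;

/// One entry of the delete stream: the same shape as a row in a positional
/// delete file. Deleted data files and equality deletes are translated into
/// this form as well.
pub struct DeletedPosition {
    pub file_path: String,
    pub pos: u64,
}

/// Hypothetical result of a changelog scan between snapshots A and B.
pub struct ChangelogStreams {
    /// Rows added in (A, B], after applying positional/equality deletes;
    /// batches carry the user columns (plus `_file`/`_pos` once those
    /// metadata columns exist).
    pub inserts: BoxStream<'static, arrow_array::RecordBatch>,
    /// Positions removed from data files that already existed at snapshot A.
    pub deletes: BoxStream<'static, DeletedPosition>,
}
```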
To produce inserts, we'd simply scan all the data files added between snapshots A and B and apply positional and equality deletes, just like in regular FileScanTasks. Data files that are added after A (>A) but deleted again at or before B (<=B) would be skipped.
To produce deletes, we'd only consider deletes that refer to data files added at or before A (<=A), i.e. files that already existed at snapshot A. See the classification sketch below.
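A sketch of the per-data-file decision implied by the two rules above (a hypothetical helper, not part of iceberg-rust):

```rust
/// Which stream (if any) a data file contributes to for a changelog scan over (A, B].
enum FileRole {
    /// Added in (A, B] and still live at B: scan its rows (minus the deletes
    /// visible at B) and emit them on the insert stream.
    ProduceInserts,
    /// Existed at A and was removed by B: emit every live row of the file as
    /// a (file_path, pos) entry on the delete stream.
    ProduceDeletesForWholeFile,
    /// Existed at A and is still live at B: only delete-file entries added in
    /// (A, B] that target this file become entries on the delete stream.
    ProduceDeletesFromNewDeleteFiles,
    /// Added in (A, B] but removed again by B: no net change, skip it.
    Skip,
}

fn classify_data_file(existed_at_a: bool, live_at_b: bool) -> FileRole {
    match (existed_at_a, live_at_b) {
        (false, true) => FileRole::ProduceInserts,
        (true, false) => FileRole::ProduceDeletesForWholeFile,
        (true, true) => FileRole::ProduceDeletesFromNewDeleteFiles,
        (false, false) => FileRole::Skip,
    }
}
```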
Regarding the mode, the changes would be almost net changes. The deviation can happen in deletes, e.g. when a data file that already had positional deletes applied is itself deleted. It seems computationally expensive to detect this, and there's no need to deduplicate deletes in the library itself; we can always do that by post-processing the stream.
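Deduplicating in post-processing is straightforward; a sketch, assuming the engine has already collected the (file_path, pos) pairs:

```rust
use std::collections::HashSet;

/// Drop duplicate (file_path, pos) pairs, e.g. when a positional delete and a
/// later whole-file deletion cover the same row.
fn dedup_deletes(deletes: impl IntoIterator<Item = (String, u64)>) -> Vec<(String, u64)> {
    let mut seen = HashSet::new();
    deletes
        .into_iter()
        .filter(|d| seen.insert(d.clone()))
        .collect()
}
```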
Finally, I'm not sure what the difference is between this and the incremental scan: https://github.com/apache/iceberg-rust/issues/1469. Initially that was only an incremental append-only scan, but with the introduction of deletes, the difference between the incremental scan and the changelog scan is blurry to me.
We implemented the above in our fork: https://github.com/RelationalAI/iceberg-rust/pull/3. We realize it might not be the best implementation to merge into the official repo; given the uncertainty about how it should look, our approach avoided heavy refactoring and minimized future merge conflicts. Look at it only as a reference for what was needed, not for how the API should look - the PR here should definitely look different and reuse more common pieces. The hope is that we first align on the design in this issue.