Flink: Incrementally rewrite data files in streaming.
## Description
This PR is based on the approach in @rdblue's comment and tries to incrementally rewrite committed data files.
## Implementation
In this PR, a parallel `IcebergStreamRewriter` and a single-parallelism `IcebergRewriteFilesCommitter` are appended after the `IcebergFilesCommitter`.
```
                                                                      +-> IcebergStreamRewriter -+
IcebergFilesCommitter -Forward-> IcebergRewriteTaskEmitter -Rebalance-+-> IcebergStreamRewriter -+-Rebalance-> IcebergRewriteFilesCommitter
                                                                      +-> IcebergStreamRewriter -+
```
- At the beginning, the committed data files and the related delete files are packed into a `CommitResult`, which carries the sequence number and snapshot id, and sent to the `IcebergRewriteTaskEmitter`.
- The `IcebergRewriteTaskEmitter` receives commit results and groups the data files and delete files by partition. Each partitioned group of files is then packaged into one or more `RewriteFileGroup`s according to the `StreamingBinPackStrategy`. Once a `RewriteFileGroup` reaches a rewrite condition (file count / file size), it is split and combined into one or more `CombinedScanTask`s and emitted as `RewriteTask`s.
- The `IcebergStreamRewriter` simply rewrites each received `RewriteTask` and emits the added data files and the rewritten data files as a `RewriteResult`.
- The `IcebergRewriteFilesCommitter` collects all `RewriteResult`s and groups them by starting snapshot id and partition; the grouped rewrite results are committed at the next checkpoint.
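For orientation, here is a minimal sketch of how this chain might be wired with the DataStream API. The `CommitResult`/`RewriteTask`/`RewriteResult` types are the ones named above, but the operator constructors and generics are illustrative and may not match the PR exactly.

```java
// Hypothetical wiring of the rewrite chain; constructors and type parameters
// are illustrative and may differ from the actual PR.
DataStream<CommitResult> committed = writerStream
    .transform("IcebergFilesCommitter", TypeInformation.of(CommitResult.class),
        new IcebergFilesCommitter(tableLoader))
    .setParallelism(1);

DataStream<RewriteTask> rewriteTasks = committed
    .forward()                 // Forward: the single emitter preserves commit order
    .transform("IcebergRewriteTaskEmitter", TypeInformation.of(RewriteTask.class),
        new IcebergRewriteTaskEmitter(tableLoader))
    .setParallelism(1);

rewriteTasks
    .rebalance()               // Rebalance: spread file groups across the rewriters
    .transform("IcebergStreamRewriter", TypeInformation.of(RewriteResult.class),
        new IcebergStreamRewriter(tableLoader))
    .setParallelism(rewriteParallelism)
    .rebalance()               // Rebalance: funnel results to the single committer
    .transform("IcebergRewriteFilesCommitter", TypeInformation.of(Void.class),
        new IcebergRewriteFilesCommitter(tableLoader))
    .setParallelism(1);
```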
## Note
- `IcebergRewriteTaskEmitter` and `IcebergRewriteFilesCommitter` parallelism is always 1, while `IcebergStreamRewriter` parallelism is configurable.
- `IcebergRewriteTaskEmitter` compares the snapshot id of each received commit result with the last received snapshot id. If the last received snapshot id is not the parent of the received snapshot id, it means concurrent writers have committed other snapshots to this table. In that case the `IcebergRewriteTaskEmitter` collects all delete files between the last received snapshot and the received snapshot into the `RewriteFileGroup`s, to avoid missing any equality delete files when doing compaction (see the sketch after this list).
- When the job is restored from a checkpoint or savepoint, `IcebergRewriteTaskEmitter` collects all data files and delete files committed by the restored Flink job, and all eq-delete files committed by other writers between the last received snapshot and the last committed snapshot of the restored Flink job.
- Any exception that happens while rewriting data files in `IcebergStreamRewriter` causes the job to fail.
- `IcebergRewriteFilesCommitter` uses the starting snapshot id and sequence number when committing rewrite results at the next checkpoint.
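A rough sketch of the lineage check described above. The field and helper names (`addDeleteFileToGroups`, `addCommitResultToGroups`, `CommitResult`) are illustrative, and the `Snapshot` accessor for added delete files varies across Iceberg versions.

```java
// Illustrative lineage check inside IcebergRewriteTaskEmitter: if the received
// snapshot is not the direct child of the last received one, concurrent writers
// committed in between, so their delete files must also be collected.
private long lastReceivedSnapshotId = -1L;

void processCommitResult(CommitResult commit, Table table) {
  Snapshot received = table.snapshot(commit.snapshotId());

  if (lastReceivedSnapshotId > 0 && received.parentId() != null
      && received.parentId() != lastReceivedSnapshotId) {
    // Walk back through the snapshots committed by other writers in between.
    Long cursor = received.parentId();
    while (cursor != null && cursor != lastReceivedSnapshotId) {
      Snapshot missed = table.snapshot(cursor);
      // Collect the delete files added by the concurrent commit into the
      // affected RewriteFileGroups so no equality delete is missed.
      missed.addedDeleteFiles(table.io()).forEach(this::addDeleteFileToGroups);
      cursor = missed.parentId();
    }
  }

  addCommitResultToGroups(commit);
  lastReceivedSnapshotId = commit.snapshotId();
}
```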
## Configuration
- `flink.rewrite.enable`: Whether to enable the rewrite operator. Default is `false`.
- `flink.rewrite.parallelism`: The parallelism of the rewrite operator. Default is the job parallelism.
- `flink.rewrite.target-file-size-bytes`: The output file size to attempt to generate when rewriting files. The rewrite is triggered when the total file size of a `RewriteFileGroup` reaches this value. Default is the same as `write.delete.target-file-size-bytes` (512 MB).
- `flink.rewrite.min-file-size-bytes`: Files smaller than this value will be considered for rewriting.
- `flink.rewrite.max-file-size-bytes`: Files larger than this value will be considered for rewriting.
- `flink.rewrite.min-group-files`: The minimum number of files a file group must contain to be considered for rewriting. Defaults to 2, which means a file group needs at least 2 files to be considered for rewriting.
- `flink.rewrite.max-group-files`: The maximum number of files allowed in a file group. The rewrite is triggered when the total file count of a `RewriteFileGroup` reaches this value. Default is `Integer.MAX_VALUE`.
- `flink.rewrite.nums-of-commit-after-append`: The maximum number of commits to wait for a file group. If no more files are appended to the file group after this number of commits, the group is rewritten regardless of whether its total size reaches `flink.rewrite.target-file-size-bytes`. Defaults to `Integer.MAX_VALUE`, which means this feature is disabled by default.
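As an illustration, the options could be supplied like any other table property. Whether the PR reads them from table properties or from Flink write options is an assumption here; the property keys are the ones documented above.

```java
// Illustrative: enable streaming rewrite and tune it via table properties.
table.updateProperties()
    .set("flink.rewrite.enable", "true")
    .set("flink.rewrite.parallelism", "4")
    .set("flink.rewrite.target-file-size-bytes", String.valueOf(512L * 1024 * 1024))
    .set("flink.rewrite.min-group-files", "2")
    .set("flink.rewrite.nums-of-commit-after-append", "10")
    .commit();
```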
I just finished the streaming rewrite function, and I will complete the unit tests in the next few days. Maybe you can take a quick look and leave some comments; I will keep improving this PR this week. @rdblue @openinx @stevenzwu @jackye1995 @kbendick :)
If this PR is OK, could you take a look at it? 😄 @rdblue @openinx @stevenzwu @jackye1995 @kbendick
@Reo-LEI would you mind fixing these conflicts? I plan to go through the whole PR. Thanks for the contribution!
I had a lot of thoughts around this PR and decided to write them down in a doc. @openinx @rdblue @stevenzwu please let me know what you think!
https://docs.google.com/document/d/18N-xwZasXLNEl2407xT1oD08Mv9Kd3p0xMX7ZJFyV20/edit?usp=sharing
@openinx Thanks for your review! I have resolved the conflicts, please go ahead. 😄
I'm very grateful to @jackye1995 for his excellent work. I think Jack summarized the background, implementation, and benefits of this PR very well in that document, and he proposed two approaches to address the shortcomings of this PR in the CDC case. I would like to make some additions and summaries to the document, and continue discussing here how to handle equality delete compaction in the CDC case. @rdblue @openinx @stevenzwu @kbendick
First, I want to discuss the effectiveness of the current implementation of streaming rewrite (applying delta deletes to delta data). We can measure this along two dimensions: the type of the data stream and the table partition type. Data streams can be divided into append streams and update streams (including upsert and retract/CDC streams). Tables can be divided into unpartitioned and partitioned, and partitioned tables can be further divided into partitions that contain time and partitions that do not. For effectiveness, we use `o` to indicate that streaming rewrite is effective, meaning it can do as well as the `RewriteDataFiles` action and the user does not need to run `RewriteDataFiles` in addition, and `x` to indicate that streaming rewrite is not very effective, meaning it cannot do as well as the `RewriteDataFiles` action and the user still needs to run `RewriteDataFiles` in addition. We get the following matrix:
| | unpartitioned | partitioned without time | partitioned with time |
|---|---|---|---|
| append stream | o | o | o |
| upsert stream | x | x | o |
| retract stream | x | x | o |
For append streams, streaming rewrite works well for all partition types because there are no delete files and we only need to bin-pack data files. For update streams (upsert and retract streams), streaming rewrite works well for tables partitioned by time, because the result becomes settled over time: for example, with a day-and-hour partitioned table, the T partition becomes settled at T+1, so we can rewrite the T partition after T. However, streaming rewrite does not work well for tables that are not partitioned by time. As Jack mentioned in the doc, the high-seqNum equality delete files remain in storage, and when we read the table they are still applied to the low-seqNum data files even if those data files have been rewritten. So I agree with Jack that we should first find a way to deal with the remaining equality deletes.
Next, I want to discuss how to deal with the remaining equality deletes. Jack lists two approaches in the doc. Approach A gathers the equality delete files and then performs a table scan to rewrite the affected data files. Approach B converts the equality deletes to position deletes and then performs a table scan to apply the position deletes and rewrite the affected data files. Both approaches require an extra table scan and data rewrite. I think they would work, but they are complicated, and we need to reconsider the cost and benefit.
Finally, I think we can have an approach C: simply rewrite the delta equality delete files into new ones and replace them via `rewriteFiles`. As I said above, the current implementation of streaming rewrite does not work well for tables that are not partitioned by time because the high-seqNum equality delete files (the delta equality delete files) remain in storage, and we know streaming rewrite can apply all deletes to data files. Therefore, if we rewrite these delta delete files into new ones, bringing them from an older seqNum to a high seqNum (the latest commit seqNum), and then remove the old delete files from the latest snapshot, we can keep the number of equality delete files as low as the number of data files. With this approach, we need no extra table scan or data rewrite, and we remove unnecessary equality delete files and avoid applying them to the rewritten data files.
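To make approach C concrete, here is a rough sketch using the `RewriteFiles` API. The four-argument overload that also replaces delete files is assumed to be available, and `deltaEqualityDeletes` / `writeMergedEqDeletes` are hypothetical helpers.

```java
// Approach C sketch: replace the accumulated delta equality delete files of one
// partition with a single rewritten delete file committed at the latest sequence
// number, so old equality deletes no longer pile up against rewritten data files.
Set<DeleteFile> staleEqDeletes = deltaEqualityDeletes(partition);   // hypothetical helper
DeleteFile mergedEqDeletes = writeMergedEqDeletes(staleEqDeletes);  // hypothetical helper

table.newRewrite()
    .rewriteFiles(
        Collections.emptySet(),                  // no data files removed
        staleEqDeletes,                          // old equality delete files to drop
        Collections.emptySet(),                  // no data files added
        Collections.singleton(mergedEqDeletes))  // merged delete file at the new seqNum
    .commit();
```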
@jackye1995, thanks for taking the time to write up your thoughts. That doc is really helpful.
I had previously been thinking about this mostly in terms of an append stream and v1 tables. Like @Reo-LEI notes, inline compaction for append streams is safe (for v1 tables) and is a good idea if you want frequent checkpoints but don't want a ton of small files. For v1 tables, we know that there won't be equality or position delete files, so we can easily compact. For v2 tables, append streams are pretty much like CDC streams because we may have concurrent writers adding equality or position delete files.
That brings us back to Jack's points about adapting this idea to CDC streams. First, equality deletes create sequence number boundaries that make it difficult to compact. But I think there are still some use cases where this is valuable even if we only plan on compacting within a sequence number. Write tasks necessarily align with partitions, so compacting across tasks may still be valuable. For example, CDC writes to an unpartitioned table from multiple tasks will create a data file per task (per checkpoint) that is probably not full size.
While there may be multiple files per partition, I think that the larger use case is compacting across checkpoints and that will require addressing compaction across sequence numbers. Let's assume that we have a fairly regular CDC stream for a bucketed table so that the commit for each bucket has one equality delete and one data file per checkpoint, and optionally a position delete file for the data file. That's the worst case that Jack describes, where there is nothing to compact within each sequence number except position deletes. Here are the files committed by sequence number for just one bucket (bucket_id = 0):
| Type | Seq 107 | Seq 106 | Seq 105 | Seq 104 | Seq 103 |
|---|---|---|---|---|---|
| data | data-107.parquet | data-106.parquet | data-105.parquet | data-104.parquet | data-103.parquet |
| eq-deletes | eq-107.parquet | eq-106.parquet | eq-105.parquet | eq-104.parquet | eq-103.parquet |
| pos-deletes | | pos-106.parquet | | | pos-103.parquet |
I think that we actually can do some compaction in this situation. Say we want to compact data-103.parquet through data-107.parquet. Then we need to apply all newer equality deletes to each data file:
| File | Delete files to apply |
|---|---|
| data-103.parquet | eq-{104,105,106,107}.parquet, pos-103.parquet |
| data-104.parquet | eq-{105,106,107}.parquet |
| data-105.parquet | eq-{106,107}.parquet |
| data-106.parquet | eq-107.parquet, pos-106.parquet |
| data-107.parquet | |
The position delete files can be removed because they only reference data-103.parquet and data-106.parquet. The equality deletes must remain in the table in case they deleted data in other files. The new compacted data file should be data-107-compacted.parquet and should be committed at sequence number 107 so that future equality deletes are still applied correctly.
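The rule behind that mapping can be stated purely in terms of sequence numbers. Here is a simplified sketch; real scan planning also matches partitions, equality field ids, and the referenced data file path.

```java
// Simplified version of the delete-applicability rule used in the table above.
// Real planning also checks partition, equality field ids and referenced paths.
boolean applies(DeleteFile delete, long deleteSeq, long dataSeq) {
  if (delete.content() == FileContent.EQUALITY_DELETES) {
    // Equality deletes apply only to data files with a strictly older sequence
    // number, e.g. eq-107 applies to data-103..data-106 but not to data-107.
    return deleteSeq > dataSeq;
  } else {
    // Position deletes apply to data files with the same or an older sequence
    // number, e.g. pos-103 applies to data-103.
    return deleteSeq >= dataSeq;
  }
}
```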
Another thing to keep in mind is that we may have equality and position deletes coming from concurrent writers. But in this case all we would need to do is update the task emitting committed data files to emit all delete files, even those from concurrent writers. For example, maybe sequence number 5 was written by a concurrent commit. Then the delete files for that commit, eq-105.parquet, should be used. But the data file should not be part of the compaction (it would be compacted by a parallel Flink job).
I think that this compaction strategy would address Jack's concern and is a safe way to compact across sequence numbers with equality deletes. The key idea is that we leave the equality delete files in the table so we don't have to worry about existing data files that aren't part of the stream we are compacting.
I think that what I'm proposing is similar to @Reo-LEI's comment, except that it limits the scope of the compaction differently. Using time-based partitions would make us reasonably confident that the compaction is targeted, but the drawback is that we would still need to plan a scan, and it may still be impacted by concurrent deletes. But my strategy of keeping the compaction scoped to just files written by one Flink job and incrementally including delete files is a clean way to handle the issues and ensure that compaction won't conflict with one another.
> The key idea is that we leave the equality delete files in the table so we don't have to worry about existing data files that aren't part of the stream we are compacting.
@rdblue If there are too many equality delete files, they will cause memory pressure when reading the table.
I finished the refactor of this PR according to @rdblue's comments and have updated the description of this PR.
To briefly explain the change: I added the `IcebergRewriteTaskEmitter` to collect all data files and delete files committed by this Flink job, and to collect all eq-delete files committed by other writers. It emits `CombinedScanTask`s to `IcebergStreamRewriter` so that rewrite file groups of the same partition can be rewritten in parallel. Rewrite results are grouped by starting snapshot id and partition and committed in batch.
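For example, the grouping and batch commit could look roughly like this. The key shape, the `RewriteResult` accessors, and the `applyTo` helper are illustrative rather than the PR's exact API.

```java
// Illustrative grouping in IcebergRewriteFilesCommitter: buffer rewrite results
// keyed by (starting snapshot id, partition) and commit each group at checkpoint.
Map<Pair<Long, StructLike>, List<RewriteResult>> pending = new HashMap<>();

void collect(RewriteResult result) {
  Pair<Long, StructLike> key = Pair.of(result.startingSnapshotId(), result.partition());
  pending.computeIfAbsent(key, k -> new ArrayList<>()).add(result);
}

void commitPending(Table table) {
  pending.forEach((key, results) -> {
    RewriteFiles rewrite = table.newRewrite().validateFromSnapshot(key.first());
    // Each result contributes its rewritten (removed) and added data files.
    results.forEach(result -> result.applyTo(rewrite));
    rewrite.commit();
  });
  pending.clear();
}
```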
I tested this in our prod env and it has been running for a while; it works for v1 and v2 tables. For time-partitioned v1 tables, we can use streaming rewrite to replace the batch rewrite action.
@rdblue @jackye1995 @openinx @stevenzwu @kbendick Could you please take another look when you are free?
A quick question: can this PR be used with Flink 1.13.1?
> A quick question: can this PR be used with Flink 1.13.1?

I ported this feature to Flink 1.13 in the branch ‘flink-streaming-rewrite-for-flink0.13’ before, but there are some conflicts that need to be resolved; maybe you can use that. @Initial-neko
Hello, may I ask if the rewrite is asynchronous or synchronous?
> Hello, may I ask if the rewrite is asynchronous or synchronous?
The rewrite is performed asynchronously. @lurnagao
> Hello, may I ask if the rewrite is asynchronous or synchronous?
>
> The rewrite is performed asynchronously. @lurnagao

Thank you so much! Do you mean to commit twice: the first time to commit the original data, and the second time to commit the compacted data? And once the first commit is completed, the data will be visible?
This pull request has been marked as stale due to 30 days of inactivity. It will be closed in 1 week if no further activity occurs. If you think that’s incorrect or this pull request requires a review, please simply write any comment. If closed, you can revive the PR at any time and @mention a reviewer or discuss it on the [email protected] list. Thank you for your contributions.
This pull request has been closed due to lack of activity. This is not a judgement on the merit of the PR in any way. It is just a way of keeping the PR queue manageable. If you think that is incorrect, or the pull request requires review, you can revive the PR at any time.