Flink: Made IcebergFilesCommitter work with single phase commit
How would the single phase commit solution support the exact one semantics for the Iceberg Sink?
@pvary We felt that even a single phase commit should not cause much of an issue for users who opt into it.
Why?
- The current implementation is two-phase, so users expect data to become discoverable in Iceberg tables only after the checkpoint's notifyCheckpointComplete is called. Still, we felt users should not have much of an issue if we commit in snapshotState itself.
- Since this is a sink operator, ideally nothing downstream depends on it; it only writes out the final files, so it is better to commit in a single step.
- With a single phase commit there is also no need to keep pending data in external checkpoint storage; we can ensure the data is committed before the checkpoint is marked complete.
- We will never face data loss if a checkpoint is corrupted, etc.
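The proposed shape can be sketched as a simplified simulation. The class and method names below are hypothetical stand-ins, not the actual IcebergFilesCommitter API: the point is only that under 1PC the commit happens inside snapshotState, so the data is already in the table before the checkpoint is acknowledged and no staged files need to live in checkpoint state.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical sketch of a single-phase committer (not the real Flink/Iceberg API).
class SinglePhaseCommitter {
    // Stand-in for the Iceberg table: commits append to it.
    static List<String> table = new ArrayList<>();

    // Called while the checkpoint is being taken: commit immediately,
    // so nothing needs to be staged in checkpoint state.
    static void snapshotState(List<String> completedFiles) {
        table.addAll(completedFiles);
    }

    // Nothing left to do here under 1PC.
    static void notifyCheckpointComplete() {}

    public static void main(String[] args) {
        snapshotState(List.of("data-file-1"));
        System.out.println(table); // [data-file-1]
    }
}
```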
@mudit-97: If your job fails after the FlinkSink finishes its snapshotState but before notifyCheckpointComplete, and any of the other operators fail in their snapshotState, then Flink could decide that the checkpoint failed and restart from a previous checkpoint. In this case the data will be written again into the Iceberg table (we will have data duplication).
https://en.wikipedia.org/wiki/Two-phase_commit_protocol
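The failure scenario above can be traced with a small simulation (hypothetical names, not the real APIs): the sink commits during snapshotState, another operator then fails its own snapshot, the checkpoint is discarded, and the restored job replays and re-commits the same records.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical simulation of the 1PC duplication scenario.
class OnePhaseCommitDuplication {

    // Stand-in for the Iceberg table: every commit appends its records.
    static List<String> table = new ArrayList<>();

    // 1PC sink: commits immediately as part of the checkpoint snapshot,
    // so data is visible even if the checkpoint later fails.
    static void sinkSnapshotState(List<String> buffered) {
        table.addAll(buffered);
    }

    static List<String> run() {
        table.clear();

        // Checkpoint N: the sink snapshots (and commits) records a, b ...
        sinkSnapshotState(List.of("a", "b"));
        // ... but another operator fails in snapshotState, so checkpoint N
        // is discarded and the job restores from checkpoint N-1.
        // The source replays a, b and the sink commits them again:
        sinkSnapshotState(List.of("a", "b"));

        return table; // a and b appear twice: duplication
    }

    public static void main(String[] args) {
        System.out.println(run()); // [a, b, a, b]
    }
}
```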
@pvary Yes, that corner case can happen, but in most scenarios data duplication is acceptable compared to data loss, which can happen due to checkpoint corruption, or in other corner cases where messages are acked at the source operator but the Iceberg commit then fails in notifyCheckpointComplete. And in the data loss case we would have to rewind the source checkpoint, which can result in even more data duplication.
@mudit-97: The Flink community did outstanding work to handle all of these corner cases. I do not think it is a good idea to throw away that work.
What is the issue with the 2 phase commit solution?
Sure @pvary, we wanted a single phase commit solution because of this thought process:
- We are writing a PubSub source operator which acks messages on notifyCheckpointComplete.
- If 2PC is used, then notifyCheckpointComplete will be called in parallel, and there is no guarantee that messages acked in PubSub have actually been written to Iceberg; they might still be in the checkpoint directory.
- If the job goes down at any point, we always have to manage checkpoints and resume the job from one. If checkpoints are corrupted, we have to seek the PubSub operator back.
- Apart from all of this, PubSub metrics (or any source operator metric) will never give a consistent view, since acked messages can still be lying in the checkpoint directory instead of in the sink.
We understand there can be message duplication in this case, but for some use cases we believe duplication is acceptable compared to managing checkpoints, handling their corruption, and maintaining consistent metrics along the way, especially metrics like watermarks.
That's why we wanted to keep this behavior behind a flag, so that consumers can choose it if needed.
If 2PC is used, then notifyCheckpointComplete will be called in parallel, and there is no guarantee that messages acked in PubSub have actually been written to Iceberg; they might still be in the checkpoint directory
this is correct. but once the Flink checkpoint is recorded successfully, even if there is a commit failure, those files/data will be committed by the next successful checkpoint. there is no data loss unless the checkpoint is corrupted or not usable.
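This recovery property of 2PC can be sketched as a simulation (hypothetical names, not the actual IcebergFilesCommitter): files for a checkpoint are staged in checkpointed state during snapshotState and only committed in notifyCheckpointComplete; if a commit attempt fails, the files stay in state and ride along with the next successful commit.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical simulation of 2PC commit retry across checkpoints.
class TwoPhaseCommitRecovery {

    static List<String> table = new ArrayList<>();        // committed data
    static List<String> pendingFiles = new ArrayList<>(); // checkpointed state

    // Phase 1: stage the files in checkpoint state, do not commit yet.
    static void snapshotState(List<String> files) {
        pendingFiles.addAll(files);
    }

    // Phase 2: commit everything staged so far. On failure, the files
    // simply remain in pendingFiles for the next attempt.
    static void notifyCheckpointComplete(boolean commitSucceeds) {
        if (commitSucceeds) {
            table.addAll(pendingFiles);
            pendingFiles.clear();
        }
    }

    static List<String> run() {
        table.clear();
        pendingFiles.clear();

        snapshotState(List.of("file-1"));
        notifyCheckpointComplete(false); // commit fails: file-1 stays staged

        snapshotState(List.of("file-2"));
        notifyCheckpointComplete(true);  // next checkpoint commits both

        return table; // nothing was lost
    }

    public static void main(String[] args) {
        System.out.println(run()); // [file-1, file-2]
    }
}
```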
Yes @stevenzwu, and that's why we wanted the choice of 1PC, for two major reasons:
- Keeping metrics consistent: whatever shows as acked is actually in the data.
- No handling of checkpoint corruption needed.
For the above two points, we accept taking some hit in the form of higher message duplication in non-happy-path scenarios.
Keeping metrics consistent: whatever shows as acked is actually in the data
this is also incorrect with 1PC. the PubSub source may have already acknowledged the messages, since the source operator is the first to execute its snapshotState operation, but the Iceberg commit may later fail in its own snapshotState step, which also leads to checkpoint failure.
@stevenzwu , the PubSub operator will ack the messages in notifyCheckpointComplete()
In general, I feel uneasy about changing the Flink Iceberg sink behavior from 2PC to 1PC. I would at least want broader community input before we decide it is good to add this option. It might be good to bring this up in a future community sync meeting.
@stevenzwu , the PubSub operator will ack the messages in notifyCheckpointComplete()
this is not guaranteed, so we may still end up inconsistent. not sure if this scenario is problematic for you or not. https://nightlies.apache.org/flink/flink-docs-master/api/java/org/apache/flink/api/common/state/CheckpointListener.html
These notifications are "best effort", meaning they can sometimes be skipped.
It could also be the case that the ack fails in the PubSub source's notifyCheckpointComplete step; then inconsistency can also happen.
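The metrics-inconsistency argument above can be made concrete with a sketch (hypothetical names, not the real APIs): notifyCheckpointComplete is delivered to each operator independently, so the sink's commit can succeed while the source's ack is skipped or fails, leaving records committed but never acked.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical simulation of a skipped/failed source ack after a sink commit.
class BestEffortNotification {
    static List<String> table = new ArrayList<>(); // committed by the sink
    static List<String> acked = new ArrayList<>(); // acked at the source

    // notifyCheckpointComplete fan-out: each operator is notified
    // independently, and any individual call may fail or be skipped.
    static void onCheckpointComplete(List<String> records, boolean sourceAckSucceeds) {
        table.addAll(records);     // sink commit succeeded
        if (sourceAckSucceeds) {
            acked.addAll(records); // source ack may fail or be skipped...
        }
        // ...leaving records committed but not acked.
    }

    static boolean consistent() {
        return table.equals(acked);
    }

    public static void main(String[] args) {
        onCheckpointComplete(List.of("m1"), false);
        System.out.println(consistent()); // false: m1 committed but never acked
    }
}
```

No data is lost in this scenario, since the un-acked messages would simply be redelivered by PubSub, but the "acked equals committed" metrics view does not hold.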
No handling of checkpoint corruption needed
When do you encounter checkpoint corruption? Have you brought it up with the Flink community?
Keeping metrics consistent: whatever shows as acked is actually in the data
I feel it is better to decouple the source and sink behavior. When a Flink checkpoint completes, the Iceberg sink just guarantees that processed records are bookmarked/committed (not lost).
The other direction is not true: when data is in the sink, the source may not have acked it yet. Is this inconsistency a problem? If not, why is the other direction a problem?
Sure @stevenzwu, thanks for the input. In our case we were okay with data duplication in non-happy-path scenarios, but given your input it might not be okay for everyone. We can discuss in the next community sync to get more input on this.
@mudit-97 The next community sync meeting might be focused on materialized views. The other option to get broader feedback is to start a discussion thread on dev@, which tends to get more attention than PR comments.
Sure, I already have a thread there; let me add you there as well.