
Flink: Made IcebergFilesCommitter work with single phase commit

Open mudit-97 opened this issue 1 year ago • 1 comment

mudit-97 avatar Feb 09 '24 19:02 mudit-97

How would the single phase commit solution support exactly-once semantics for the Iceberg Sink?

pvary avatar Feb 24 '24 06:02 pvary

@pvary we felt that even a single phase commit should not cause much of an issue for users who choose to use it

Why?

  1. The current implementation is two-step, so users expect data to become discoverable in the Iceberg table only after the checkpoint's notifyCheckpointComplete is called; we felt users should not have much of an issue if we do the commit in snapshotState itself
  2. Since this is a sink operator, no downstream operator depends on it; it only writes the final files, so it is better to do the commit in a single go
  3. With a single phase commit there is no need to keep the pending files in external checkpoint storage either; we can ensure the data is committed before the checkpoint is marked as complete
  4. We would never face data loss when a checkpoint is corrupted, etc.
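The difference between the two commit placements can be sketched with a toy committer. This is a simplified stand-in, not the real IcebergFilesCommitter API, and the `singlePhaseCommit` flag is the hypothetical option proposed above:

```java
import java.util.ArrayList;
import java.util.List;

// Toy stand-in for the committer; NOT the real IcebergFilesCommitter API.
class CommitterSketch {
    private final List<String> pendingFiles = new ArrayList<>();
    private final List<String> table = new ArrayList<>();
    private final boolean singlePhaseCommit; // hypothetical flag proposed above

    CommitterSketch(boolean singlePhaseCommit) {
        this.singlePhaseCommit = singlePhaseCommit;
    }

    void write(String dataFile) {
        pendingFiles.add(dataFile);
    }

    // Invoked while the checkpoint barrier passes through the operator.
    void snapshotState(long checkpointId) {
        if (singlePhaseCommit) {
            commit(); // 1PC: data becomes visible as part of the checkpoint itself
        }
        // 2PC: pending files are only persisted in operator state here
    }

    // Invoked after the whole checkpoint succeeded (best effort in real Flink).
    void notifyCheckpointComplete(long checkpointId) {
        if (!singlePhaseCommit) {
            commit(); // 2PC: the actual Iceberg commit is the second phase
        }
    }

    private void commit() {
        table.addAll(pendingFiles);
        pendingFiles.clear();
    }

    List<String> table() {
        return table;
    }
}
```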

mudit-97 avatar Feb 28 '24 10:02 mudit-97

@mudit-97: If your job fails after the FlinkSink finishes snapshotState but before notifyCheckpointComplete, or any of the other operators fail in snapshotState, then Flink could decide that the checkpoint failed and restart from a previous checkpoint. In this case the data will be written again into the Iceberg table (we will have data duplication).

https://en.wikipedia.org/wiki/Two-phase_commit_protocol

pvary avatar Feb 28 '24 12:02 pvary

@pvary yes, that corner case can happen, but in most scenarios data duplication is okay compared to data loss, which can happen due to checkpoint corruption, or in other corner cases where messages in the source operator are acked but the Iceberg commit in notifyCheckpointComplete then fails. And in case of data loss we would have to rewind the source checkpoint, which can result in even more data duplication.

mudit-97 avatar Feb 28 '24 12:02 mudit-97

@mudit-97: The Flink community did outstanding work to handle all of these corner cases. I do not think it is a good idea to throw away that work.

What is the issue with the 2 phase commit solution?

pvary avatar Feb 28 '24 16:02 pvary

sure @pvary, we wanted the single phase commit solution because of this thought process:

  1. We are writing a PubSub source operator which acks the messages in notifyCheckpointComplete
  2. If 2PC is used, notifyCheckpointComplete is delivered to the operators independently, so there is no guarantee that messages acked in PubSub have actually been written to Iceberg; they might still be in the checkpoint directory
  3. If the job goes down at any time, we always have to manage the checkpoints and resume the job from a checkpoint. If the checkpoints are corrupted, we have to seek the PubSub operator back
  4. Apart from all of this, PubSub metrics (or any source operator metric) never give a consistent view, since acked messages can still be lying in the checkpoint directory instead of in the sink
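The timing gap in point 2 can be illustrated with a toy pipeline (all names hypothetical; this is not the real connector API). Under 2PC, the source's ack and the sink's commit are independent completion callbacks, so a message can be acked while its data file is not yet committed:

```java
import java.util.ArrayList;
import java.util.List;

// Toy pipeline with one source and one sink sharing a checkpoint.
// All names are hypothetical; not the real PubSub or Iceberg connector API.
class TwoPhasePipelineSketch {
    final List<String> acked = new ArrayList<>();     // acked in PubSub
    final List<String> committed = new ArrayList<>(); // visible in Iceberg
    private final List<String> inFlight = new ArrayList<>();
    private final List<String> pendingFiles = new ArrayList<>();

    void process(String msg) {
        inFlight.add(msg);
        pendingFiles.add(msg); // written as a data file, not yet committed
    }

    // Phase 1: every operator snapshots its state; nothing is acked or committed.
    void snapshotState(long checkpointId) {}

    // Phase 2: notifyCheckpointComplete reaches the operators independently. If
    // the sink's callback is delayed or skipped, acked messages are not committed.
    void completeCheckpoint(boolean sinkCallbackDelivered) {
        acked.addAll(inFlight);
        inFlight.clear();
        if (sinkCallbackDelivered) {
            committed.addAll(pendingFiles);
            pendingFiles.clear();
        }
    }
}
```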

We understand messages can be duplicated in this case, but for some use cases we believe duplication is acceptable compared to managing checkpoints, handling corruption in them, and maintaining consistent metrics along the way, especially metrics like watermarks

That's why we wanted to keep this behavior behind a flag, so that consumers can choose it if needed

mudit-97 avatar Feb 28 '24 16:02 mudit-97

if 2PC is used, notifyCheckpointComplete is delivered to the operators independently, so there is no guarantee that messages acked in PubSub have actually been written to Iceberg; they might still be in the checkpoint directory

this is correct. but once the Flink checkpoint is recorded successfully, even if there is a commit failure, those files/data will be committed with the next successful checkpoint. there is no data loss unless the checkpoint is corrupted or unusable.
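The carry-over behavior can be sketched as follows (simplified, illustrative names; not the actual committer code): files recorded per checkpoint in operator state survive a failed commit attempt and are committed together with a later successful checkpoint.

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Simplified sketch of 2PC commit carry-over; not the actual committer code.
class CarryOverSketch {
    // Data files per checkpoint id, as if persisted via Flink operator state.
    private final TreeMap<Long, List<String>> stateByCheckpoint = new TreeMap<>();
    private final List<String> table = new ArrayList<>();

    void snapshotState(long checkpointId, List<String> files) {
        stateByCheckpoint.put(checkpointId, files);
    }

    // commitSucceeds=false models a transient Iceberg commit failure:
    // the files stay in state and ride along with a later checkpoint.
    void notifyCheckpointComplete(long checkpointId, boolean commitSucceeds) {
        if (!commitSucceeds) {
            return; // nothing lost: checkpointed state still holds the files
        }
        Iterator<Map.Entry<Long, List<String>>> it =
            stateByCheckpoint.headMap(checkpointId, true).entrySet().iterator();
        while (it.hasNext()) {
            table.addAll(it.next().getValue());
            it.remove();
        }
    }

    List<String> table() {
        return table;
    }
}
```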

stevenzwu avatar Feb 28 '24 16:02 stevenzwu

yea @stevenzwu, and that's why, for 2 major reasons, we wanted to have the choice of 1PC:

  1. Keeping metrics consistent: whatever shows as acked is actually in the data
  2. and, no handling of checkpoint corruption needed

And for the above 2 points, we will take some hit in the form of higher duplication of messages in non-happy-path scenarios

mudit-97 avatar Feb 28 '24 16:02 mudit-97

Keeping metrics consistent: whatever shows as acked is actually in the data

this is also incorrect with 1PC. the PubSub source may have acknowledged the messages, as the source operator is the first to execute the snapshotState operation, but the Iceberg commit may later fail in its snapshotState step, which also leads to checkpoint failure.

stevenzwu avatar Feb 28 '24 23:02 stevenzwu

@stevenzwu , the Pubsub operator will ack the messages in notifyCheckpointComplete()

mudit-97 avatar Feb 29 '24 04:02 mudit-97

In general, I feel uneasy about changing the Flink Iceberg sink behavior from 2PC to 1PC. I would at least want broader community input before we decide it is good to add this option. It might be good to bring this up in a future community sync meeting.

@stevenzwu , the Pubsub operator will ack the messages in notifyCheckpointComplete()

this is not guaranteed, so we may still have inconsistency. not sure if this scenario is problematic for you or not. https://nightlies.apache.org/flink/flink-docs-master/api/java/org/apache/flink/api/common/state/CheckpointListener.html

These notifications are "best effort", meaning they can sometimes be skipped.

It could also be the case that the ack fails in the PubSub source's notifyCheckpointComplete step; then inconsistency can also happen
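The same metrics gap exists under 1PC, which can be sketched like this (illustrative names only, not a real connector): if the source's best-effort completion callback is skipped, data committed in snapshotState is already visible in the table while the corresponding messages are still unacked.

```java
import java.util.ArrayList;
import java.util.List;

// Toy sketch of 1PC with a best-effort completion callback; names illustrative.
class OnePhaseAckGapSketch {
    final List<String> committed = new ArrayList<>(); // visible in the table
    final List<String> acked = new ArrayList<>();     // acked in the source
    private final List<String> unacked = new ArrayList<>();

    void process(String msg) {
        unacked.add(msg);
    }

    // 1PC: the sink commits during snapshotState ...
    void snapshotState(long checkpointId) {
        committed.addAll(unacked);
    }

    // ... but the source still acks in notifyCheckpointComplete, which Flink
    // delivers only best-effort; a skipped call leaves committed-but-unacked data.
    void notifyCheckpointComplete(long checkpointId, boolean delivered) {
        if (delivered) {
            acked.addAll(unacked);
            unacked.clear();
        }
    }
}
```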

and, no handling of checkpoint corruption needed

When do you encounter checkpoint corruption? Have you brought it up with the Flink community?

Keeping metrics consistent, whatever shows as acked, is actually in data

I feel it is better to decouple the source and sink behavior. When a Flink checkpoint completes, the Iceberg sink just guarantees that processed records are bookmarked/committed (not lost).

The other way around is not true: when data is in the sink, the source may not have acked it yet. is this inconsistency a problem? if not, why is the other direction a problem?

stevenzwu avatar Mar 04 '24 23:03 stevenzwu

sure @stevenzwu, thanks for the inputs. in our case we were okay with data duplication in non-happy-path scenarios, but given your inputs it might not be okay for everyone. We can discuss in the next community sync to get more input on this.

mudit-97 avatar Mar 05 '24 04:03 mudit-97

@mudit-97 the next community sync meeting might be focused on materialized views. the other option to get broader feedback is to start a discussion thread on dev@, which tends to get more attention than PR comments.

stevenzwu avatar Mar 05 '24 16:03 stevenzwu

sure, I already have a thread there; let me add you there as well

mudit-97 avatar Mar 05 '24 16:03 mudit-97