
[SUPPORT] File Not Found Exception occurs when a Flink task reading a Hudi MOR table recovers from a failure

Open · Sparsamkeit opened this issue 1 year ago • 5 comments

Describe the problem you faced

After a Flink task that reads a Hudi MOR table fails and is restarted after some time, it throws an exception saying that a log file does not exist.

This may be because the log files have already been compacted (merged) into the parquet base file.

To Reproduce

Steps to reproduce the behavior:

1. Start a Flink task that reads the Hudi MOR table via SQL, with checkpointing enabled so a savepoint can be taken (see the sketch after these steps).
2. Stop the read task with a savepoint while the write task keeps writing.
3. Recover the read task from the savepoint.
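For reference, a minimal sketch of step 1, assuming Hudi's documented Flink SQL connector options; the schema, HDFS path, and interval value below are placeholders, not taken from the original report:

```java
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;

public class HudiMorStreamingRead {
    public static void main(String[] args) {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        // Checkpointing is required so the job can later be stopped with a savepoint and resumed.
        env.enableCheckpointing(60_000);
        StreamTableEnvironment tEnv = StreamTableEnvironment.create(env);

        // Streaming read of a Hudi MOR table; schema and path are placeholders.
        tEnv.executeSql(
            "CREATE TABLE hudi_mor_source (\n"
          + "  uuid STRING,\n"
          + "  name STRING,\n"
          + "  ts TIMESTAMP(3)\n"
          + ") WITH (\n"
          + "  'connector' = 'hudi',\n"
          + "  'path' = 'hdfs:///tmp/hudi/mor_table',\n"
          + "  'table.type' = 'MERGE_ON_READ',\n"
          + "  'read.streaming.enabled' = 'true',\n"
          + "  'read.streaming.check-interval' = '4'\n"
          + ")");

        tEnv.executeSql("SELECT * FROM hudi_mor_source").print();
    }
}
```

The stop-with-savepoint in step 2 is issued externally, e.g. `flink stop --savepointPath <dir> <jobId>`, and step 3 resumes the job with `flink run -s <savepointPath> ...`.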

Environment Description

  • Hudi version : 0.13.1

  • Flink version : 1.14.5

  • Storage (HDFS/S3/GCS..) : HDFS

  • Running on Docker? (yes/no) : no

Additional context

In StreamReadOperator, all pending splits are saved to state when a snapshot is taken, and those splits contain the log file paths. After the log files are compacted into parquet files, an exception is thrown here: https://github.com/apache/hudi/blob/a23c5b783e70e31bb269e2dd22604cd34928d162/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/source/StreamReadOperator.java#L135-L141
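To make the failure mode concrete, here is an illustrative sketch (not the actual StreamReadOperator code; the class and method names are hypothetical): a split restored from state still carries the log file paths recorded at snapshot time, and opening one of them after compaction and cleaning have removed it fails.

```java
import java.util.List;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;

// Illustrative only: a split restored from operator state still references the
// log files that existed when the snapshot was taken. If compaction has since
// merged them into a parquet base file and the cleaner removed them, opening
// the stale path throws FileNotFoundException.
public class StaleSplitExample {

    public static void readLogFiles(List<String> logPathsFromState) throws Exception {
        FileSystem fs = FileSystem.get(new Configuration());
        for (String logPath : logPathsFromState) {
            // Fails with FileNotFoundException when the log file was compacted
            // away between the savepoint and the restart.
            fs.open(new Path(logPath)).close();
        }
    }
}
```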

Sparsamkeit · Apr 10 '24 05:04

Yeah, I think it may make sense to add this fix: just skip the missing files while recovering from the state, by doing an existence check.

And before dispatching the reader task, we might also need an existence check.

Both of these checks will introduce extra load on DFS access, though.
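A minimal sketch of what such an existence check could look like (a hypothetical helper, not a committed fix): filter the restored log paths through FileSystem.exists before handing the split to the reader. The per-file exists call is exactly the extra DFS load mentioned above.

```java
import java.util.ArrayList;
import java.util.List;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;

// Hypothetical helper, not a committed fix: drop the log paths that no longer
// exist so the restored reader does not crash on compacted/cleaned files.
// Silently skipping them can lose data unless the input splits are
// re-generated from the latest table snapshot.
public class MissingFileFilter {

    public static List<String> filterExisting(List<String> logPathsFromState) throws Exception {
        FileSystem fs = FileSystem.get(new Configuration());
        List<String> existing = new ArrayList<>();
        for (String logPath : logPathsFromState) {
            // One existence check per file: this is the extra DFS load mentioned above.
            if (fs.exists(new Path(logPath))) {
                existing.add(logPath);
            }
        }
        return existing;
    }
}
```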

danny0405 · Apr 10 '24 09:04

Yeah, I think it may make sense to add this fix: just skip the missing files while recovering from the state, by doing an existence check.

@danny0405 Will skipping cause data loss?

Sparsamkeit · Apr 10 '24 12:04

Yes, of course, unless we re-generate the input splits based on the latest snapshot.

danny0405 · Apr 10 '24 23:04

How can we avoid such data loss issues when operating with Flink SQL? Is there a specific approach or best practice that we should follow to ensure data integrity during Flink SQL operations?

juice411 · Apr 15 '24 03:04

A solution to mitigate the issue is to increase the number of retained commits, so that the reader has enough buffer time to consume the historical data.
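As an illustration of this first suggestion, a sketch of the writer-side table definition with a larger number of retained commits, assuming the Flink option name `clean.retain_commits` from the Hudi Flink configuration; the schema, path, and value are placeholders to tune for the workload:

```java
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.TableEnvironment;

public class RetainMoreCommits {
    public static void main(String[] args) {
        TableEnvironment tEnv =
            TableEnvironment.create(EnvironmentSettings.newInstance().inStreamingMode().build());

        // Writer-side table definition; 'clean.retain_commits' is raised well above
        // the default so a suspended or slow streaming reader has a larger buffer
        // before the cleaner removes the file slices its state still references.
        tEnv.executeSql(
            "CREATE TABLE hudi_mor_sink (\n"
          + "  uuid STRING,\n"
          + "  name STRING,\n"
          + "  ts TIMESTAMP(3)\n"
          + ") WITH (\n"
          + "  'connector' = 'hudi',\n"
          + "  'path' = 'hdfs:///tmp/hudi/mor_table',\n"
          + "  'table.type' = 'MERGE_ON_READ',\n"
          + "  'clean.retain_commits' = '120'\n"
          + ")");
    }
}
```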

Another solution is to add some progress signal on the reader side so that the cleaner can be aware of the data that has been consumed, but generally I don't think we should do this (the reader should not block the writer), especially since the reading task can stay suspended for a long time.

danny0405 · Apr 15 '24 04:04