[SUPPORT] FileNotFoundException occurs when a Flink task reading a Hudi MOR table recovers from a failure
Describe the problem you faced
After a Flink task that reads a Hudi MOR table fails and is restarted after some time, an exception is thrown saying that a log file does not exist.
This is likely because the log files have been merged into parquet files in the meantime.
To Reproduce
Steps to reproduce the behavior:
1. Start a Flink task that reads the Hudi MOR table via SQL, with savepoints enabled.
2. Stop the read task while the write task keeps writing.
3. Recover the read task from the savepoint.
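For context, a minimal sketch of the read job in step 1 is below (Java Table API wrapping the SQL, assuming the hudi-flink bundle is on the classpath); the table name, schema, path and intervals are illustrative and not taken from the original report:

```java
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;

// Sketch of the streaming read job in step 1; names and values are placeholders.
public class MorStreamingReadJob {
  public static void main(String[] args) {
    StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
    // Checkpointing is on so the job can later be stopped with a savepoint (step 2).
    env.enableCheckpointing(60_000L);
    StreamTableEnvironment tEnv = StreamTableEnvironment.create(env);

    tEnv.executeSql(
        "CREATE TABLE hudi_mor_source (\n"
            + "  id INT,\n"
            + "  name STRING,\n"
            + "  ts TIMESTAMP(3),\n"
            + "  PRIMARY KEY (id) NOT ENFORCED\n"
            + ") WITH (\n"
            + "  'connector' = 'hudi',\n"
            + "  'path' = 'hdfs:///tmp/hudi_mor_table',\n"
            + "  'table.type' = 'MERGE_ON_READ',\n"
            + "  'read.streaming.enabled' = 'true',\n"
            + "  'read.streaming.check-interval' = '4'\n"
            + ")");

    // Step 3 reproduces the failure: stop this job with a savepoint, wait until
    // compaction/cleaning removes the log files referenced by the checkpointed splits,
    // then restore the job from that savepoint.
    tEnv.executeSql("SELECT * FROM hudi_mor_source").print();
  }
}
```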
Environment Description
- Hudi version : 0.13.1
- Flink version : 1.14.5
- Storage (HDFS/S3/GCS..) : HDFS
- Running on Docker? (yes/no) : no
Additional context
In StreamReadOperator, all pending splits are saved to state when a snapshot is taken, and those splits contain the log file paths. After the log files have been merged into parquet files (and subsequently removed), the exception occurs here: https://github.com/apache/hudi/blob/a23c5b783e70e31bb269e2dd22604cd34928d162/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/source/StreamReadOperator.java#L135-L141
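To illustrate the mechanism, here is a stripped-down sketch of that pattern; it is not the actual StreamReadOperator code, and the state name, buffering and output type are simplified placeholders:

```java
import java.util.ArrayList;
import java.util.LinkedList;
import java.util.Queue;

import org.apache.flink.api.common.state.ListState;
import org.apache.flink.api.common.state.ListStateDescriptor;
import org.apache.flink.runtime.state.StateInitializationContext;
import org.apache.flink.runtime.state.StateSnapshotContext;
import org.apache.flink.streaming.api.operators.AbstractStreamOperator;
import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
import org.apache.hudi.table.format.mor.MergeOnReadInputSplit;

// Illustrative stand-in for StreamReadOperator: pending splits (which carry log file
// paths) are persisted on snapshot and re-queued as-is on restore.
public class PendingSplitStateSketch extends AbstractStreamOperator<Void>
    implements OneInputStreamOperator<MergeOnReadInputSplit, Void> {

  private final Queue<MergeOnReadInputSplit> pendingSplits = new LinkedList<>();
  private transient ListState<MergeOnReadInputSplit> splitState;

  @Override
  public void initializeState(StateInitializationContext context) throws Exception {
    super.initializeState(context);
    splitState = context.getOperatorStateStore().getListState(
        new ListStateDescriptor<>("pending-splits", MergeOnReadInputSplit.class));
    // On restore, the splits still reference the log files that existed when the
    // savepoint was taken; by now compaction/cleaning may have removed them.
    for (MergeOnReadInputSplit split : splitState.get()) {
      pendingSplits.add(split);
    }
  }

  @Override
  public void processElement(StreamRecord<MergeOnReadInputSplit> element) {
    // In the real operator the split is read asynchronously; here it is only buffered.
    pendingSplits.add(element.getValue());
  }

  @Override
  public void snapshotState(StateSnapshotContext context) throws Exception {
    super.snapshotState(context);
    splitState.clear();
    // The queued splits, including their log file paths, are persisted unchanged.
    splitState.addAll(new ArrayList<>(pendingSplits));
  }
}
```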
yeah, I think it may make sense to add this fix: just skip the missing files while recovering from the state, by doing an existence check.
And before dispatching a split to the reader task, we might also need the existence check.
Both checks will introduce extra burden on DFS access.
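A hedged sketch of what such a check could look like is below. filterExistingSplits is a hypothetical helper, not part of StreamReadOperator, and it assumes getLogPaths() on MergeOnReadInputSplit returns an Option of log file paths; it could be applied both after restore and before dispatching a split:

```java
import java.io.IOException;
import java.util.List;
import java.util.stream.Collectors;

import org.apache.flink.core.fs.Path;
import org.apache.hudi.table.format.mor.MergeOnReadInputSplit;

// Hypothetical helper: filters out splits whose log files are already gone, so restore
// does not fail with FileNotFoundException.
public final class SplitExistenceCheck {

  private SplitExistenceCheck() {
  }

  /** Returns only the splits whose log files are still present on the DFS. */
  public static List<MergeOnReadInputSplit> filterExistingSplits(List<MergeOnReadInputSplit> splits) {
    return splits.stream()
        .filter(SplitExistenceCheck::logFilesExist)
        .collect(Collectors.toList());
  }

  private static boolean logFilesExist(MergeOnReadInputSplit split) {
    // Splits without log files (pure parquet reads) are kept as-is.
    if (!split.getLogPaths().isPresent()) {
      return true;
    }
    for (String logPath : split.getLogPaths().get()) {
      try {
        Path path = new Path(logPath);
        // Each exists() call is an extra round trip to the DFS -- the access burden
        // mentioned above.
        if (!path.getFileSystem().exists(path)) {
          return false;
        }
      } catch (IOException e) {
        throw new RuntimeException("Failed to check existence of " + logPath, e);
      }
    }
    return true;
  }
}
```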
@danny0405 Will skipping cause data loss?
yes, of course, unless we re-generate the input splits based on the latest snapshot.
How can we avoid such data loss issues when operating with Flink SQL? Is there a specific approach or best practice that we should follow to ensure data integrity during Flink SQL operations?
A solution to mitigate the issue is to increase the number of retained commits for historical data, so that the reader has enough buffer time for consumption.
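As an illustration, the writer table could be declared with longer retention, roughly like the sketch below; the option keys follow the Hudi Flink connector (clean.retain_commits, archive.min_commits, archive.max_commits), but the table name, path and numbers are arbitrary examples, not recommendations, and should be sized to how long the reader may stay stopped:

```java
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;

// Writer-side retention tuned up so a suspended reader has more buffer time before the
// log files behind its checkpointed splits are compacted/cleaned away.
public class MorWriterWithLongerRetention {
  public static void main(String[] args) {
    StreamTableEnvironment tEnv = StreamTableEnvironment.create(
        StreamExecutionEnvironment.getExecutionEnvironment());

    tEnv.executeSql(
        "CREATE TABLE hudi_mor_sink (\n"
            + "  id INT,\n"
            + "  name STRING,\n"
            + "  ts TIMESTAMP(3),\n"
            + "  PRIMARY KEY (id) NOT ENFORCED\n"
            + ") WITH (\n"
            + "  'connector' = 'hudi',\n"
            + "  'path' = 'hdfs:///tmp/hudi_mor_table',\n"
            + "  'table.type' = 'MERGE_ON_READ',\n"
            + "  'clean.retain_commits' = '120',\n"
            + "  'archive.min_commits' = '130',\n"
            + "  'archive.max_commits' = '150'\n"
            + ")");
  }
}
```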
Another solution is to add some progress signal on the reader side so that the cleaner can be aware of the data that has been consumed, but generally I don't think we should do this (the reader should not block the writer), especially when the reader task is suspended for a long time.