[BUG] Failure Encountered When Reading Hudi with Flink in Batch Runtime Mode and FlinkOptions.READ_AS_STREAMING=false
I am currently experiencing an issue when attempting to read Hudi with Flink. The problem arises when I configure the Flink RuntimeMode as 'batch' and set the Hudi FlinkOptions.READ_AS_STREAMING to 'false'.
To Reproduce
- Set Flink RuntimeMode to 'batch'.
- Set Hudi FlinkOptions.READ_AS_STREAMING to 'false'.
- Attempt to read Hudi with Flink.
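The steps above can be sketched as a Flink SQL client session (the table name, schema, and path below are illustrative, not from the original report; `'read.streaming.enabled'` is the string key behind `FlinkOptions.READ_AS_STREAMING`):

```sql
-- Assumes the Hudi Flink bundle is on the classpath.
SET 'execution.runtime-mode' = 'batch';

CREATE TABLE hudi_src (
  id STRING,
  ts TIMESTAMP(3)
) WITH (
  'connector' = 'hudi',
  'path' = 'hdfs:///tmp/hudi_table',   -- illustrative path
  'table.type' = 'MERGE_ON_READ',
  'read.streaming.enabled' = 'false'   -- FlinkOptions.READ_AS_STREAMING
);

SELECT * FROM hudi_src;
```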
Expected behavior
I expected to read the Hudi table in batch mode successfully with Flink under these configurations.
Actual behavior
A failure occurs when attempting to read Hudi with Flink under these configurations.
Environment Description
- Hudi version: 1.10 ~ 1.14
- Flink version: 1.13
Additional context
In the HoodieTableSource implementation of Flink's DynamicTableSource, a ScanRuntimeProvider is provided. This ScanRuntimeProvider implements the produceDataStream method, which creates a DataStreamSource. However, in bounded mode it does not explicitly specify the Boundedness parameter, so Flink falls back to the default, Boundedness.CONTINUOUS_UNBOUNDED, which could be the cause of this issue.
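To illustrate why the default matters, here is a minimal, self-contained Java sketch that models the validation Flink performs when translating sources in BATCH mode. The enum and method names are modeled after Flink's public enums for illustration; this is not Flink's actual implementation:

```java
// Models Flink's rejection of an unbounded source under BATCH runtime mode,
// which produces the IllegalStateException seen in the stack trace.
public class BoundednessCheck {
    enum RuntimeExecutionMode { STREAMING, BATCH, AUTOMATIC }
    enum Boundedness { BOUNDED, CONTINUOUS_UNBOUNDED }

    // A source left at the default CONTINUOUS_UNBOUNDED boundedness is
    // rejected when the runtime mode is BATCH; a BOUNDED source passes.
    static void validate(RuntimeExecutionMode mode, Boundedness boundedness) {
        if (mode == RuntimeExecutionMode.BATCH
                && boundedness == Boundedness.CONTINUOUS_UNBOUNDED) {
            throw new IllegalStateException(
                "Detected an UNBOUNDED source with the "
                    + "'execution.runtime-mode' set to 'BATCH'.");
        }
    }

    public static void main(String[] args) {
        // A bounded source is fine in BATCH mode.
        validate(RuntimeExecutionMode.BATCH, Boundedness.BOUNDED);
        // The default boundedness is rejected in BATCH mode.
        try {
            validate(RuntimeExecutionMode.BATCH,
                     Boundedness.CONTINUOUS_UNBOUNDED);
            System.out.println("unexpectedly passed");
        } catch (IllegalStateException e) {
            System.out.println("rejected: " + e.getMessage());
        }
    }
}
```

This mirrors the failure mode: the Hoodie source is in fact finite in this configuration, but because its boundedness was never declared, the batch translator sees it as unbounded and aborts.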
Code at Hudi HoodieTableSource.java:

```java
if (conf.getBoolean(FlinkOptions.READ_AS_STREAMING)) {
    ...
} else {
    ...
    DataStreamSource<RowData> source = execEnv.addSource(func, asSummaryString(), typeInfo);
    ...
}
```
Perhaps the code could be modified as follows:

```java
if (!isBounded()) {
    ...
} else {
    ...
    DataStreamSource<RowData> source = execEnv.addSource(func, asSummaryString(), typeInfo, Boundedness.BOUNDED);
    ...
}
```
Stacktrace

```
org.apache.flink.client.program.ProgramInvocationException: The main method caused an error: Detected an UNBOUNDED source with the 'execution.runtime-mode' set to 'BATCH'. This combination is not allowed, please set the 'execution.runtime-mode' to STREAMING or AUTOMATIC
	at org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:381)
	at org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:223)
	at org.apache.flink.client.ClientUtils.executeProgram(ClientUtils.java:114)
	at org.apache.flink.client.cli.CliFrontend.executeProgram(CliFrontend.java:812)
	at org.apache.flink.client.cli.CliFrontend.run(CliFrontend.java:246)
	at org.apache.flink.client.cli.CliFrontend.parseAndRun(CliFrontend.java:1054)
	at org.apache.flink.client.cli.CliFrontend.lambda$main$10(CliFrontend.java:1132)
	at java.security.AccessController.doPrivileged(Native Method)
	at javax.security.auth.Subject.doAs(Subject.java:422)
	at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1671)
	at org.apache.flink.runtime.security.contexts.HadoopSecurityContext.runSecured(HadoopSecurityContext.java:41)
	at org.apache.flink.client.cli.CliFrontend.main(CliFrontend.java:1132)
Caused by: java.lang.IllegalStateException: Detected an UNBOUNDED source with the 'execution.runtime-mode' set to 'BATCH'. This combination is not allowed, please set the 'execution.runtime-mode' to STREAMING or AUTOMATIC
```
Are you doing the snapshot queries, it should work as expected. I don't think we need an explicit boundness setup, do you have other sources in the DAG besides the Hoodie source?
Let me reply one by one.
> Are you doing the snapshot queries
Yes, I'm doing a snapshot query; the relevant parameters are as follows:

```
'hoodie.datasource.query.type'='snapshot'
'table.type'='MERGE_ON_READ'
```
> I don't think we need an explicit boundness setup
Does that mean that when querying a snapshot, calling addSource with the default Boundedness.CONTINUOUS_UNBOUNDED (as the HoodieTableSource.java code above shows) has no impact on the final Flink stream behavior, or is there some other mechanism that ensures it ultimately ends up as Boundedness.BOUNDED?
> do you have other sources in the DAG besides the Hoodie source?
No, there's only one, which is the Hoodie Source.
> Are you doing the snapshot queries, it should work as expected. I don't think we need an explicit boundness setup, do you have other sources in the DAG besides the Hoodie source?
Yeah, people have never reported failures for batch snapshot queries.
@ailinzhou Are you still facing this issue?
Yes, unfortunately, I'm still experiencing the problem.
@ailinzhou Can you provide script/commands what you are using?