[BUG] Failure Encountered When Reading Hudi with Flink in Batch Runtime Mode and FlinkOptions.READ_AS_STREAMING=false

Open • ailinzhou opened this issue on Jan 27, 2024 • 4 comments

I am currently experiencing an issue when attempting to read Hudi with Flink. The problem arises when I configure the Flink RuntimeMode as 'batch' and set the Hudi FlinkOptions.READ_AS_STREAMING to 'false'.

To Reproduce

  1. Set Flink RuntimeMode to 'batch'.
  2. Set Hudi FlinkOptions.READ_AS_STREAMING to 'false'.
  3. Attempt to read Hudi with Flink.
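
For concreteness, a minimal reproduction sketch along the lines of the steps above could look like the following. The class name, table schema, and path are illustrative and not taken from this report; it assumes Flink 1.13 with the hudi-flink bundle on the classpath, and uses 'read.streaming.enabled', the key behind FlinkOptions.READ_AS_STREAMING.

    import org.apache.flink.api.common.RuntimeExecutionMode;
    import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
    import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;

    public class HudiBatchReadRepro {
      public static void main(String[] args) {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        // 1. Run the job in BATCH runtime mode.
        env.setRuntimeMode(RuntimeExecutionMode.BATCH);
        StreamTableEnvironment tEnv = StreamTableEnvironment.create(env);

        // 2. READ_AS_STREAMING ('read.streaming.enabled') is set to false.
        tEnv.executeSql(
            "CREATE TABLE hudi_tbl (\n"
                + "  id INT,\n"
                + "  name STRING,\n"
                + "  ts TIMESTAMP(3)\n"
                + ") WITH (\n"
                + "  'connector' = 'hudi',\n"
                + "  'path' = 'file:///tmp/hudi_tbl',\n"   // illustrative path
                + "  'table.type' = 'MERGE_ON_READ',\n"
                + "  'read.streaming.enabled' = 'false'\n"
                + ")");

        // 3. A plain batch read of the table; this is where the failure described below surfaces.
        tEnv.executeSql("SELECT * FROM hudi_tbl").print();
      }
    }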

Expected behavior

I expected to read the Hudi table successfully in batch mode with Flink under these configurations.

Actual behavior

A failure occurs when attempting to read Hudi with Flink under these configurations.

Environment Description

  • Hudi version: 1.10 ~ 1.14

  • Flink version: 1.13

Additional context

In the HoodieTableSource implementation of Flink's DynamicTableSource, a ScanRuntimeProvider is provided. This ScanRuntimeProvider implements the produceDataStream method, which builds a DataStreamSource. However, in the bounded case it does not explicitly specify the Boundedness parameter, so Flink falls back to the default, Boundedness.CONTINUOUS_UNBOUNDED, which could be the cause of this issue.

Code in Hudi's HoodieTableSource.java:

        if (conf.getBoolean(FlinkOptions.READ_AS_STREAMING)) {
          ...
        } else {
          ...
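          // no Boundedness argument: the legacy addSource path defaults to CONTINUOUS_UNBOUNDED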
          DataStreamSource<RowData> source = execEnv.addSource(func, asSummaryString(), typeInfo);
          ...
        }

Perhaps the code could be modified as follows:

        if (!isBounded()) {
          ...
        } else {
          ...
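          // proposed: declare the source as BOUNDED so the BATCH runtime mode accepts it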
          DataStreamSource<RowData> source = execEnv.addSource(func, asSummaryString(), typeInfo, Boundedness.BOUNDED);
          ...
        }
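
For context, the same rejection can be reproduced outside Hudi. The standalone sketch below is my own illustration (not code from this issue; the class name is made up): it registers a finite legacy SourceFunction via addSource, which carries no boundedness declaration and is therefore classified as CONTINUOUS_UNBOUNDED, so the BATCH runtime mode rejects the job with the exception shown in the stack trace below.

    import org.apache.flink.api.common.RuntimeExecutionMode;
    import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
    import org.apache.flink.streaming.api.functions.source.SourceFunction;

    public class UnboundedSourceInBatchMode {
      public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setRuntimeMode(RuntimeExecutionMode.BATCH);

        // The function is finite, but addSource still registers it as CONTINUOUS_UNBOUNDED
        // because legacy SourceFunction-based sources declare no boundedness by default.
        env.addSource(new SourceFunction<Integer>() {
          @Override
          public void run(SourceContext<Integer> ctx) {
            for (int i = 0; i < 10; i++) {
              ctx.collect(i);
            }
          }

          @Override
          public void cancel() {
          }
        }).print();

        // Fails during stream graph generation with:
        // "Detected an UNBOUNDED source with the 'execution.runtime-mode' set to 'BATCH' ..."
        env.execute();
      }
    }

The Hoodie bounded-read path shown above goes through the same legacy addSource API without declaring boundedness, which is why the proposed change passes Boundedness.BOUNDED explicitly.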

Stacktrace


Caused by: java.lang.IllegalStateException: Detected an UNBOUNDED source with the 'execution.runtime-mode' set to 'BATCH'. This combination is not allowed, please set the 'execution.runtime-mode' to STREAMING or AUTOMATIC

org.apache.flink.client.program.ProgramInvocationException: The main method caused an error: Detected an UNBOUNDED source with the 'execution.runtime-mode' set to 'BATCH'. This combination is not allowed, please set the 'execution.runtime-mode' to STREAMING or AUTOMATIC
	at org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:381)
	at org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:223)
	at org.apache.flink.client.ClientUtils.executeProgram(ClientUtils.java:114)
	at org.apache.flink.client.cli.CliFrontend.executeProgram(CliFrontend.java:812)
	at org.apache.flink.client.cli.CliFrontend.run(CliFrontend.java:246)
	at org.apache.flink.client.cli.CliFrontend.parseAndRun(CliFrontend.java:1054)
	at org.apache.flink.client.cli.CliFrontend.lambda$main$10(CliFrontend.java:1132)
	at java.security.AccessController.doPrivileged(Native Method)
	at javax.security.auth.Subject.doAs(Subject.java:422)
	at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1671)
	at org.apache.flink.runtime.security.contexts.HadoopSecurityContext.runSecured(HadoopSecurityContext.java:41)
	at org.apache.flink.client.cli.CliFrontend.main(CliFrontend.java:1132)

ailinzhou • Jan 27, 2024

Are you doing snapshot queries? It should work as expected. I don't think we need an explicit boundedness setup. Do you have other sources in the DAG besides the Hoodie source?

danny0405 • Jan 29, 2024

> Are you doing snapshot queries? It should work as expected. I don't think we need an explicit boundedness setup. Do you have other sources in the DAG besides the Hoodie source?

Let me reply one by one.

> Are you doing snapshot queries?

Yes, I'm doing a snapshot query; the relevant parameters are as follows:

'hoodie.datasource.query.type'='snapshot'
'table.type'='MERGE_ON_READ'

> I don't think we need an explicit boundedness setup.

Does that mean that when querying a snapshot, as the HoodieTableSource.java code above shows, calling addSource with the default Boundedness.CONTINUOUS_UNBOUNDED has no impact on the final behavior of the Flink stream, or that there is some other mechanism which ensures it ends up as Boundedness.BOUNDED?

> Do you have other sources in the DAG besides the Hoodie source?

No, there's only one, which is the Hoodie Source.


ailinzhou • Jan 31, 2024

Yeah, people have never reported failures for batch snapshot queries.

danny0405 • Feb 1, 2024

@ailinzhou Are you still facing this issue?

ad1happy2go • Feb 22, 2024

> @ailinzhou Are you still facing this issue?

Yes, unfortunately, I'm still experiencing the problem.

ailinzhou • Mar 22, 2024

@ailinzhou Can you provide the script/commands you are using?

ad1happy2go • Mar 22, 2024