
[SUPPORT] Flink Table planner not loading problem

Open huyuanfeng2018 opened this issue 1 year ago • 20 comments

Describe the problem you faced

Version information: Hudi 0.13.0, Flink 1.16, Flink SQL (Hudi CREATE DDL):

CREATE TABLE `ods_action_log_huya_hudi_nopro_test` (
  `stime` VARCHAR PRIMARY KEY NOT ENFORCED,
  `product` VARCHAR,
  `eid` VARCHAR,
  `curpage` VARCHAR,
  `curlocation` VARCHAR,
  `mid` VARCHAR,
  `yyuid` BIGINT,
  `prop` VARCHAR,
  `dt` VARCHAR,
  `hour` VARCHAR
) PARTITIONED BY (`dt`, `hour`)
WITH (
  'connector' = 'hudi',
  'write.tasks' = '64',
  'write.operation' = 'insert',  -- The write operation to perform (insert or upsert is supported)
  'path' = 'hdfs://huyaopclusternew/user/hive/warehouse/dw_rt_ods.db/ods_action_log_huya_hudi_nopro_test',
  'table.type' = 'COPY_ON_WRITE',  -- If MERGE_ON_READ, hive query will not have output until the parquet file is generated
  'hoodie.bucket.index.num.buckets' = '1',
  'hoodie.bucket.index.hash.field' = 'stime',
  'hoodie.clean.async' = 'true',
  'hoodie.cleaner.commits.retained' = '5',
  'hoodie.datasource.write.hive_style_partitioning' = 'true',
  'clustering.async.enabled' = 'true'
);

When I turn on 'clustering.async.enabled' = 'true', the job fails at startup with:

Caused by: java.lang.ClassNotFoundException: org.apache.flink.table.planner.codegen.sort.SortCodeGenerator

Reasons and suggestions

Since Flink 1.15, flink-table-planner is no longer on the JVM classpath by default. Instead, the planner is loaded through flink-table-planner-loader on a separate, isolated classpath. Hudi uses SortCodeGenerator for part of the clustering logic, which is why this exception occurs.

Reference: https://nightlies.apache.org/flink/flink-docs-master/docs/dev/configuration/advanced/
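A quick way to see which side of this split a JVM is on is to probe for the classes directly, using the same classpath as the Hudi job. The sketch below uses only the JDK; `ClasspathProbe` is a hypothetical helper, and it checks only the classloader that loaded the probe itself, not the planner's own isolated classloader.

```java
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical diagnostic: reports whether classes are visible to the
// classloader that loaded this probe (i.e. the job's own classpath).
final class ClasspathProbe {

    /** Returns true if the class resolves; static initializers are not run. */
    static boolean isLoadable(String className) {
        try {
            Class.forName(className, false, ClasspathProbe.class.getClassLoader());
            return true;
        } catch (ClassNotFoundException | LinkageError e) {
            return false;
        }
    }

    /** Probes each name and keeps the input order in the result map. */
    static Map<String, Boolean> probe(List<String> classNames) {
        Map<String, Boolean> result = new LinkedHashMap<>();
        for (String name : classNames) {
            result.put(name, isLoadable(name));
        }
        return result;
    }

    public static void main(String[] args) {
        List<String> names = Arrays.asList(
                // Table API interface (flink-table-api-java), not part of the planner
                "org.apache.flink.table.api.TableEnvironment",
                // Planner-internal class needed by Hudi's clustering sort
                "org.apache.flink.table.planner.codegen.sort.SortCodeGenerator");
        for (Map.Entry<String, Boolean> e : probe(names).entrySet()) {
            System.out.println((e.getValue() ? "FOUND     " : "NOT FOUND ") + e.getKey());
        }
    }
}
```

Run with the same classpath as the failing job; with only flink-table-planner-loader in lib, the second line would be expected to report NOT FOUND.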

So I think Hudi needs to adapt its code to Flink 1.15 and later, or at least remind users to move the flink-table-planner jar from the opt directory to the lib directory of the Flink distribution so that Hudi can run.
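The workaround above can be checked before deployment by looking at which planner jar a distribution ships in lib/. This is a minimal sketch: `PlannerJarCheck` is hypothetical, the jar-name prefixes are taken from the standard layout mentioned later in this thread (`flink-table-planner-loader-<version>.jar` in lib, `flink-table-planner_2.12-<version>.jar` in opt), and the assumption that only one of the two should sit in lib is a reading of the Flink documentation linked above, not something verified here.

```java
import java.io.IOException;
import java.nio.file.DirectoryStream;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.util.ArrayList;
import java.util.List;

// Hypothetical helper: inspects jar names in a Flink distribution's lib/
// directory and reports which planner flavour it ships.
final class PlannerJarCheck {

    enum Setup { LOADER_ONLY, FULL_PLANNER_ONLY, BOTH, NEITHER }

    /** Classifies a list of jar file names by the planner jars they contain. */
    static Setup classify(List<String> jarNames) {
        boolean loader = false;
        boolean fullPlanner = false;
        for (String name : jarNames) {
            if (name.startsWith("flink-table-planner-loader")) {
                loader = true;
            } else if (name.startsWith("flink-table-planner_")) {
                fullPlanner = true; // e.g. flink-table-planner_2.12-<version>.jar
            }
        }
        if (loader && fullPlanner) return Setup.BOTH;
        if (loader) return Setup.LOADER_ONLY;
        if (fullPlanner) return Setup.FULL_PLANNER_ONLY;
        return Setup.NEITHER;
    }

    /** Lists the *.jar file names directly inside the given directory. */
    static List<String> jarNames(Path dir) throws IOException {
        List<String> names = new ArrayList<>();
        try (DirectoryStream<Path> stream = Files.newDirectoryStream(dir, "*.jar")) {
            for (Path p : stream) {
                names.add(p.getFileName().toString());
            }
        }
        return names;
    }

    public static void main(String[] args) throws IOException {
        // Assumption: FLINK_HOME points at an unpacked Flink distribution.
        String envHome = System.getenv("FLINK_HOME");
        Path lib = Paths.get(envHome != null ? envHome : "/opt/flink", "lib");
        if (!Files.isDirectory(lib)) {
            System.out.println("No lib directory at " + lib);
            return;
        }
        switch (classify(jarNames(lib))) {
            case LOADER_ONLY:
                System.out.println("Default setup: planner internals such as SortCodeGenerator are hidden from user code.");
                break;
            case FULL_PLANNER_ONLY:
                System.out.println("flink-table-planner_2.12 is in lib: planner classes are on the user classpath.");
                break;
            case BOTH:
                System.out.println("Both planner jars are in lib (assumption: only one is expected there).");
                break;
            default:
                System.out.println("No planner jar found in lib.");
        }
    }
}
```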

huyuanfeng2018 (Mar 22 '23)

cc @danny0405

huyuanfeng2018 (Mar 22 '23)

Thanks for the reminder. Maybe we can add a note about this to the Hudi website: https://nightlies.apache.org/flink/flink-docs-master/docs/dev/configuration/advanced/#anatomy-of-table-dependencies

danny0405 (Mar 22 '23)

Adding a reminder is good, but in our current environment we would need to build two different Flink images to distinguish the two setups in production. Could you consider adding an option to shade these dependencies into Hudi?

huyuanfeng2018 (Mar 22 '23)

No, Hudi does not package engine-specific jars in the uber jar.

danny0405 (Mar 22 '23)

That would result in a large package.

huyuanfeng2018 (Mar 22 '23)

The more serious problem is that once you shade a jar, you also need to shade all the jars it depends on; otherwise the indirectly introduced classes can cause conflicts.

danny0405 (Mar 23 '23)

I hit this issue on the Amazon Kinesis Analytics (Flink) service. I tried including flink-table-planner_2.12-1.15.2.jar in the shadowJar, but that didn't work.

Is there a known fix for this on Amazon Kinesis Analytics?

ertanden (May 30 '23)

Did you have flink-table-planner-loader on the classpath?

danny0405 (May 31 '23)

Do you mean I should put flink-table-planner-loader in the job's shadowJar? I didn't try that.

But my understanding from the Flink documentation is that flink-table-planner-loader is already on the classpath by default (in the /lib folder). We have no control over the Flink distribution setup on Amazon Kinesis Analytics; it currently uses Flink 1.15.2, so I assume flink-table-planner-loader is on the classpath.

ertanden (May 31 '23)

Did you use a Flink session cluster? What class is missing here?

danny0405 (May 31 '23)

As far as I know, Amazon Kinesis Analytics runs an application cluster, not a session cluster.

The missing class is the one given in the issue description:

Caused by: java.lang.ClassNotFoundException: org.apache.flink.table.planner.codegen.sort.SortCodeGenerator

This happens when clustering.async.enabled is set to true; otherwise the job runs fine.

ertanden (May 31 '23)

The class is located in flink-table-planner-loader. Can AWS support help here?

danny0405 (May 31 '23)

Any update on this?

Currently it is impossible to run a Hudi job in append mode (COW and insert) with clustering enabled on AWS Kinesis Analytics. The job runs fine with clustering.async.enabled=false, but then we get a large number of small files.

The class org.apache.flink.table.planner.codegen.sort.SortCodeGenerator is actually in flink-table-planner_2.12, not in flink-table-planner-loader. That's why this issue happens.

However, as the Flink documentation states, depending on flink-table-planner_2.12 has been deprecated since 1.15, and projects should move away from it.

Is there an idea of how to remove the dependency on org.apache.flink.table.planner.codegen.sort.SortCodeGenerator? I could give it a shot, but I'm not very familiar with the code yet, so I would need a hint on where to start.
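Which jar actually contains SortCodeGenerator can be checked by reading the jar's entries. The sketch below is a hypothetical helper built on `java.util.jar.JarFile` that only looks at top-level entries. It also assumes, without verifying here, that flink-table-planner-loader carries the full planner as a nested jar, in which case a top-level check on the loader jar would come back negative even though the class ships inside it.

```java
import java.io.IOException;
import java.util.jar.JarFile;

// Hypothetical helper: checks whether a jar holds a class file as a
// top-level entry (nested jars inside it are not searched).
final class JarEntryCheck {

    /** Maps a binary class name such as a.b.C to its jar entry path a/b/C.class. */
    static String entryName(String className) {
        return className.replace('.', '/') + ".class";
    }

    /** Returns true if the jar contains the class file as a top-level entry. */
    static boolean containsClass(String jarPath, String className) throws IOException {
        try (JarFile jar = new JarFile(jarPath)) {
            return jar.getJarEntry(entryName(className)) != null;
        }
    }

    public static void main(String[] args) throws IOException {
        String cls = "org.apache.flink.table.planner.codegen.sort.SortCodeGenerator";
        // Pass jar paths as arguments, e.g. the planner jars from lib/ and opt/.
        for (String jarPath : args) {
            boolean found = containsClass(jarPath, cls);
            System.out.println(jarPath + (found ? ": contains " : ": does not contain ") + cls);
        }
    }
}
```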

ertanden (Jun 23 '23)

@ertanden I also ran into this problem. It could possibly be solved in one of these ways:

(1) Copy the codegen module from Flink into Hudi, although it is written in Scala;
(2) Try to use PlannerCodeLoader in Flink to load these classes;
(3) Look for other APIs to use instead of SortCodeGenerator.

I'm currently trying the second approach.

@danny0405 do you have any other suggestions? This should be a blocker: it is 100% reproducible on Flink 1.15 and later.
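Option (2) amounts to resolving planner classes through a separate classloader rather than the job's own classpath. The sketch below only illustrates that general idea with a plain `URLClassLoader`; it is not PlannerCodeLoader or any other Flink API, `IsolatedLoader` and the jar path are placeholders, and in practice the planner's own dependencies (the Scala runtime among others) would also need to be reachable, which this sketch does not handle.

```java
import java.io.File;
import java.io.IOException;
import java.net.MalformedURLException;
import java.net.URL;
import java.net.URLClassLoader;

// Hypothetical sketch of option (2): resolve classes through a separate
// classloader over an extra jar. The jar path below is a placeholder.
final class IsolatedLoader {

    /** Opens a child classloader over one jar; the JDK default is parent-first delegation. */
    static URLClassLoader open(File jar, ClassLoader parent) throws MalformedURLException {
        return new URLClassLoader(new URL[] {jar.toURI().toURL()}, parent);
    }

    /** Returns the class if the loader (or its parents) can resolve it, else null. */
    static Class<?> tryLoad(ClassLoader loader, String className) {
        try {
            return Class.forName(className, false, loader);
        } catch (ClassNotFoundException | LinkageError e) {
            return null;
        }
    }

    public static void main(String[] args) throws IOException {
        // Placeholder path; pass the real planner jar as the first argument.
        File jar = new File(args.length > 0 ? args[0] : "flink-table-planner_2.12.jar");
        String cls = "org.apache.flink.table.planner.codegen.sort.SortCodeGenerator";
        // Closing the loader stops further class and resource loading through it,
        // so a real integration would keep it open while the classes are in use.
        try (URLClassLoader loader = open(jar, IsolatedLoader.class.getClassLoader())) {
            Class<?> c = tryLoad(loader, cls);
            System.out.println(c != null ? "Loaded " + c.getName() : "Not found in " + jar);
        }
    }
}
```

Because delegation is parent-first, classes already visible to the job keep coming from its own classloader, and only missing ones are looked up in the extra jar. Code on the job's classpath could then use the returned `Class` only reflectively, since it cannot reference it statically.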

Zouxxyy (Jun 24 '23)

@ertanden @Zouxxyy

dependency on flink-table-planner_2.12 is deprecated since 1.15, and projects should refactor out of it.

That's correct. We introduced SortCodeGenerator in the first place to get flexible sorting at the operator level, and Flink seems to have no better solution for it. Even though we depend on it, the flink-table-planner classes should still be on the classpath, right? I'm not that familiar with PlannerCodeLoader either; maybe that direction is worth investigating.
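Option (3) from the list above would replace generated sort code with an interpreted comparator. The sketch below shows only the shape of that idea under stated assumptions: rows are modeled as hypothetical `Object[]` arrays of `Comparable` fields rather than Flink's `RowData`, and `InterpretedRowSort` is not a Hudi or Flink API. Code generation is typically chosen for per-record performance, so an interpreted version like this would likely trade some speed for dropping the planner dependency.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Comparator;
import java.util.List;

// Hypothetical sketch of option (3): sort rows with an interpreted comparator
// instead of generated code. Rows are Object[] of Comparable fields; this is
// NOT Flink's RowData or Hudi's clustering API.
final class InterpretedRowSort {

    /** Compares rows field by field on the given key columns, nulls first. */
    static Comparator<Object[]> byKeys(int... keyColumns) {
        final int[] keys = keyColumns.clone();
        return (a, b) -> {
            for (int col : keys) {
                int c = compareNullsFirst(a[col], b[col]);
                if (c != 0) {
                    return c;
                }
            }
            return 0;
        };
    }

    @SuppressWarnings("unchecked")
    private static int compareNullsFirst(Object x, Object y) {
        if (x == null) {
            return y == null ? 0 : -1;
        }
        if (y == null) {
            return 1;
        }
        // Assumes both values share a mutually Comparable type (same field type).
        return ((Comparable<Object>) x).compareTo(y);
    }

    /** Returns a sorted copy, leaving the input list untouched. */
    static List<Object[]> sorted(List<Object[]> rows, int... keyColumns) {
        List<Object[]> copy = new ArrayList<>(rows);
        copy.sort(byKeys(keyColumns));
        return copy;
    }

    public static void main(String[] args) {
        // Columns: dt, hour, stime (loosely following the DDL in this issue).
        List<Object[]> rows = Arrays.asList(
                new Object[] {"2023-03-22", "10", "b"},
                new Object[] {"2023-03-21", "23", "c"},
                new Object[] {"2023-03-22", "09", "a"});
        for (Object[] row : sorted(rows, 0, 1)) {
            System.out.println(Arrays.toString(row));
        }
    }
}
```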

danny0405 (Jun 25 '23)

@danny0405 flink-table-planner has been removed from the classpath since Flink 1.15.

Zouxxyy (Jun 25 '23)

Then how are the classes in flink-table-planner loaded? Can we do similar loading on the Hudi side?

danny0405 (Jun 26 '23)

This logic is executed when the Flink graph is built, not in the JobManager (except in application mode). The classpath that runs the Hudi-related logic does not include the table planner by default: since Flink 1.15, flink-table-planner-loader loads flink-table-planner dynamically at runtime (because the table planner is not necessarily bound to Blink). Hudi, however, binds directly to Blink's sort implementation in its code, so I think Hudi may need to copy the SortCodeGenerator-related logic (I am not sure).
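The visibility problem described above (the planner classes exist in the distribution, but code on the user classpath cannot resolve them) can be reproduced in miniature with a classloader that refuses certain package prefixes. `HidingClassLoader` is a hypothetical toy, Flink's real planner loading is more involved, and the demo hides a JDK package so that it runs without Flink.

```java
import java.util.Arrays;
import java.util.List;

// Toy illustration (hypothetical class): a classloader that refuses to resolve
// certain package prefixes even though its parent could, mimicking how planner
// internals are invisible to code on the user classpath.
final class HidingClassLoader extends ClassLoader {

    private final List<String> hiddenPrefixes;

    HidingClassLoader(ClassLoader parent, String... hiddenPrefixes) {
        super(parent);
        this.hiddenPrefixes = Arrays.asList(hiddenPrefixes.clone());
    }

    @Override
    protected Class<?> loadClass(String name, boolean resolve) throws ClassNotFoundException {
        for (String prefix : hiddenPrefixes) {
            if (name.startsWith(prefix)) {
                // Behave as if the class were absent from this classpath.
                throw new ClassNotFoundException(name);
            }
        }
        // Everything else follows normal parent-first delegation.
        return super.loadClass(name, resolve);
    }

    public static void main(String[] args) {
        ClassLoader parent = HidingClassLoader.class.getClassLoader();
        // Hide a JDK package so the demo runs without Flink on the classpath.
        ClassLoader loader = new HidingClassLoader(parent, "java.util.concurrent.");
        for (String name : Arrays.asList("java.util.ArrayList", "java.util.concurrent.ConcurrentHashMap")) {
            try {
                loader.loadClass(name);
                System.out.println("visible: " + name);
            } catch (ClassNotFoundException e) {
                System.out.println("hidden:  " + name);
            }
        }
    }
}
```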

huyuanfeng2018 (Jul 09 '23)

SortCodeGenerator is written in Scala, and we do not want to introduce Scala code into the hudi-flink module.

danny0405 (Jul 10 '23)

Hello,

Is there any solution for this? I'm running the Flink SQL client locally, and it has flink-table-planner-loader-1.17.1.jar in the /opt/flink/lib folder (I'm using Docker).

However, if async clustering is enabled, I receive the same error as above:

java.lang.ClassNotFoundException: org.apache.flink.table.planner.codegen.sort.SortCodeGenerator

If I replace flink-table-planner-loader-1.17.1.jar with flink-table-planner_2.12-1.17.1.jar from the opt folder, the following error occurs:

Caused by: java.lang.NoSuchFieldError: CHAR_LENGTH
	at org.apache.flink.table.planner.functions.sql.FlinkSqlOperatorTable.<clinit>(FlinkSqlOperatorTable.java:1156)
	at org.apache.flink.table.planner.delegation.PlannerContext.getBuiltinSqlOperatorTable(PlannerContext.java:322)
	at java.base/java.util.Optional.orElseGet(Unknown Source)
	at org.apache.flink.table.planner.delegation.PlannerContext.getSqlOperatorTable(PlannerContext.java:311)
	at org.apache.flink.table.planner.delegation.PlannerContext.createFrameworkConfig(PlannerContext.java:147)
	at org.apache.flink.table.planner.delegation.PlannerContext.<init>(PlannerContext.java:124)
	at org.apache.flink.table.planner.delegation.PlannerBase.<init>(PlannerBase.scala:121)
	at org.apache.flink.table.planner.delegation.StreamPlanner.<init>(StreamPlanner.scala:65)
	at org.apache.flink.table.planner.delegation.DefaultPlannerFactory.create(DefaultPlannerFactory.java:65)
	at org.apache.flink.table.factories.PlannerFactoryUtil.createPlanner(PlannerFactoryUtil.java:58)
	at org.apache.flink.table.gateway.service.operation.OperationExecutor.createStreamTableEnvironment(OperationExecutor.java:369)
	at org.apache.flink.table.gateway.service.operation.OperationExecutor.getTableEnvironment(OperationExecutor.java:326)
	at org.apache.flink.table.gateway.service.operation.OperationExecutor.executeStatement(OperationExecutor.java:185)
	at org.apache.flink.table.gateway.service.SqlGatewayServiceImpl.lambda$executeStatement$1(SqlGatewayServiceImpl.java:212)
	at org.apache.flink.table.gateway.service.operation.OperationManager.lambda$submitOperation$1(OperationManager.java:119)
	at org.apache.flink.table.gateway.service.operation.OperationManager$Operation.lambda$run$0(OperationManager.java:258)
	... 7 more

vkhoroshko (Feb 05 '24)