[SUPPORT] Flink Table planner not loading problem
Describe the problem you faced
Version information: Hudi 0.13.0, Flink 1.16.
Flink SQL (Hudi CREATE TABLE DDL):
CREATE TABLE `ods_action_log_huya_hudi_nopro_test` (
`stime` VARCHAR PRIMARY KEY NOT ENFORCED,
`product` VARCHAR,
`eid` VARCHAR,
`curpage` VARCHAR,
`curlocation` VARCHAR,
`mid` VARCHAR ,
`yyuid` BIGINT,
`prop` VARCHAR,
`dt` VARCHAR,
`hour` VARCHAR
) PARTITIONED BY (`dt`, `hour`)
WITH (
'connector' = 'hudi',
'write.tasks' = '64',
'write.operation' = 'insert', -- The write operation, that this write should do (insert or upsert is supported)
'path' = 'hdfs://huyaopclusternew/user/hive/warehouse/dw_rt_ods.db/ods_action_log_huya_hudi_nopro_test',
'table.type' = 'COPY_ON_WRITE', -- If MERGE_ON_READ, hive query will not have output until the parquet file is generated
'hoodie.bucket.index.num.buckets' = '1',
'hoodie.bucket.index.hash.field' = 'stime',
'hoodie.clean.async' = 'true',
'hoodie.cleaner.commits.retained' = '5',
'hoodie.datasource.write.hive_style_partitioning' = 'true',
'clustering.async.enabled' = 'true'
);
When I turn on 'clustering.async.enabled' = 'true', the job fails at startup with:
Caused by: java.lang.ClassNotFoundException: org.apache.flink.table.planner.codegen.sort.SortCodeGenerator
Reasons and suggestions
Since Flink 1.15, flink-table-planner is no longer on the JVM classpath by default; the planner is instead loaded through flink-table-planner-loader via a child classpath. Hudi uses SortCodeGenerator for part of its clustering logic, so this exception occurs.
Details: https://nightlies.apache.org/flink/flink-docs-master/docs/dev/configuration/advanced/
So I think Hudi either needs to adapt its code to Flink 1.15+, or remind users to move the flink-table-planner jar from the opt directory to the lib directory of the Flink distribution so that Hudi's runtime environment works.
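The jar swap suggested above can be sketched as follows. This is an illustration only: it builds a throwaway mock of a Flink distribution so it is self-contained; with a real installation, FLINK_HOME would be the root of your Flink 1.15+ distribution and the jar versions would match it.

```shell
# Illustration: FLINK_HOME here is a mock distribution created on the fly.
FLINK_HOME="$(mktemp -d)"
mkdir -p "$FLINK_HOME/lib" "$FLINK_HOME/opt"
touch "$FLINK_HOME/lib/flink-table-planner-loader-1.16.0.jar"
touch "$FLINK_HOME/opt/flink-table-planner_2.12-1.16.0.jar"

# Swap the loader for the full planner so that classes such as
# org.apache.flink.table.planner.codegen.sort.SortCodeGenerator
# become visible to Hudi:
mv "$FLINK_HOME"/opt/flink-table-planner_2.12-*.jar "$FLINK_HOME/lib/"
mv "$FLINK_HOME"/lib/flink-table-planner-loader-*.jar "$FLINK_HOME/opt/"

ls "$FLINK_HOME/lib"
```

Note that the loader jar is moved out of lib rather than left alongside the planner; per the Flink documentation, the two jars must not be on the classpath at the same time.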
cc @danny0405
Thanks for the reminder; maybe we can add this note on the Hudi website: https://nightlies.apache.org/flink/flink-docs-master/docs/dev/configuration/advanced/#anatomy-of-table-dependencies
I think it is good to add a reminder, but in our current operating environment we would need to package two different Flink images to distinguish the setups in production. Could you consider shading these dependencies into Hudi instead?
No, Hudi does not package engine-specific jars in the uber jar. That would result in a large package. The more serious problem is that once you shade a jar, you need to shade every jar it depends on, or there can be conflicts caused by the indirectly introduced classes.
I hit this issue on the Amazon Kinesis Analytics (Flink) service. I tried to include flink-table-planner_2.12-1.15.2.jar in the shadowJar, but this doesn't work.
Is there a known solution for Amazon Kinesis Analytics?
Did you have flink-table-planner-loader in the classpath?
Do you mean I should put flink-table-planner-loader in the job's shadowJar? I didn't try that.
But my understanding from the Flink documentation is that flink-table-planner-loader is already on the classpath by default (in the /lib folder). We have no control over the Flink distribution setup with Amazon Kinesis Analytics; it currently uses Flink 1.15.2, so I assume flink-table-planner-loader is in the classpath.
Did you use the Flink session cluster? What class is missing here?
Amazon Kinesis Analytics is an Application cluster as far as I know, not a session cluster.
The missing class is given in the description of this issue:
Caused by: java.lang.ClassNotFoundException: org.apache.flink.table.planner.codegen.sort.SortCodeGenerator
This happens when clustering.async.enabled is enabled; otherwise the job runs fine.
The class is located in flink-table-planner-loader; can AWS supporters give some help here?
Any update on this?
Currently it is impossible to run a Hudi job in append mode (COW and insert) with clustering enabled in AWS Kinesis Analytics. The job runs fine with clustering.async.enabled=false, but then we get many, many small files...
The class org.apache.flink.table.planner.codegen.sort.SortCodeGenerator is actually in flink-table-planner_2.12, not in flink-table-planner-loader. That's why this issue happens.
However, as documented by Flink, the dependency on flink-table-planner_2.12 is deprecated since 1.15, and projects should refactor away from it.
Is there an idea how to remove the dependency on org.apache.flink.table.planner.codegen.sort.SortCodeGenerator? I could give it a shot, but the code is not that familiar to me yet, so I would need a clue where to start...
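The split described above can be confirmed by listing the jar contents. The sketch below fakes the two jars in a temp directory so it is self-contained; with a real distribution you would list the actual flink-table-planner_2.12 and flink-table-planner-loader jars from $FLINK_HOME instead, and the nested-resource layout of the loader jar is an assumption for illustration.

```shell
workdir="$(mktemp -d)"
cd "$workdir"

# Mock jar 1: flink-table-planner_2.12 ships the class directly.
mkdir -p org/apache/flink/table/planner/codegen/sort
touch org/apache/flink/table/planner/codegen/sort/SortCodeGenerator.class
python3 -m zipfile -c flink-table-planner_2.12-1.15.2.jar org

# Mock jar 2: the loader jar does not expose the class on the classpath;
# it carries the planner as an opaque nested resource instead.
mkdir -p hidden
touch hidden/flink-table-planner.jar
python3 -m zipfile -c flink-table-planner-loader-1.15.2.jar hidden

for jar in *.jar; do
  if python3 -m zipfile -l "$jar" | grep -q 'codegen/sort/SortCodeGenerator'; then
    echo "$jar: contains SortCodeGenerator"
  else
    echo "$jar: class not visible on this jar's classpath"
  fi
done
```

The same grep against a real flink-table-planner-loader jar comes up empty, which is exactly why Hudi's direct use of the class fails at runtime.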
@ertanden I also hit this problem; it may be solvable by:
(1) Copying the codegen module from Flink into Hudi, but it is written in Scala;
(2) Trying to use PlannerCodeLoader in Flink to load these classes;
(3) Looking for other APIs instead of using the SortCodeGenerator API.
Currently I'm trying the second approach.
@danny0405 do you have any other suggestions? This should be a blocker; it is 100% reproducible after Flink 1.15.
@ertanden @Zouxxyy
dependency on flink-table-planner_2.12 is deprecated since 1.15, and projects should refactor out of it.
That's correct. We introduced SortCodeGenerator in the first place to get flexibility of sorting at the operator level, and Flink seems to have no better solution for it. Even though we have a dependency on it, the flink-table-planner classes should still be on the classpath, right? I'm not that familiar with PlannerCodeLoader either; maybe that's a direction worth exploring.
@danny0405 flink-table-planner has been removed from the classpath since flink1.15
flink-table-planner has been removed from the classpath since flink1.15
Then how are the classes in flink-table-planner loaded? Can we do similar loading on the Hudi side?
This logic is executed when the Flink graph is built, not in the JobManager (except in application mode). The classpath running the Hudi-related logic does not load the table planner by default, because since Flink 1.15 flink-table-planner-loader dynamically loads flink-table-planner at runtime (the table API is not necessarily bound to the Blink planner). Hudi binds to the Blink sort implementation in its code, so I think Hudi may need to try to copy the SortCodeGenerator-related logic (I am not sure).
SortCodeGenerator is written in Scala; we do not want to introduce Scala code into the hudi-flink module.
Hello,
Is there any solution for this? I'm running the Flink SQL client locally and it has flink-table-planner-loader-1.17.1.jar in the /opt/flink/lib folder (I'm using Docker).
However, if async clustering is enabled, I receive the same error as above:
java.lang.ClassNotFoundException: org.apache.flink.table.planner.codegen.sort.SortCodeGenerator
If I replace flink-table-planner-loader-1.17.1.jar with flink-table-planner_2.12-1.17.1.jar from the opt folder, the following error occurs:
Caused by: java.lang.NoSuchFieldError: CHAR_LENGTH
at org.apache.flink.table.planner.functions.sql.FlinkSqlOperatorTable.<clinit>(FlinkSqlOperatorTable.java:1156)
at org.apache.flink.table.planner.delegation.PlannerContext.getBuiltinSqlOperatorTable(PlannerContext.java:322)
at java.base/java.util.Optional.orElseGet(Unknown Source)
at org.apache.flink.table.planner.delegation.PlannerContext.getSqlOperatorTable(PlannerContext.java:311)
at org.apache.flink.table.planner.delegation.PlannerContext.createFrameworkConfig(PlannerContext.java:147)
at org.apache.flink.table.planner.delegation.PlannerContext.<init>(PlannerContext.java:124)
at org.apache.flink.table.planner.delegation.PlannerBase.<init>(PlannerBase.scala:121)
at org.apache.flink.table.planner.delegation.StreamPlanner.<init>(StreamPlanner.scala:65)
at org.apache.flink.table.planner.delegation.DefaultPlannerFactory.create(DefaultPlannerFactory.java:65)
at org.apache.flink.table.factories.PlannerFactoryUtil.createPlanner(PlannerFactoryUtil.java:58)
at org.apache.flink.table.gateway.service.operation.OperationExecutor.createStreamTableEnvironment(OperationExecutor.java:369)
at org.apache.flink.table.gateway.service.operation.OperationExecutor.getTableEnvironment(OperationExecutor.java:326)
at org.apache.flink.table.gateway.service.operation.OperationExecutor.executeStatement(OperationExecutor.java:185)
at org.apache.flink.table.gateway.service.SqlGatewayServiceImpl.lambda$executeStatement$1(SqlGatewayServiceImpl.java:212)
at org.apache.flink.table.gateway.service.operation.OperationManager.lambda$submitOperation$1(OperationManager.java:119)
at org.apache.flink.table.gateway.service.operation.OperationManager$Operation.lambda$run$0(OperationManager.java:258)
... 7 more