[SUPPORT] Flink inline clustering "Metadata is empty for partition: ~~"
Tips before filing an issue
- Have you gone through our FAQs?
- Join the mailing list to engage in conversations and get faster support at [email protected].
- If you have triaged this as a bug, then file an issue directly.
Describe the problem you faced
- Loading data from Flink into a Hudi table with inline clustering enabled.
- The following error occurs when clustering is performed (see the stacktrace below).
Environment Description
- Hudi version : 1.0.2
- Flink version : 1.20.0
- Hive version : 2.3.7
- Hadoop version : 2.10.2
- Storage (HDFS/S3/GCS..) : HDFS
- Running on Docker? (yes/no) : yes
Additional context
My options:
hudiOptions.put(FlinkOptions.PATH.key(), tablePath);
hudiOptions.put(FlinkOptions.DATABASE_NAME.key(), targetDatabase);
hudiOptions.put(FlinkOptions.TABLE_NAME.key(), targetTable);
hudiOptions.put(HoodieMetadataConfig.METRICS_ENABLE.key(), "true");
hudiOptions.put(HoodieMetadataConfig.ASYNC_INDEX_ENABLE.key(), "true");
hudiOptions.put(HoodieMetadataConfig.ENABLE_LOG_COMPACTION_ON_METADATA_TABLE.key(), "true");
hudiOptions.put(FlinkOptions.TABLE_TYPE.key(), HoodieTableType.MERGE_ON_READ.name());
hudiOptions.put(FlinkOptions.OPERATION.key(), WriteOperationType.INSERT.name());
hudiOptions.put(FlinkOptions.RECORD_KEY_FIELD.key(), "log_unique_id");
hudiOptions.put(FlinkOptions.PRECOMBINE_FIELD.key(), FlinkOptions.NO_PRE_COMBINE);
hudiOptions.put(FlinkOptions.PARTITION_PATH_FIELD.key(), "log_datetime_id");
hudiOptions.put(FlinkOptions.HIVE_SYNC_PARTITION_FIELDS.key(), "log_datetime_id");
hudiOptions.put(
FlinkOptions.KEYGEN_CLASS_NAME.key(), "org.apache.hudi.keygen.ComplexKeyGenerator");
hudiOptions.put(
FlinkOptions.PAYLOAD_CLASS_NAME.key(),
"org.apache.hudi.common.model.OverwriteWithLatestAvroPayload");
hudiOptions.put(FlinkOptions.HIVE_STYLE_PARTITIONING.key(), "true");
// hudiOptions.put(FlinkOptions.INDEX_TYPE.key(), HoodieIndex.IndexType.FLINK_STATE.name());
hudiOptions.put(FlinkOptions.INDEX_TYPE.key(), HoodieIndex.IndexType.BUCKET.name());
hudiOptions.put(
FlinkOptions.BUCKET_INDEX_ENGINE_TYPE.key(),
HoodieIndex.BucketIndexEngineType.CONSISTENT_HASHING.name());
hudiOptions.put(
FlinkOptions.BUCKET_INDEX_NUM_BUCKETS.key(), "10");
// Cleaning Service
hudiOptions.put(FlinkOptions.CLEAN_ASYNC_ENABLED.key(), "true");
// KEEP_LATEST_FILE_VERSIONS: keeps the last N versions of the file slices written;
//   used only when "hoodie.clean.fileversions.retained" is explicitly set.
// KEEP_LATEST_COMMITS: keeps the file slices written by the last N commits;
//   used only when "hoodie.clean.commits.retained" is explicitly set.
// KEEP_LATEST_BY_HOURS: keeps the file slices written in the last N hours based on the commit time;
//   used only when "hoodie.clean.hours.retained" is explicitly set.
hudiOptions.put(
FlinkOptions.CLEAN_POLICY.key(), HoodieCleaningPolicy.KEEP_LATEST_BY_HOURS.name());
hudiOptions.put(FlinkOptions.CLEAN_RETAIN_HOURS.key(), "24");
// hudiOptions.put(FlinkOptions.CLEAN_RETAIN_COMMITS.key(), "480");
hudiOptions.put(FlinkOptions.HIVE_SYNC_ENABLED.key(), "true");
hudiOptions.put(FlinkOptions.HIVE_SYNC_TABLE.key(), targetTable);
hudiOptions.put(FlinkOptions.HIVE_SYNC_DB.key(), targetDatabase);
hudiOptions.put(FlinkOptions.HIVE_SYNC_MODE.key(), HiveSyncMode.HMS.name());
// option for thrift is located in other file / env
// clustering service
hudiOptions.put(FlinkOptions.CLUSTERING_TASKS.key(), "10");
hudiOptions.put(HoodieClusteringConfig.INLINE_CLUSTERING.key(), "true");
hudiOptions.put(HoodieClusteringConfig.INLINE_CLUSTERING_MAX_COMMITS.key(), "2");
hudiOptions.put(
FlinkOptions.CLUSTERING_PLAN_STRATEGY_CLASS.key(),
HoodieClusteringConfig.FLINK_CONSISTENT_BUCKET_CLUSTERING_PLAN_STRATEGY);
hudiOptions.put(
FlinkOptions.CLUSTERING_PLAN_PARTITION_FILTER_MODE_NAME.key(), RECENT_DAYS.name());
hudiOptions.put(
FlinkOptions.CLUSTERING_PLAN_STRATEGY_SKIP_PARTITIONS_FROM_LATEST.key(), "0");
hudiOptions.put(FlinkOptions.CLUSTERING_TARGET_PARTITIONS.key(), "3");
hudiOptions.put(
FlinkOptions.CLUSTERING_PLAN_STRATEGY_TARGET_FILE_MAX_BYTES.key(),
String.valueOf(256 * 1024 * 1024L));
hudiOptions.put(
FlinkOptions.CLUSTERING_PLAN_STRATEGY_SMALL_FILE_LIMIT.key(),
String.valueOf(180 * 1024 * 1024L));
hudiOptions.put(FlinkOptions.WRITE_TASKS.key(), "10");
// only for COW table
// hudiOptions.put(FlinkOptions.INSERT_CLUSTER.key(), "true"); // merge small file
// for insert mode
hudiOptions.put(FlinkOptions.WRITE_PARQUET_MAX_FILE_SIZE.key(), "256");
hudiOptions.put(
FlinkOptions.WRITE_BATCH_SIZE.key(), "64");
hudiOptions.put(FlinkOptions.WRITE_LOG_BLOCK_SIZE.key(), "128");
hudiOptions.put(FlinkOptions.WRITE_LOG_MAX_SIZE.key(), "256");
hudiOptions.put(TIMELINE_LAYOUT_VERSION_NUM.key(), "2");
hudiOptions.put(ARCHIVE_MIN_COMMITS.key(), "480");
hudiOptions.put(ARCHIVE_MAX_COMMITS.key(), "580");
Stacktrace
2025-06-16 17:38:15
org.apache.flink.util.FlinkException: Global failure triggered by OperatorCoordinator for 'Source: KAFKA (test-topic) -> Transform: RowData -> hoodie_append_write: default_database.table_06161722' (operator 59cdd8a1cddabde67ee500c4f33ff1ab).
at org.apache.flink.runtime.operators.coordination.OperatorCoordinatorHolder$LazyInitializedCoordinatorContext.failJob(OperatorCoordinatorHolder.java:651)
at org.apache.hudi.sink.StreamWriteOperatorCoordinator.lambda$start$0(StreamWriteOperatorCoordinator.java:201)
at org.apache.hudi.sink.utils.NonThrownExecutor.handleException(NonThrownExecutor.java:142)
at org.apache.hudi.sink.utils.NonThrownExecutor.lambda$wrapAction$0(NonThrownExecutor.java:133)
at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(Unknown Source)
at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(Unknown Source)
at java.base/java.lang.Thread.run(Unknown Source)
Caused by: org.apache.hudi.exception.HoodieException: Executor executes action [commits the instant 20250616083812901] error
... 6 more
Caused by: org.apache.hudi.exception.HoodieException: Error occurs when executing map
at org.apache.hudi.common.function.FunctionWrapper.lambda$throwingMapWrapper$0(FunctionWrapper.java:40)
at java.base/java.util.stream.ReferencePipeline$3$1.accept(Unknown Source)
at java.base/java.util.ArrayList$ArrayListSpliterator.forEachRemaining(Unknown Source)
at java.base/java.util.stream.AbstractPipeline.copyInto(Unknown Source)
at java.base/java.util.stream.AbstractPipeline.wrapAndCopyInto(Unknown Source)
at java.base/java.util.stream.ReduceOps$ReduceTask.doLeaf(Unknown Source)
at java.base/java.util.stream.ReduceOps$ReduceTask.doLeaf(Unknown Source)
at java.base/java.util.stream.AbstractTask.compute(Unknown Source)
at java.base/java.util.concurrent.CountedCompleter.exec(Unknown Source)
at java.base/java.util.concurrent.ForkJoinTask.doExec(Unknown Source)
at java.base/java.util.concurrent.ForkJoinTask.doInvoke(Unknown Source)
at java.base/java.util.concurrent.ForkJoinTask.invoke(Unknown Source)
at java.base/java.util.stream.ReduceOps$ReduceOp.evaluateParallel(Unknown Source)
at java.base/java.util.stream.AbstractPipeline.evaluate(Unknown Source)
at java.base/java.util.stream.ReferencePipeline.collect(Unknown Source)
at org.apache.hudi.client.common.HoodieFlinkEngineContext.map(HoodieFlinkEngineContext.java:114)
at org.apache.hudi.table.action.cluster.strategy.PartitionAwareClusteringPlanStrategy.generateClusteringPlan(PartitionAwareClusteringPlanStrategy.java:168)
at org.apache.hudi.table.action.cluster.ClusteringPlanActionExecutor.createClusteringPlan(ClusteringPlanActionExecutor.java:89)
at org.apache.hudi.table.action.cluster.ClusteringPlanActionExecutor.execute(ClusteringPlanActionExecutor.java:94)
at org.apache.hudi.table.HoodieFlinkCopyOnWriteTable.scheduleClustering(HoodieFlinkCopyOnWriteTable.java:332)
at org.apache.hudi.client.BaseHoodieTableServiceClient.scheduleTableServiceInternal(BaseHoodieTableServiceClient.java:666)
at org.apache.hudi.client.BaseHoodieTableServiceClient.scheduleTableService(BaseHoodieTableServiceClient.java:644)
at org.apache.hudi.client.BaseHoodieTableServiceClient.scheduleClusteringAtInstant(BaseHoodieTableServiceClient.java:462)
at org.apache.hudi.client.BaseHoodieTableServiceClient.scheduleClustering(BaseHoodieTableServiceClient.java:452)
at org.apache.hudi.client.BaseHoodieTableServiceClient.inlineScheduleClustering(BaseHoodieTableServiceClient.java:745)
at org.apache.hudi.client.BaseHoodieTableServiceClient.inlineClustering(BaseHoodieTableServiceClient.java:721)
at org.apache.hudi.client.BaseHoodieTableServiceClient.inlineClustering(BaseHoodieTableServiceClient.java:734)
at org.apache.hudi.client.BaseHoodieTableServiceClient.runTableServicesInline(BaseHoodieTableServiceClient.java:607)
at org.apache.hudi.client.BaseHoodieWriteClient.runTableServicesInlineInternal(BaseHoodieWriteClient.java:603)
at org.apache.hudi.client.BaseHoodieWriteClient.runTableServicesInline(BaseHoodieWriteClient.java:590)
at org.apache.hudi.client.BaseHoodieWriteClient.commitStats(BaseHoodieWriteClient.java:259)
at org.apache.hudi.client.HoodieFlinkWriteClient.commit(HoodieFlinkWriteClient.java:111)
at org.apache.hudi.client.HoodieFlinkWriteClient.commit(HoodieFlinkWriteClient.java:74)
at org.apache.hudi.client.BaseHoodieWriteClient.commit(BaseHoodieWriteClient.java:207)
at org.apache.hudi.sink.StreamWriteOperatorCoordinator.doCommit(StreamWriteOperatorCoordinator.java:588)
at org.apache.hudi.sink.StreamWriteOperatorCoordinator.commitInstant(StreamWriteOperatorCoordinator.java:564)
at org.apache.hudi.sink.StreamWriteOperatorCoordinator.lambda$notifyCheckpointComplete$2(StreamWriteOperatorCoordinator.java:263)
at org.apache.hudi.sink.utils.NonThrownExecutor.lambda$wrapAction$0(NonThrownExecutor.java:130)
... 3 more
Caused by: java.lang.IllegalArgumentException: Metadata is empty for partition: log_datetime_id=2025-06-16+17
at org.apache.hudi.common.util.ValidationUtils.checkArgument(ValidationUtils.java:42)
at org.apache.hudi.table.action.cluster.strategy.BaseConsistentHashingBucketClusteringPlanStrategy.buildClusteringGroupsForPartition(BaseConsistentHashingBucketClusteringPlanStrategy.java:101)
at org.apache.hudi.table.action.cluster.strategy.PartitionAwareClusteringPlanStrategy.lambda$generateClusteringPlan$d5a31cf4$1(PartitionAwareClusteringPlanStrategy.java:170)
at org.apache.hudi.common.function.FunctionWrapper.lambda$throwingMapWrapper$0(FunctionWrapper.java:38)
... 40 more
@SML0127 Flink does not support inline clustering, only async plan scheduling for consistent hashing; you need to execute the plan in a separate job.
hudiOptions.put(HoodieClusteringConfig.INLINE_CLUSTERING.key(), "true");
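For reference, a minimal sketch of what that split could look like on the writer side, assuming the standard FlinkOptions keys clustering.schedule.enabled and clustering.async.enabled (exact names and defaults may differ across Hudi versions); the plan execution would then run in a separate clustering job, e.g. the standalone HoodieFlinkClusteringJob utility if it is available in your bundle:

// Sketch only: let the ingestion job schedule clustering plans, but do not
// execute them in the same job (no inline clustering with the Flink writer).
hudiOptions.remove(HoodieClusteringConfig.INLINE_CLUSTERING.key());
hudiOptions.remove(HoodieClusteringConfig.INLINE_CLUSTERING_MAX_COMMITS.key());
hudiOptions.put(FlinkOptions.CLUSTERING_SCHEDULE_ENABLED.key(), "true");  // only schedule the plan on commits
hudiOptions.put(FlinkOptions.CLUSTERING_ASYNC_ENABLED.key(), "false");    // execution happens outside this job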
Hi @danny0405, thank you for the kind answer. I have two more questions.
- Is there a plan to support inline clustering in Flink, or is there a reason it is not supported?
- Where in the documentation can I find the information that Flink does not support inline clustering?
Flink never supports any inline table services because the DAG of Flink operators is essentially fixed at compile time. Flink also does not support async clustering running simultaneously with compaction for the same reason.
Only one async online rewrite table service is supported at a time for now: either compaction or clustering.
Thank you @danny0405. From what I understand, auto compaction is also not supported in Flink as an inline table service. Is that correct? (i.e., we have to deploy another Flink job for auto compaction or clustering.)
A "inline" table service indicates it is executed synchonously after each commit, instead, we do support async compaction and clustering in the ingestion job for both plan scheduling and execution.
Thank you @danny0405. I understand now after doing some more tests.
One thing I'm curious about: not only with the inline table service, but also when CLUSTERING_SCHEDULE_ENABLED is set to true, PartitionAwareClusteringPlanStrategy is called and the same issue occurs. Do you know anything about it?
issue: Caused by: java.lang.IllegalArgumentException: Metadata is empty for partition: log_datetime_id=2025-06-16+02
you need to disable the execution
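If I read that right, the execution switch is what gets turned off, something like the sketch below (assuming the clustering.async.enabled key; an illustration only, not a confirmed fix):

// Sketch: stop the writer from executing clustering plans itself.
hudiOptions.put(FlinkOptions.CLUSTERING_ASYNC_ENABLED.key(), "false");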
Hi @SML0127
We understand Danny provided a solution for this issue. Please reply if you have any follow-up questions. Otherwise, we will automatically close this ticket in one week if we don't hear back.
Closing this issue because the user has not responded to follow-up questions.