
[SUPPORT] Flink inline clustering "Metadata is empty for partition: ~~"

Open SML0127 opened this issue 5 months ago • 7 comments

Tips before filing an issue

  • Have you gone through our FAQs?

  • Join the mailing list to engage in conversations and get faster support at [email protected].

  • If you have triaged this as a bug, then file an issue directly.

Describe the problem you faced

  • Loading data from Flink into a Hudi table, with inline clustering enabled.
  • The error below occurs when clustering is performed.

To Reproduce

Expected behavior

Environment Description

  • Hudi version : 1.0.2

  • Flink version : 1.20.0

  • Hive version : 2.3.7

  • Hadoop version : 2.10.2

  • Storage (HDFS/S3/GCS..) : HDFS

  • Running on Docker? (yes/no) : yes

Additional context

My options:

        hudiOptions.put(FlinkOptions.PATH.key(), tablePath);
        hudiOptions.put(FlinkOptions.DATABASE_NAME.key(), targetDatabase);
        hudiOptions.put(FlinkOptions.TABLE_NAME.key(), targetTable);

        hudiOptions.put(HoodieMetadataConfig.METRICS_ENABLE.key(), "true");
        hudiOptions.put(HoodieMetadataConfig.ASYNC_INDEX_ENABLE.key(), "true");
        hudiOptions.put(HoodieMetadataConfig.ENABLE_LOG_COMPACTION_ON_METADATA_TABLE.key(), "true");

        hudiOptions.put(FlinkOptions.TABLE_TYPE.key(), HoodieTableType.MERGE_ON_READ.name());

        hudiOptions.put(FlinkOptions.OPERATION.key(), WriteOperationType.INSERT.name());
        hudiOptions.put(FlinkOptions.RECORD_KEY_FIELD.key(), "log_unique_id");
        hudiOptions.put(FlinkOptions.PRECOMBINE_FIELD.key(), FlinkOptions.NO_PRE_COMBINE);
        hudiOptions.put(FlinkOptions.PARTITION_PATH_FIELD.key(), "log_datetime_id");
        hudiOptions.put(FlinkOptions.HIVE_SYNC_PARTITION_FIELDS.key(), "log_datetime_id");
        hudiOptions.put(
                FlinkOptions.KEYGEN_CLASS_NAME.key(), "org.apache.hudi.keygen.ComplexKeyGenerator");
        hudiOptions.put(
                FlinkOptions.PAYLOAD_CLASS_NAME.key(),
                "org.apache.hudi.common.model.OverwriteWithLatestAvroPayload");
        hudiOptions.put(FlinkOptions.HIVE_STYLE_PARTITIONING.key(), "true");

        // hudiOptions.put(FlinkOptions.INDEX_TYPE.key(), HoodieIndex.IndexType.FLINK_STATE.name());
        hudiOptions.put(FlinkOptions.INDEX_TYPE.key(), HoodieIndex.IndexType.BUCKET.name());
        hudiOptions.put(
                FlinkOptions.BUCKET_INDEX_ENGINE_TYPE.key(),
                HoodieIndex.BucketIndexEngineType.CONSISTENT_HASHING.name());
        hudiOptions.put(
                FlinkOptions.BUCKET_INDEX_NUM_BUCKETS.key(), "10"); 


        // Cleaning Service
        hudiOptions.put(FlinkOptions.CLEAN_ASYNC_ENABLED.key(), "true");

        // KEEP_LATEST_FILE_VERSIONS: keeps the last N versions of the file slices written;
        //   used only when "hoodie.clean.fileversions.retained" is explicitly set.
        // KEEP_LATEST_COMMITS: keeps the file slices written by the last N commits;
        //   used only when "hoodie.clean.commits.retained" is explicitly set.
        // KEEP_LATEST_BY_HOURS: keeps the file slices written in the last N hours, based on the commit time;
        //   used only when "hoodie.clean.hours.retained" is explicitly set.
        hudiOptions.put(
                FlinkOptions.CLEAN_POLICY.key(), HoodieCleaningPolicy.KEEP_LATEST_BY_HOURS.name());
        hudiOptions.put(FlinkOptions.CLEAN_RETAIN_HOURS.key(), "24"); 
        // hudiOptions.put(FlinkOptions.CLEAN_RETAIN_COMMITS.key(), "480"); 

        hudiOptions.put(FlinkOptions.HIVE_SYNC_ENABLED.key(), "true");
        hudiOptions.put(FlinkOptions.HIVE_SYNC_TABLE.key(), targetTable);
        hudiOptions.put(FlinkOptions.HIVE_SYNC_DB.key(), targetDatabase);
        hudiOptions.put(FlinkOptions.HIVE_SYNC_MODE.key(), HiveSyncMode.HMS.name());
        // the thrift option is configured in a separate file / env
        

        // clustering service
        hudiOptions.put(FlinkOptions.CLUSTERING_TASKS.key(), "10");
        hudiOptions.put(HoodieClusteringConfig.INLINE_CLUSTERING.key(), "true");
        hudiOptions.put(HoodieClusteringConfig.INLINE_CLUSTERING_MAX_COMMITS.key(), "2"); 
        hudiOptions.put(
                FlinkOptions.CLUSTERING_PLAN_STRATEGY_CLASS.key(),
                HoodieClusteringConfig.FLINK_CONSISTENT_BUCKET_CLUSTERING_PLAN_STRATEGY);

        hudiOptions.put(
                FlinkOptions.CLUSTERING_PLAN_PARTITION_FILTER_MODE_NAME.key(), RECENT_DAYS.name());
        hudiOptions.put(
                FlinkOptions.CLUSTERING_PLAN_STRATEGY_SKIP_PARTITIONS_FROM_LATEST.key(), "0");
        hudiOptions.put(FlinkOptions.CLUSTERING_TARGET_PARTITIONS.key(), "3");
        hudiOptions.put(
                FlinkOptions.CLUSTERING_PLAN_STRATEGY_TARGET_FILE_MAX_BYTES.key(),
                String.valueOf(256 * 1024 * 1024L));
        hudiOptions.put(
                FlinkOptions.CLUSTERING_PLAN_STRATEGY_SMALL_FILE_LIMIT.key(),
                String.valueOf(180 * 1024 * 1024L));

        hudiOptions.put(FlinkOptions.WRITE_TASKS.key(), "10"); 
        // INSERT_CLUSTER merges small files in insert mode; only applies to COW tables
        // hudiOptions.put(FlinkOptions.INSERT_CLUSTER.key(), "true");
        hudiOptions.put(FlinkOptions.WRITE_PARQUET_MAX_FILE_SIZE.key(), "256");
        hudiOptions.put(
                FlinkOptions.WRITE_BATCH_SIZE.key(), "64"); 
        hudiOptions.put(FlinkOptions.WRITE_LOG_BLOCK_SIZE.key(), "128");
        hudiOptions.put(FlinkOptions.WRITE_LOG_MAX_SIZE.key(), "256");

        hudiOptions.put(TIMELINE_LAYOUT_VERSION_NUM.key(), "2");
        hudiOptions.put(ARCHIVE_MIN_COMMITS.key(), "480"); 
        hudiOptions.put(ARCHIVE_MAX_COMMITS.key(), "580");

Stacktrace

2025-06-16 17:38:15
org.apache.flink.util.FlinkException: Global failure triggered by OperatorCoordinator for 'Source: KAFKA (test-topic) -> Transform: RowData -> hoodie_append_write: default_database.table_06161722' (operator 59cdd8a1cddabde67ee500c4f33ff1ab).
	at org.apache.flink.runtime.operators.coordination.OperatorCoordinatorHolder$LazyInitializedCoordinatorContext.failJob(OperatorCoordinatorHolder.java:651)
	at org.apache.hudi.sink.StreamWriteOperatorCoordinator.lambda$start$0(StreamWriteOperatorCoordinator.java:201)
	at org.apache.hudi.sink.utils.NonThrownExecutor.handleException(NonThrownExecutor.java:142)
	at org.apache.hudi.sink.utils.NonThrownExecutor.lambda$wrapAction$0(NonThrownExecutor.java:133)
	at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(Unknown Source)
	at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(Unknown Source)
	at java.base/java.lang.Thread.run(Unknown Source)
Caused by: org.apache.hudi.exception.HoodieException: Executor executes action [commits the instant 20250616083812901] error
	... 6 more
Caused by: org.apache.hudi.exception.HoodieException: Error occurs when executing map
	at org.apache.hudi.common.function.FunctionWrapper.lambda$throwingMapWrapper$0(FunctionWrapper.java:40)
	at java.base/java.util.stream.ReferencePipeline$3$1.accept(Unknown Source)
	at java.base/java.util.ArrayList$ArrayListSpliterator.forEachRemaining(Unknown Source)
	at java.base/java.util.stream.AbstractPipeline.copyInto(Unknown Source)
	at java.base/java.util.stream.AbstractPipeline.wrapAndCopyInto(Unknown Source)
	at java.base/java.util.stream.ReduceOps$ReduceTask.doLeaf(Unknown Source)
	at java.base/java.util.stream.ReduceOps$ReduceTask.doLeaf(Unknown Source)
	at java.base/java.util.stream.AbstractTask.compute(Unknown Source)
	at java.base/java.util.concurrent.CountedCompleter.exec(Unknown Source)
	at java.base/java.util.concurrent.ForkJoinTask.doExec(Unknown Source)
	at java.base/java.util.concurrent.ForkJoinTask.doInvoke(Unknown Source)
	at java.base/java.util.concurrent.ForkJoinTask.invoke(Unknown Source)
	at java.base/java.util.stream.ReduceOps$ReduceOp.evaluateParallel(Unknown Source)
	at java.base/java.util.stream.AbstractPipeline.evaluate(Unknown Source)
	at java.base/java.util.stream.ReferencePipeline.collect(Unknown Source)
	at org.apache.hudi.client.common.HoodieFlinkEngineContext.map(HoodieFlinkEngineContext.java:114)
	at org.apache.hudi.table.action.cluster.strategy.PartitionAwareClusteringPlanStrategy.generateClusteringPlan(PartitionAwareClusteringPlanStrategy.java:168)
	at org.apache.hudi.table.action.cluster.ClusteringPlanActionExecutor.createClusteringPlan(ClusteringPlanActionExecutor.java:89)
	at org.apache.hudi.table.action.cluster.ClusteringPlanActionExecutor.execute(ClusteringPlanActionExecutor.java:94)
	at org.apache.hudi.table.HoodieFlinkCopyOnWriteTable.scheduleClustering(HoodieFlinkCopyOnWriteTable.java:332)
	at org.apache.hudi.client.BaseHoodieTableServiceClient.scheduleTableServiceInternal(BaseHoodieTableServiceClient.java:666)
	at org.apache.hudi.client.BaseHoodieTableServiceClient.scheduleTableService(BaseHoodieTableServiceClient.java:644)
	at org.apache.hudi.client.BaseHoodieTableServiceClient.scheduleClusteringAtInstant(BaseHoodieTableServiceClient.java:462)
	at org.apache.hudi.client.BaseHoodieTableServiceClient.scheduleClustering(BaseHoodieTableServiceClient.java:452)
	at org.apache.hudi.client.BaseHoodieTableServiceClient.inlineScheduleClustering(BaseHoodieTableServiceClient.java:745)
	at org.apache.hudi.client.BaseHoodieTableServiceClient.inlineClustering(BaseHoodieTableServiceClient.java:721)
	at org.apache.hudi.client.BaseHoodieTableServiceClient.inlineClustering(BaseHoodieTableServiceClient.java:734)
	at org.apache.hudi.client.BaseHoodieTableServiceClient.runTableServicesInline(BaseHoodieTableServiceClient.java:607)
	at org.apache.hudi.client.BaseHoodieWriteClient.runTableServicesInlineInternal(BaseHoodieWriteClient.java:603)
	at org.apache.hudi.client.BaseHoodieWriteClient.runTableServicesInline(BaseHoodieWriteClient.java:590)
	at org.apache.hudi.client.BaseHoodieWriteClient.commitStats(BaseHoodieWriteClient.java:259)
	at org.apache.hudi.client.HoodieFlinkWriteClient.commit(HoodieFlinkWriteClient.java:111)
	at org.apache.hudi.client.HoodieFlinkWriteClient.commit(HoodieFlinkWriteClient.java:74)
	at org.apache.hudi.client.BaseHoodieWriteClient.commit(BaseHoodieWriteClient.java:207)
	at org.apache.hudi.sink.StreamWriteOperatorCoordinator.doCommit(StreamWriteOperatorCoordinator.java:588)
	at org.apache.hudi.sink.StreamWriteOperatorCoordinator.commitInstant(StreamWriteOperatorCoordinator.java:564)
	at org.apache.hudi.sink.StreamWriteOperatorCoordinator.lambda$notifyCheckpointComplete$2(StreamWriteOperatorCoordinator.java:263)
	at org.apache.hudi.sink.utils.NonThrownExecutor.lambda$wrapAction$0(NonThrownExecutor.java:130)
	... 3 more
Caused by: java.lang.IllegalArgumentException: Metadata is empty for partition: log_datetime_id=2025-06-16+17
	at org.apache.hudi.common.util.ValidationUtils.checkArgument(ValidationUtils.java:42)
	at org.apache.hudi.table.action.cluster.strategy.BaseConsistentHashingBucketClusteringPlanStrategy.buildClusteringGroupsForPartition(BaseConsistentHashingBucketClusteringPlanStrategy.java:101)
	at org.apache.hudi.table.action.cluster.strategy.PartitionAwareClusteringPlanStrategy.lambda$generateClusteringPlan$d5a31cf4$1(PartitionAwareClusteringPlanStrategy.java:170)
	at org.apache.hudi.common.function.FunctionWrapper.lambda$throwingMapWrapper$0(FunctionWrapper.java:38)
	... 40 more

SML0127, Jun 16 '25 08:06

@SML0127 Flink does not support inline clustering, only async plan scheduling for consistent hashing; you need to execute the plan in a separate job.

hudiOptions.put(HoodieClusteringConfig.INLINE_CLUSTERING.key(), "true");

danny0405, Jun 16 '25 10:06
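A minimal sketch of how the writer options posted above could be adjusted along these lines. The FlinkOptions keys below are assumptions on my part and should be verified against the FlinkOptions class of the deployed Hudi version:

        // Drop the Spark-style inline clustering settings from the Flink writer:
        // hudiOptions.put(HoodieClusteringConfig.INLINE_CLUSTERING.key(), "true");
        // hudiOptions.put(HoodieClusteringConfig.INLINE_CLUSTERING_MAX_COMMITS.key(), "2");

        // Only schedule clustering plans in the ingestion job (assumed option keys):
        hudiOptions.put(FlinkOptions.CLUSTERING_SCHEDULE_ENABLED.key(), "true");
        // Plan execution is left to a separate job, so keep in-job async execution off:
        hudiOptions.put(FlinkOptions.CLUSTERING_ASYNC_ENABLED.key(), "false");

The scheduled plans could then be executed by a standalone clustering job (for example org.apache.hudi.sink.clustering.HoodieFlinkClusteringJob from the Flink bundle, if it is available in this version) rather than inline in the writer.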

@SML0127 Flink does not support inline clustering, only async plan scheduling for consistent hashing; you need to execute the plan in a separate job.

hudiOptions.put(HoodieClusteringConfig.INLINE_CLUSTERING.key(), "true");

Hi @danny0405, thank you for the kind answer. I have two more questions.

  1. Is there a plan to support inline clustering in Flink, or is there a reason it is not supported?
  2. Where in the documentation can I find the information that Flink does not support inline clustering?

SML0127, Jun 16 '25 14:06

Flink never supports any inline table services, because the DAG of Flink operators is essentially deterministic at compile time. For the same reason, Flink does not support running async clustering simultaneously with compaction.

Only one async online rewrite table service is supported at a time now, either compaction or clustering.

danny0405, Jun 17 '25 00:06
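To illustrate the "one async rewrite service at a time" constraint, a hedged sketch of the either/or choice in the ingestion job; the option keys are assumed and should be checked against FlinkOptions for the version in use:

        // Either async clustering ...
        hudiOptions.put(FlinkOptions.CLUSTERING_ASYNC_ENABLED.key(), "true");
        hudiOptions.put(FlinkOptions.COMPACTION_ASYNC_ENABLED.key(), "false");
        // ... or async compaction, but not both at the same time:
        // hudiOptions.put(FlinkOptions.COMPACTION_ASYNC_ENABLED.key(), "true");
        // hudiOptions.put(FlinkOptions.CLUSTERING_ASYNC_ENABLED.key(), "false");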

Thank you @danny0405. From what I understand, auto compaction is also not supported in Flink as an inline table service. Is that correct? (i.e., we have to deploy another Flink job for auto compaction or clustering)

SML0127, Jun 17 '25 08:06

An "inline" table service means it is executed synchronously after each commit. We do support async compaction and clustering in the ingestion job, for both plan scheduling and execution.

danny0405, Jun 17 '25 09:06
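As a concrete contrast to the inline (synchronous, after each commit) model, a sketch of async compaction configured inside the ingestion job for the MOR table, with both plan scheduling and execution handled by the job itself. The keys below are assumptions; exact names and defaults may differ per version:

        // Async compaction planned and executed by operators inside the ingestion job
        // (assumed option keys; check FlinkOptions for the exact names and defaults):
        hudiOptions.put(FlinkOptions.COMPACTION_ASYNC_ENABLED.key(), "true");
        hudiOptions.put(FlinkOptions.COMPACTION_DELTA_COMMITS.key(), "5"); // plan roughly every 5 delta commits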

Thank you @danny0405. I understand now after doing some more tests. One thing I'm still curious about: not only with the inline table service, but also when CLUSTERING_SCHEDULE_ENABLED is set to true, PartitionAwareClusteringPlanStrategy is called and the same issue occurs. Do you know anything about it?

issue: Caused by: java.lang.IllegalArgumentException: Metadata is empty for partition: log_datetime_id=2025-06-16+02

SML0127, Jun 18 '25 07:06

You need to disable the execution.

danny0405, Jun 18 '25 08:06
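If "disable the execution" means keeping only plan scheduling on in the ingestion job, a sketch under that assumption (option keys assumed, as above):

        // Schedule clustering plans in the ingestion job, but do not execute them there:
        hudiOptions.put(FlinkOptions.CLUSTERING_SCHEDULE_ENABLED.key(), "true");
        hudiOptions.put(FlinkOptions.CLUSTERING_ASYNC_ENABLED.key(), "false");
        // The pending plans would then be executed by a separate clustering job.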

Hi @SML0127

We understand Danny provided a solution for this issue. Please reply if you have any follow-up questions. Otherwise, we will automatically close this ticket in one week.

rangareddy, Oct 30 '25 16:10

Closing this issue because the user has not responded to follow-up questions.

rangareddy, Nov 05 '25 13:11