
[SUPPORT] Executor executes action [commits the instant 20240202161708414] error

Open Toroidals opened this issue 1 year ago • 6 comments

Tips before filing an issue

  • Have you gone through our FAQs?

  • Join the mailing list to engage in conversations and get faster support at [email protected].

  • If you have triaged this as a bug, then file an issue directly.

Describe the problem you faced

When using two Flink programs to write to different partitions of the same Hudi table, and with the parameter set as options.put(FlinkOptions.WRITE_CLIENT_ID.key(), String.valueOf(System.currentTimeMillis()));, the following error occurred:

2024-02-02 17:21:12
org.apache.flink.runtime.JobException: Recovery is suppressed by NoRestartBackoffTimeStrategy
    at org.apache.flink.runtime.executiongraph.failover.flip1.ExecutionFailureHandler.handleFailure(ExecutionFailureHandler.java:138)
    at org.apache.flink.runtime.executiongraph.failover.flip1.ExecutionFailureHandler.getGlobalFailureHandlingResult(ExecutionFailureHandler.java:101)
    at org.apache.flink.runtime.scheduler.DefaultScheduler.handleGlobalFailure(DefaultScheduler.java:322)
    at org.apache.flink.runtime.operators.coordination.OperatorCoordinatorHolder$LazyInitializedCoordinatorContext.lambda$failJob$0(OperatorCoordinatorHolder.java:574)
    at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.lambda$handleRunAsync$4(AkkaRpcActor.java:443)
    at org.apache.flink.runtime.concurrent.akka.ClassLoadingUtils.runWithContextClassLoader(ClassLoadingUtils.java:68)
    at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRunAsync(AkkaRpcActor.java:443)
    at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcMessage(AkkaRpcActor.java:213)
    at org.apache.flink.runtime.rpc.akka.FencedAkkaRpcActor.handleRpcMessage(FencedAkkaRpcActor.java:78)
    at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleMessage(AkkaRpcActor.java:163)
    at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:24)
    at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:20)
    at scala.PartialFunction.applyOrElse(PartialFunction.scala:123)
    at scala.PartialFunction.applyOrElse$(PartialFunction.scala:122)
    at akka.japi.pf.UnitCaseStatement.applyOrElse(CaseStatements.scala:20)
    at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:171)
    at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:172)
    at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:172)
    at akka.actor.Actor.aroundReceive(Actor.scala:537)
    at akka.actor.Actor.aroundReceive$(Actor.scala:535)
    at akka.actor.AbstractActor.aroundReceive(AbstractActor.scala:220)
    at akka.actor.ActorCell.receiveMessage(ActorCell.scala:580)
    at akka.actor.ActorCell.invoke(ActorCell.scala:548)
    at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:270)
    at akka.dispatch.Mailbox.run(Mailbox.scala:231)
    at akka.dispatch.Mailbox.exec(Mailbox.scala:243)
    at java.util.concurrent.ForkJoinTask.doExec(ForkJoinTask.java:289)
    at java.util.concurrent.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1056)
    at java.util.concurrent.ForkJoinPool.runWorker(ForkJoinPool.java:1692)
    at java.util.concurrent.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:157)
Caused by: org.apache.flink.util.FlinkException: Global failure triggered by OperatorCoordinator for 'consistent_bucket_write: default_database.hudi_rbs_rbscmfprd_cmf_wf_operation_log_cdc_qy_test' (operator ab5eb0c735d351ddaa2e080f1564920d).
    at org.apache.flink.runtime.operators.coordination.OperatorCoordinatorHolder$LazyInitializedCoordinatorContext.failJob(OperatorCoordinatorHolder.java:556)
    at org.apache.hudi.sink.StreamWriteOperatorCoordinator.lambda$start$0(StreamWriteOperatorCoordinator.java:196)
    at org.apache.hudi.sink.utils.NonThrownExecutor.handleException(NonThrownExecutor.java:142)
    at org.apache.hudi.sink.utils.NonThrownExecutor.lambda$wrapAction$0(NonThrownExecutor.java:133)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
    at java.lang.Thread.run(Thread.java:748)
Caused by: org.apache.hudi.exception.HoodieException: Executor executes action [commits the instant 20240202171450091] error
    ... 6 more
Caused by: java.lang.IllegalArgumentException
    at org.apache.hudi.common.util.ValidationUtils.checkArgument(ValidationUtils.java:33)
    at org.apache.hudi.common.table.timeline.HoodieActiveTimeline.transitionState(HoodieActiveTimeline.java:616)
    at org.apache.hudi.common.table.timeline.HoodieActiveTimeline.transitionState(HoodieActiveTimeline.java:597)
    at org.apache.hudi.common.table.timeline.HoodieActiveTimeline.saveAsComplete(HoodieActiveTimeline.java:223)
    at org.apache.hudi.client.BaseHoodieWriteClient.commit(BaseHoodieWriteClient.java:286)
    at org.apache.hudi.client.BaseHoodieWriteClient.commitStats(BaseHoodieWriteClient.java:236)
    at org.apache.hudi.client.HoodieFlinkWriteClient.commit(HoodieFlinkWriteClient.java:112)
    at org.apache.hudi.client.HoodieFlinkWriteClient.commit(HoodieFlinkWriteClient.java:75)
    at org.apache.hudi.client.BaseHoodieWriteClient.commit(BaseHoodieWriteClient.java:201)
    at org.apache.hudi.sink.StreamWriteOperatorCoordinator.doCommit(StreamWriteOperatorCoordinator.java:564)
    at org.apache.hudi.sink.StreamWriteOperatorCoordinator.commitInstant(StreamWriteOperatorCoordinator.java:540)
    at org.apache.hudi.sink.StreamWriteOperatorCoordinator.lambda$notifyCheckpointComplete$2(StreamWriteOperatorCoordinator.java:258)
    at org.apache.hudi.sink.utils.NonThrownExecutor.lambda$wrapAction$0(NonThrownExecutor.java:130)
    ... 3 more
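The failure bottoms out in ValidationUtils.checkArgument, called from HoodieActiveTimeline.transitionState while the coordinator tries to mark the instant complete. The sketch below is illustration only: it shows how checkArgument surfaces as the bare IllegalArgumentException in the trace; the concrete condition checked at HoodieActiveTimeline.java:616 in Hudi 0.14.0 is assumed, not copied from the source.

// Sketch only: the shape of the failing validation, not Hudi's actual code.
public final class TimelineTransitionSketch {

    // Mirrors ValidationUtils.checkArgument(boolean): a false condition becomes
    // the argument-less IllegalArgumentException at the bottom of the trace.
    static void checkArgument(boolean expression) {
        if (!expression) {
            throw new IllegalArgumentException();
        }
    }

    // Hypothetical transition check: completing an inflight instant is only
    // allowed when the timeline state matches what this writer expects, which
    // is easy to violate when two writers touch the same table's timeline.
    static void transitionState(String expectedInstant, String actualInstant) {
        checkArgument(expectedInstant.equals(actualInstant));
        // ... move the inflight meta file to its completed form ...
    }
}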

To Reproduce

public static HoodiePipeline.Builder getHoodieBuilder(HashMap<String, String> infoMap, HashMap<String, String> connectInfo) {

    HoodiePipeline.Builder builder = HoodiePipeline.builder(infoMap.get("hudi_table_name"));

    String hudiFieldMap = infoMap.get("hudi_field_map").toLowerCase(Locale.ROOT);
    ArrayList<ArrayList<String>> fieldList = JSON.parseObject(hudiFieldMap, new TypeReference<ArrayList<ArrayList<String>>>() {
    });
    for (ArrayList<String> columnList : fieldList) {
        builder.column("`" + columnList.get(0) + "` " + columnList.get(1));
    }
    builder.column("_flink_cdc_connector string");
    builder.column("_flink_cdc_db string");
    builder.column("_flink_cdc_table string");
    builder.column("_flink_cdc_op string");
    builder.column("_flink_cdc_ts_ms timestamp");

    builder.pk(infoMap.get("hudi_primary_key"));

    Map<String, String> options = new HashMap<>();
    options.put(FlinkOptions.TABLE_TYPE.key(), HoodieTableType.MERGE_ON_READ.name());

    options.put(FlinkOptions.DATABASE_NAME.key(), infoMap.get("hudi_database_name"));
    options.put(FlinkOptions.TABLE_NAME.key(), infoMap.get("hudi_table_name"));

    options.put(FlinkOptions.PRE_COMBINE.key(), "true");
    options.put(FlinkOptions.PRECOMBINE_FIELD.key(), infoMap.get("hudi_precombine_field"));

    options.put(FlinkOptions.INDEX_BOOTSTRAP_ENABLED.key(), "true");
    options.put(FlinkOptions.INDEX_TYPE.key(), HoodieIndex.IndexType.BUCKET.name());
    options.put(FlinkOptions.WRITE_TASKS.key(), writeTasks);

    // bucket assigner
    options.put(FlinkOptions.BUCKET_ASSIGN_TASKS.key(), bucketAssignTasks);
    options.put(FlinkOptions.BUCKET_INDEX_NUM_BUCKETS.key(), infoMap.get("hudi_bucket_index_num_buckets"));
    options.put(FlinkOptions.BUCKET_INDEX_ENGINE_TYPE.key(), infoMap.get("hudi_bucket_index_engine_type"));

    //COMPACTION
    options.put(FlinkOptions.COMPACTION_TRIGGER_STRATEGY.key(), infoMap.get("hudi_compaction_trigger_strategy"));
    options.put(FlinkOptions.COMPACTION_DELTA_COMMITS.key(), infoMap.get("hudi_compaction_delta_commits"));
    options.put(FlinkOptions.COMPACTION_DELTA_SECONDS.key(), infoMap.get("hudi_compaction_delta_seconds"));

    options.put(FlinkOptions.PARTITION_PATH_FIELD.key(), "_flink_cdc_table");

    //HIVE_SYNC
    options.put(FlinkOptions.HIVE_SYNC_ENABLED.key(), infoMap.get("hudi_hive_sync_enabled"));
    options.put(FlinkOptions.HIVE_SYNC_MODE.key(), infoMap.get("hudi_hive_sync_mode"));
    options.put(FlinkOptions.HIVE_SYNC_DB.key(), infoMap.get("hudi_hive_sync_db"));
    options.put(FlinkOptions.HIVE_SYNC_TABLE.key(), infoMap.get("hudi_hive_sync_table"));
    options.put(FlinkOptions.HIVE_SYNC_PARTITION_FIELDS.key(),  "_flink_cdc_table");
    options.put(FlinkOptions.HIVE_SYNC_CONF_DIR.key(), "/etc/hive/conf");
    options.put(FlinkOptions.HIVE_SYNC_METASTORE_URIS.key(), connectInfo.get("hive_metastore_url"));
    options.put(FlinkOptions.HIVE_SYNC_JDBC_URL.key(), connectInfo.get("conn_url"));
    options.put(FlinkOptions.HIVE_SYNC_SUPPORT_TIMESTAMP.key(), "true");
    options.put(FlinkOptions.HIVE_SYNC_SKIP_RO_SUFFIX.key(), "true");

	
	**options.put(FlinkOptions.WRITE_CLIENT_ID.key(),  String.valueOf(System.currentTimeMillis()));**

    builder.options(options);
    return builder;
}

Expected behavior

After setting options.put(FlinkOptions.WRITE_CLIENT_ID.key(), String.valueOf(System.currentTimeMillis()));, two Flink programs should be able to write to different partitions of the same Hudi table without errors.
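As a side note, here is a minimal sketch of supplying a stable, per-job client id instead of System.currentTimeMillis(), which yields a different id every time the job restarts; the "hudi_write_client_id" key is hypothetical and not part of the original configuration:

    // Sketch only: one fixed, distinct id per job. "hudi_write_client_id" is a
    // hypothetical key read from the job's own configuration map.
    String clientId = infoMap.getOrDefault("hudi_write_client_id", "writer-1");
    options.put(FlinkOptions.WRITE_CLIENT_ID.key(), clientId);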

Environment Description

  • Hudi version : 0.14.0

  • Flink version : 1.15.2

  • Hive version : 3.x

  • Hadoop version : 3.x

  • Storage (HDFS/S3/GCS..) : HDFS

  • Running on Docker? (yes/no) : no

Additional context

Add any other context about the problem here.

Stacktrace

When two Flink programs write to different partitions of the same Hudi table, only one of the jobs consistently encounters this error (the stack trace is shown above).

Toroidals commented on Feb 02 '24


Only 1.0 release supports concurrent streaming writers.

danny0405 commented on Feb 03 '24

Only 1.0 release supports concurrent streaming writers.

If Flink checkpointing is disabled, writing works normally, but once checkpointing is enabled the error occurs. Writing to a MOR table with Flink requires checkpointing to be enabled.

public static void main(String[] args) throws Exception {

    StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

    env.enableCheckpointing(TimeUnit.SECONDS.toMillis(Long.parseLong(confInfo.get("checkpoint_interval"))), CheckpointingMode.EXACTLY_ONCE);
    env.getCheckpointConfig().setCheckpointTimeout(TimeUnit.SECONDS.toMillis(Long.parseLong(confInfo.get("checkpoint_timeout"))));
    env.getCheckpointConfig().setMaxConcurrentCheckpoints(Integer.parseInt(confInfo.get("checkpoint_max_concurrent")));
    env.getCheckpointConfig().setCheckpointStorage(confInfo.get("checkpoint_path"));
    env.getCheckpointConfig().setTolerableCheckpointFailureNumber(Integer.parseInt(confInfo.get("checkpoint_failure_number")));
    env.setRestartStrategy(RestartStrategies.noRestart());

    EmbeddedRocksDBStateBackend embeddedRocksDBStateBackend = new EmbeddedRocksDBStateBackend(true);
    embeddedRocksDBStateBackend.setPredefinedOptions(PredefinedOptions.SPINNING_DISK_OPTIMIZED_HIGH_MEM);
    env.setStateBackend(embeddedRocksDBStateBackend);

    DataStreamSource<String> dataStreamSource = env.addSource();
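    // Note: the source function argument is omitted from this snippet;
    // env.addSource(...) expects a SourceFunction (e.g. a Kafka source).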

    HoodiePipeline.Builder builder = HoodiePipeline.builder(infoMap.get("hudi_table_name"));

    Map<String, String> options = new HashMap<>();
    String hudiFieldMap = infoMap.get("hudi_field_map").toLowerCase(Locale.ROOT);
    ArrayList<ArrayList<String>> fieldList = JSON.parseObject(hudiFieldMap, new TypeReference<ArrayList<ArrayList<String>>>() {
    });
    for (ArrayList<String> columnList : fieldList) {
        builder.column("`" + columnList.get(0) + "` " + columnList.get(1));
    }
    builder.column("_flink_cdc_connector string");
    builder.column("_flink_cdc_db string");
    builder.column("_flink_cdc_table string");
    builder.column("_flink_cdc_op string");
    builder.column("_flink_cdc_ts_ms timestamp");

    builder.pk(infoMap.get("hudi_primary_key"));

    options.put(FlinkOptions.TABLE_TYPE.key(), HoodieTableType.MERGE_ON_READ.name());
    options.put(FlinkOptions.DATABASE_NAME.key(), infoMap.get("hudi_database_name"));
    options.put(FlinkOptions.TABLE_NAME.key(), infoMap.get("hudi_table_name"));

    options.put(FlinkOptions.PRE_COMBINE.key(), "true");
    options.put(FlinkOptions.PRECOMBINE_FIELD.key(), infoMap.get("hudi_precombine_field"));

    options.put(FlinkOptions.INDEX_BOOTSTRAP_ENABLED.key(), "true");
    options.put(FlinkOptions.INDEX_TYPE.key(), HoodieIndex.IndexType.BUCKET.name());

    options.put(FlinkOptions.WRITE_TASKS.key(), infoMap.get("hudi_write_tasks"));

    options.put(FlinkOptions.BUCKET_ASSIGN_TASKS.key(), bucketAssignTasks);

    options.put(FlinkOptions.BUCKET_INDEX_NUM_BUCKETS.key(), infoMap.get("hudi_bucket_index_num_buckets"));
    options.put(FlinkOptions.BUCKET_INDEX_ENGINE_TYPE.key(), infoMap.get("hudi_bucket_index_engine_type"));


    options.put(FlinkOptions.COMPACTION_TASKS.key(), compactionTasks);
    options.put(FlinkOptions.COMPACTION_TRIGGER_STRATEGY.key(), infoMap.get("hudi_compaction_trigger_strategy"));
    options.put(FlinkOptions.COMPACTION_DELTA_COMMITS.key(), infoMap.get("hudi_compaction_delta_commits"));
    options.put(FlinkOptions.COMPACTION_DELTA_SECONDS.key(), infoMap.get("hudi_compaction_delta_seconds"));


    options.put(FlinkOptions.HIVE_SYNC_ENABLED.key(), infoMap.get("hudi_hive_sync_enabled"));
    options.put(FlinkOptions.HIVE_SYNC_MODE.key(), infoMap.get("hudi_hive_sync_mode"));
    options.put(FlinkOptions.HIVE_SYNC_DB.key(), infoMap.get("hudi_hive_sync_db"));
    options.put(FlinkOptions.HIVE_SYNC_TABLE.key(), infoMap.get("hudi_hive_sync_table"));

    options.put(FlinkOptions.PARTITION_PATH_FIELD.key(), "_flink_cdc_table");
    options.put(FlinkOptions.HIVE_SYNC_PARTITION_FIELDS.key(), "_flink_cdc_table");


    options.put(FlinkOptions.HIVE_SYNC_CONF_DIR.key(), "/etc/hive/conf");
    options.put(FlinkOptions.HIVE_SYNC_METASTORE_URIS.key(), connectInfo.get("hive_metastore_url"));
    options.put(FlinkOptions.HIVE_SYNC_JDBC_URL.key(), connectInfo.get("conn_url"));
    options.put(FlinkOptions.HIVE_SYNC_SUPPORT_TIMESTAMP.key(), "true");
    options.put(FlinkOptions.HIVE_SYNC_SKIP_RO_SUFFIX.key(), "true");

    options.put(FlinkOptions.PARTITION_PATH_FIELD.key(), infoMap.get("hudi_hive_sync_partition_fields"));
    options.put(FlinkOptions.HIVE_SYNC_PARTITION_FIELDS.key(), infoMap.get("hudi_hive_sync_partition_fields"));
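    // Note: the two puts above overwrite the PARTITION_PATH_FIELD and
    // HIVE_SYNC_PARTITION_FIELDS values set earlier in this method.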

    options.put(FlinkOptions.WRITE_RATE_LIMIT.key(), infoMap.get("hudi_write_rate_limit"));

    options.put(FlinkOptions.WRITE_CLIENT_ID.key(), String.valueOf(System.currentTimeMillis()));

    builder.options(options);

    builder.sink(dataStreamSource, false);
    env.execute("kafka-to-hudi");
}

Toroidals commented on Feb 04 '24

Checkpointing must be enabled; we need it to ensure exactly-once semantics.
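For reference, a minimal sketch of enabling exactly-once checkpointing in Flink 1.15; the 60-second interval is an arbitrary value for illustration:

import org.apache.flink.streaming.api.CheckpointingMode;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class CheckpointSketch {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        // The Hudi coordinator commits an instant on each successful checkpoint,
        // so exactly-once checkpointing must stay enabled for the streaming sink.
        env.enableCheckpointing(60_000L, CheckpointingMode.EXACTLY_ONCE);
    }
}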

danny0405 commented on Feb 04 '24

Checkpointing must be enabled; we need it to ensure exactly-once semantics.

However, enabling checkpointing results in the error: Executor executes action [commits the instant 20240202161708414] error

Toroidals commented on Feb 05 '24

Can you show us the error stack trace?

danny0405 commented on Feb 06 '24