[SUPPORT] Multiple Flink jobs in same Flink session cluster
Tips before filing an issue
- Have you gone through our FAQs?
- Join the mailing list to engage in conversations and get faster support at [email protected].
- If you have triaged this as a bug, then file an issue directly.
Describe the problem you faced
When multiple Flink jobs that write to Hudi tables run in the same Flink session cluster, the JobMaster of a later job fails to start with "Metrics source RegionServer,sub=IO already exists!" while initializing the metadata table.
To Reproduce
Steps to reproduce the behavior:
- Deploy multiple Flink jobs that ingest messages into a Hudi table in the same session cluster.
- The following error occurs:
Caused by: org.apache.hadoop.metrics2.MetricsException: Metrics source RegionServer,sub=IO already exists!
at org.apache.hadoop.metrics2.lib.DefaultMetricsSystem.newSourceName(DefaultMetricsSystem.java:144) ~[hadoop-common-2.10.2-khp-p9.jar:?]
at org.apache.hadoop.metrics2.lib.DefaultMetricsSystem.sourceName(DefaultMetricsSystem.java:117) ~[hadoop-common-2.10.2-khp-p9.jar:?]
at org.apache.hadoop.metrics2.impl.MetricsSystemImpl.register(MetricsSystemImpl.java:229) ~[hadoop-common-2.10.2-khp-p9.jar:?]
at org.apache.hudi.org.apache.hadoop.hbase.metrics.BaseSourceImpl.<init>(BaseSourceImpl.java:114) ~[?:?]
at org.apache.hudi.org.apache.hadoop.hbase.io.MetricsIOSourceImpl.<init>(MetricsIOSourceImpl.java:42) ~[?:?]
at org.apache.hudi.org.apache.hadoop.hbase.io.MetricsIOSourceImpl.<init>(MetricsIOSourceImpl.java:37) ~[?:?]
at org.apache.hudi.org.apache.hadoop.hbase.regionserver.MetricsRegionServerSourceFactoryImpl.createIO(MetricsRegionServerSourceFactoryImpl.java:95) ~[?:?]
at org.apache.hudi.org.apache.hadoop.hbase.io.MetricsIO.<init>(MetricsIO.java:32) ~[?:?]
at org.apache.hudi.org.apache.hadoop.hbase.io.hfile.HFile.<clinit>(HFile.java:172) ~[?:?]
at org.apache.hudi.io.hadoop.HoodieAvroHFileWriter.<init>(HoodieAvroHFileWriter.java:115) ~[?:?]
at org.apache.hudi.io.hadoop.HoodieAvroFileWriterFactory.newHFileFileWriter(HoodieAvroFileWriterFactory.java:108) ~[?:?]
at org.apache.hudi.io.storage.HoodieFileWriterFactory.getFileWriterByFormat(HoodieFileWriterFactory.java:70) ~[?:?]
at org.apache.hudi.io.storage.HoodieFileWriterFactory.getFileWriter(HoodieFileWriterFactory.java:53) ~[?:?]
at org.apache.hudi.io.HoodieCreateHandle.<init>(HoodieCreateHandle.java:106) ~[?:?]
at org.apache.hudi.io.HoodieCreateHandle.<init>(HoodieCreateHandle.java:82) ~[?:?]
at org.apache.hudi.io.FlinkCreateHandle.<init>(FlinkCreateHandle.java:66) ~[?:?]
at org.apache.hudi.io.FlinkCreateHandle.<init>(FlinkCreateHandle.java:59) ~[?:?]
at org.apache.hudi.io.FlinkWriteHandleFactory$BaseCommitWriteHandleFactory.create(FlinkWriteHandleFactory.java:121) ~[?:?]
at org.apache.hudi.client.HoodieFlinkWriteClient.getOrCreateWriteHandle(HoodieFlinkWriteClient.java:459) ~[?:?]
at org.apache.hudi.client.HoodieFlinkWriteClient.access$000(HoodieFlinkWriteClient.java:75) ~[?:?]
at org.apache.hudi.client.HoodieFlinkWriteClient$AutoCloseableWriteHandle.<init>(HoodieFlinkWriteClient.java:515) ~[?:?]
at org.apache.hudi.client.HoodieFlinkWriteClient.lambda$bulkInsertPreppedRecords$6(HoodieFlinkWriteClient.java:259) ~[?:?]
at java.util.stream.ReferencePipeline$3$1.accept(Unknown Source) ~[?:?]
at java.util.HashMap$ValueSpliterator.forEachRemaining(Unknown Source) ~[?:?]
at java.util.stream.AbstractPipeline.copyInto(Unknown Source) ~[?:?]
at java.util.stream.AbstractPipeline.wrapAndCopyInto(Unknown Source) ~[?:?]
at java.util.stream.ReduceOps$ReduceTask.doLeaf(Unknown Source) ~[?:?]
at java.util.stream.ReduceOps$ReduceTask.doLeaf(Unknown Source) ~[?:?]
at java.util.stream.AbstractTask.compute(Unknown Source) ~[?:?]
at java.util.concurrent.CountedCompleter.exec(Unknown Source) ~[?:?]
at java.util.concurrent.ForkJoinTask.doExec(Unknown Source) ~[?:?]
at java.util.concurrent.ForkJoinTask.doInvoke(Unknown Source) ~[?:?]
at java.util.concurrent.ForkJoinTask.invoke(Unknown Source) ~[?:?]
at java.util.stream.ReduceOps$ReduceOp.evaluateParallel(Unknown Source) ~[?:?]
at java.util.stream.AbstractPipeline.evaluate(Unknown Source) ~[?:?]
at java.util.stream.ReferencePipeline.collect(Unknown Source) ~[?:?]
at org.apache.hudi.client.HoodieFlinkWriteClient.bulkInsertPreppedRecords(HoodieFlinkWriteClient.java:263) ~[?:?]
at org.apache.hudi.client.HoodieFlinkWriteClient.bulkInsertPreppedRecords(HoodieFlinkWriteClient.java:74) ~[?:?]
at org.apache.hudi.metadata.FlinkHoodieBackedTableMetadataWriter.commitInternal(FlinkHoodieBackedTableMetadataWriter.java:166) ~[?:?]
at org.apache.hudi.metadata.FlinkHoodieBackedTableMetadataWriter.bulkCommit(FlinkHoodieBackedTableMetadataWriter.java:118) ~[?:?]
at org.apache.hudi.metadata.HoodieBackedTableMetadataWriter.initializeFromFilesystem(HoodieBackedTableMetadataWriter.java:503) ~[?:?]
at org.apache.hudi.metadata.HoodieBackedTableMetadataWriter.initializeIfNeeded(HoodieBackedTableMetadataWriter.java:283) ~[?:?]
at org.apache.hudi.metadata.HoodieBackedTableMetadataWriter.<init>(HoodieBackedTableMetadataWriter.java:188) ~[?:?]
at org.apache.hudi.metadata.FlinkHoodieBackedTableMetadataWriter.<init>(FlinkHoodieBackedTableMetadataWriter.java:87) ~[?:?]
at org.apache.hudi.metadata.FlinkHoodieBackedTableMetadataWriter.create(FlinkHoodieBackedTableMetadataWriter.java:69) ~[?:?]
at org.apache.hudi.client.HoodieFlinkTableServiceClient.initMetadataWriter(HoodieFlinkTableServiceClient.java:183) ~[?:?]
at org.apache.hudi.client.HoodieFlinkTableServiceClient.initMetadataTable(HoodieFlinkTableServiceClient.java:196) ~[?:?]
at org.apache.hudi.client.HoodieFlinkWriteClient.initMetadataTable(HoodieFlinkWriteClient.java:317) ~[?:?]
at org.apache.hudi.sink.StreamWriteOperatorCoordinator.initMetadataTable(StreamWriteOperatorCoordinator.java:352) ~[?:?]
at org.apache.hudi.sink.StreamWriteOperatorCoordinator.start(StreamWriteOperatorCoordinator.java:197) ~[?:?]
at org.apache.flink.runtime.operators.coordination.OperatorCoordinatorHolder.start(OperatorCoordinatorHolder.java:204) ~[flink-dist-1.20.0.jar:1.20.0]
at org.apache.flink.runtime.scheduler.DefaultOperatorCoordinatorHandler.startOperatorCoordinators(DefaultOperatorCoordinatorHandler.java:173) ~[flink-dist-1.20.0.jar:1.20.0]
at org.apache.flink.runtime.scheduler.DefaultOperatorCoordinatorHandler.startAllOperatorCoordinators(DefaultOperatorCoordinatorHandler.java:85) ~[flink-dist-1.20.0.jar:1.20.0]
at org.apache.flink.runtime.scheduler.SchedulerBase.startScheduling(SchedulerBase.java:635) ~[flink-dist-1.20.0.jar:1.20.0]
at org.apache.flink.runtime.jobmaster.JobMaster.startScheduling(JobMaster.java:1220) ~[flink-dist-1.20.0.jar:1.20.0]
at org.apache.flink.runtime.jobmaster.JobMaster.startJobExecution(JobMaster.java:1137) ~[flink-dist-1.20.0.jar:1.20.0]
at org.apache.flink.runtime.jobmaster.JobMaster.onStart(JobMaster.java:460) ~[flink-dist-1.20.0.jar:1.20.0]
at org.apache.flink.runtime.rpc.RpcEndpoint.internalCallOnStart(RpcEndpoint.java:214) ~[flink-dist-1.20.0.jar:1.20.0]
at org.apache.flink.runtime.rpc.pekko.PekkoRpcActor$StoppedState.lambda$start$0(PekkoRpcActor.java:627) ~[flink-rpc-akka96a41d36-4f80-4a91-bda1-2d6c943ad3ba.jar:1.20.0]
at org.apache.flink.runtime.concurrent.ClassLoadingUtils.runWithContextClassLoader(ClassLoadingUtils.java:68) ~[flink-dist-1.20.0.jar:1.20.0]
at org.apache.flink.runtime.rpc.pekko.PekkoRpcActor$StoppedState.start(PekkoRpcActor.java:626) ~[flink-rpc-akka96a41d36-4f80-4a91-bda1-2d6c943ad3ba.jar:1.20.0]
at org.apache.flink.runtime.rpc.pekko.PekkoRpcActor.handleControlMessage(PekkoRpcActor.java:197) ~[flink-rpc-akka96a41d36-4f80-4a91-bda1-2d6c943ad3ba.jar:1.20.0]
at org.apache.pekko.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:33) ~[?:?]
at org.apache.pekko.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:29) ~[?:?]
at scala.PartialFunction.applyOrElse(PartialFunction.scala:127) ~[flink-scala_2.12-1.20.0.jar:1.20.0]
at scala.PartialFunction.applyOrElse$(PartialFunction.scala:126) ~[flink-scala_2.12-1.20.0.jar:1.20.0]
at org.apache.pekko.japi.pf.UnitCaseStatement.applyOrElse(CaseStatements.scala:29) ~[?:?]
at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:175) ~[flink-scala_2.12-1.20.0.jar:1.20.0]
... 14 more
Expected behavior
Multiple Flink jobs should be able to write to Hudi tables in the same session cluster without the metrics registration conflict.
Environment Description
- Hudi version : 1.0.2
- Flink version : 1.20.0
- Hive version : 2.3.7
- Hadoop version : 2.10.2
- Storage (HDFS/S3/GCS..) : HDFS
- Running on Docker? (yes/no) : yes
Additional context
My options for metrics:
options.put(HoodieMetadataConfig.METRICS_ENABLE.key(), "false");
options.put(HoodieMetricsConfig.TURN_METRICS_ON.key(), "false");
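For context, these two flags turn off Hudi's own metrics reporting, but the failing registration in the stack trace happens in the Hadoop metrics2 system, which the shaded HBase HFile class touches in a static initializer; those options do not reach it. A minimal, Hadoop-free sketch of the failure mode (the registry class here is illustrative, not Hudi or Hadoop API): the registry is JVM-wide, and in a session cluster every JobMaster shares one JobManager JVM, so the second job's registration hits the already-taken name.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

// Simulates DefaultMetricsSystem's JVM-wide map of metrics source names.
// The shaded HBase HFile class registers "RegionServer,sub=IO" once per JVM;
// a second job in the same session-cluster JVM triggers the duplicate.
class MetricsRegistrySketch {
    private static final Map<String, Object> SOURCES = new ConcurrentHashMap<>();

    static void register(String name) {
        // putIfAbsent returns the previous value when the name is taken
        if (SOURCES.putIfAbsent(name, new Object()) != null) {
            throw new IllegalStateException("Metrics source " + name + " already exists!");
        }
    }

    public static void main(String[] args) {
        register("RegionServer,sub=IO");     // job 1: succeeds
        try {
            register("RegionServer,sub=IO"); // job 2, same JVM: fails
        } catch (IllegalStateException e) {
            System.out.println(e.getMessage());
        }
    }
}
```

This is why the error only shows up when a second Hudi job starts in an already-running session cluster, and not in application mode, where each job gets its own JVM.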
Stacktrace
Full stack trace:
org.apache.flink.util.FlinkException: JobMaster for job 72cdee936b850791bd04c629e18d758e failed.
at org.apache.flink.runtime.dispatcher.Dispatcher.jobMasterFailed(Dispatcher.java:1492) ~[flink-dist-1.20.0.jar:1.20.0]
at org.apache.flink.runtime.dispatcher.Dispatcher.jobManagerRunnerFailed(Dispatcher.java:782) ~[flink-dist-1.20.0.jar:1.20.0]
at org.apache.flink.runtime.dispatcher.Dispatcher.lambda$runJob$7(Dispatcher.java:704) ~[flink-dist-1.20.0.jar:1.20.0]
at java.util.concurrent.CompletableFuture.uniHandle(Unknown Source) ~[?:?]
at java.util.concurrent.CompletableFuture$UniHandle.tryFire(Unknown Source) ~[?:?]
at java.util.concurrent.CompletableFuture$Completion.run(Unknown Source) ~[?:?]
at org.apache.flink.util.MdcUtils.lambda$wrapRunnable$1(MdcUtils.java:64) ~[flink-dist-1.20.0.jar:1.20.0]
at org.apache.flink.runtime.rpc.pekko.PekkoRpcActor.lambda$handleRunAsync$4(PekkoRpcActor.java:460) ~[flink-rpc-akka96a41d36-4f80-4a91-bda1-2d6c943ad3ba.jar:1.20.0]
at org.apache.flink.runtime.concurrent.ClassLoadingUtils.runWithContextClassLoader(ClassLoadingUtils.java:68) ~[flink-dist-1.20.0.jar:1.20.0]
at org.apache.flink.runtime.rpc.pekko.PekkoRpcActor.handleRunAsync(PekkoRpcActor.java:460) ~[flink-rpc-akka96a41d36-4f80-4a91-bda1-2d6c943ad3ba.jar:1.20.0]
at org.apache.flink.runtime.rpc.pekko.PekkoRpcActor.handleRpcMessage(PekkoRpcActor.java:225) ~[flink-rpc-akka96a41d36-4f80-4a91-bda1-2d6c943ad3ba.jar:1.20.0]
at org.apache.flink.runtime.rpc.pekko.FencedPekkoRpcActor.handleRpcMessage(FencedPekkoRpcActor.java:88) ~[flink-rpc-akka96a41d36-4f80-4a91-bda1-2d6c943ad3ba.jar:1.20.0]
at org.apache.flink.runtime.rpc.pekko.PekkoRpcActor.handleMessage(PekkoRpcActor.java:174) ~[flink-rpc-akka96a41d36-4f80-4a91-bda1-2d6c943ad3ba.jar:1.20.0]
at org.apache.pekko.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:33) [flink-rpc-akka96a41d36-4f80-4a91-bda1-2d6c943ad3ba.jar:1.20.0]
at org.apache.pekko.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:29) [flink-rpc-akka96a41d36-4f80-4a91-bda1-2d6c943ad3ba.jar:1.20.0]
at scala.PartialFunction.applyOrElse(PartialFunction.scala:127) [flink-rpc-akka96a41d36-4f80-4a91-bda1-2d6c943ad3ba.jar:1.20.0]
at scala.PartialFunction.applyOrElse$(PartialFunction.scala:126) [flink-rpc-akka96a41d36-4f80-4a91-bda1-2d6c943ad3ba.jar:1.20.0]
at org.apache.pekko.japi.pf.UnitCaseStatement.applyOrElse(CaseStatements.scala:29) [flink-rpc-akka96a41d36-4f80-4a91-bda1-2d6c943ad3ba.jar:1.20.0]
at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:175) [flink-rpc-akka96a41d36-4f80-4a91-bda1-2d6c943ad3ba.jar:1.20.0]
at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:176) [flink-rpc-akka96a41d36-4f80-4a91-bda1-2d6c943ad3ba.jar:1.20.0]
at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:176) [flink-rpc-akka96a41d36-4f80-4a91-bda1-2d6c943ad3ba.jar:1.20.0]
at org.apache.pekko.actor.Actor.aroundReceive(Actor.scala:547) [flink-rpc-akka96a41d36-4f80-4a91-bda1-2d6c943ad3ba.jar:1.20.0]
at org.apache.pekko.actor.Actor.aroundReceive$(Actor.scala:545) [flink-rpc-akka96a41d36-4f80-4a91-bda1-2d6c943ad3ba.jar:1.20.0]
at org.apache.pekko.actor.AbstractActor.aroundReceive(AbstractActor.scala:229) [flink-rpc-akka96a41d36-4f80-4a91-bda1-2d6c943ad3ba.jar:1.20.0]
at org.apache.pekko.actor.ActorCell.receiveMessage(ActorCell.scala:590) [flink-rpc-akka96a41d36-4f80-4a91-bda1-2d6c943ad3ba.jar:1.20.0]
at org.apache.pekko.actor.ActorCell.invoke(ActorCell.scala:557) [flink-rpc-akka96a41d36-4f80-4a91-bda1-2d6c943ad3ba.jar:1.20.0]
at org.apache.pekko.dispatch.Mailbox.processMailbox(Mailbox.scala:280) [flink-rpc-akka96a41d36-4f80-4a91-bda1-2d6c943ad3ba.jar:1.20.0]
at org.apache.pekko.dispatch.Mailbox.run(Mailbox.scala:241) [flink-rpc-akka96a41d36-4f80-4a91-bda1-2d6c943ad3ba.jar:1.20.0]
at org.apache.pekko.dispatch.Mailbox.exec(Mailbox.scala:253) [flink-rpc-akka96a41d36-4f80-4a91-bda1-2d6c943ad3ba.jar:1.20.0]
at java.util.concurrent.ForkJoinTask.doExec(Unknown Source) [?:?]
at java.util.concurrent.ForkJoinPool$WorkQueue.topLevelExec(Unknown Source) [?:?]
at java.util.concurrent.ForkJoinPool.scan(Unknown Source) [?:?]
at java.util.concurrent.ForkJoinPool.runWorker(Unknown Source) [?:?]
at java.util.concurrent.ForkJoinWorkerThread.run(Unknown Source) [?:?]
Caused by: org.apache.flink.runtime.jobmaster.JobMasterException: Could not start the JobMaster.
at org.apache.flink.runtime.jobmaster.JobMaster.onStart(JobMaster.java:462) ~[flink-dist-1.20.0.jar:1.20.0]
at org.apache.flink.runtime.rpc.RpcEndpoint.internalCallOnStart(RpcEndpoint.java:214) ~[flink-dist-1.20.0.jar:1.20.0]
at org.apache.flink.runtime.rpc.pekko.PekkoRpcActor$StoppedState.lambda$start$0(PekkoRpcActor.java:627) ~[flink-rpc-akka96a41d36-4f80-4a91-bda1-2d6c943ad3ba.jar:1.20.0]
at org.apache.flink.runtime.concurrent.ClassLoadingUtils.runWithContextClassLoader(ClassLoadingUtils.java:68) ~[flink-dist-1.20.0.jar:1.20.0]
at org.apache.flink.runtime.rpc.pekko.PekkoRpcActor$StoppedState.start(PekkoRpcActor.java:626) ~[flink-rpc-akka96a41d36-4f80-4a91-bda1-2d6c943ad3ba.jar:1.20.0]
at org.apache.flink.runtime.rpc.pekko.PekkoRpcActor.handleControlMessage(PekkoRpcActor.java:197) ~[flink-rpc-akka96a41d36-4f80-4a91-bda1-2d6c943ad3ba.jar:1.20.0]
at org.apache.pekko.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:33) ~[?:?]
at org.apache.pekko.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:29) ~[?:?]
at scala.PartialFunction.applyOrElse(PartialFunction.scala:127) ~[flink-scala_2.12-1.20.0.jar:1.20.0]
at scala.PartialFunction.applyOrElse$(PartialFunction.scala:126) ~[flink-scala_2.12-1.20.0.jar:1.20.0]
at org.apache.pekko.japi.pf.UnitCaseStatement.applyOrElse(CaseStatements.scala:29) ~[?:?]
at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:175) ~[flink-scala_2.12-1.20.0.jar:1.20.0]
... 14 more
Caused by: org.apache.flink.util.FlinkRuntimeException: Failed to start the operator coordinators
at org.apache.flink.runtime.scheduler.DefaultOperatorCoordinatorHandler.startOperatorCoordinators(DefaultOperatorCoordinatorHandler.java:178) ~[flink-dist-1.20.0.jar:1.20.0]
at org.apache.flink.runtime.scheduler.DefaultOperatorCoordinatorHandler.startAllOperatorCoordinators(DefaultOperatorCoordinatorHandler.java:85) ~[flink-dist-1.20.0.jar:1.20.0]
at org.apache.flink.runtime.scheduler.SchedulerBase.startScheduling(SchedulerBase.java:635) ~[flink-dist-1.20.0.jar:1.20.0]
at org.apache.flink.runtime.jobmaster.JobMaster.startScheduling(JobMaster.java:1220) ~[flink-dist-1.20.0.jar:1.20.0]
at org.apache.flink.runtime.jobmaster.JobMaster.startJobExecution(JobMaster.java:1137) ~[flink-dist-1.20.0.jar:1.20.0]
at org.apache.flink.runtime.jobmaster.JobMaster.onStart(JobMaster.java:460) ~[flink-dist-1.20.0.jar:1.20.0]
at org.apache.flink.runtime.rpc.RpcEndpoint.internalCallOnStart(RpcEndpoint.java:214) ~[flink-dist-1.20.0.jar:1.20.0]
at org.apache.flink.runtime.rpc.pekko.PekkoRpcActor$StoppedState.lambda$start$0(PekkoRpcActor.java:627) ~[flink-rpc-akka96a41d36-4f80-4a91-bda1-2d6c943ad3ba.jar:1.20.0]
at org.apache.flink.runtime.concurrent.ClassLoadingUtils.runWithContextClassLoader(ClassLoadingUtils.java:68) ~[flink-dist-1.20.0.jar:1.20.0]
at org.apache.flink.runtime.rpc.pekko.PekkoRpcActor$StoppedState.start(PekkoRpcActor.java:626) ~[flink-rpc-akka96a41d36-4f80-4a91-bda1-2d6c943ad3ba.jar:1.20.0]
at org.apache.flink.runtime.rpc.pekko.PekkoRpcActor.handleControlMessage(PekkoRpcActor.java:197) ~[flink-rpc-akka96a41d36-4f80-4a91-bda1-2d6c943ad3ba.jar:1.20.0]
at org.apache.pekko.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:33) ~[?:?]
at org.apache.pekko.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:29) ~[?:?]
at scala.PartialFunction.applyOrElse(PartialFunction.scala:127) ~[flink-scala_2.12-1.20.0.jar:1.20.0]
at scala.PartialFunction.applyOrElse$(PartialFunction.scala:126) ~[flink-scala_2.12-1.20.0.jar:1.20.0]
at org.apache.pekko.japi.pf.UnitCaseStatement.applyOrElse(CaseStatements.scala:29) ~[?:?]
at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:175) ~[flink-scala_2.12-1.20.0.jar:1.20.0]
... 14 more
Caused by: java.lang.ExceptionInInitializerError
at org.apache.hudi.io.hadoop.HoodieAvroHFileWriter.<init>(HoodieAvroHFileWriter.java:115) ~[?:?]
at org.apache.hudi.io.hadoop.HoodieAvroFileWriterFactory.newHFileFileWriter(HoodieAvroFileWriterFactory.java:108) ~[?:?]
at org.apache.hudi.io.storage.HoodieFileWriterFactory.getFileWriterByFormat(HoodieFileWriterFactory.java:70) ~[?:?]
at org.apache.hudi.io.storage.HoodieFileWriterFactory.getFileWriter(HoodieFileWriterFactory.java:53) ~[?:?]
at org.apache.hudi.io.HoodieCreateHandle.<init>(HoodieCreateHandle.java:106) ~[?:?]
at org.apache.hudi.io.HoodieCreateHandle.<init>(HoodieCreateHandle.java:82) ~[?:?]
at org.apache.hudi.io.FlinkCreateHandle.<init>(FlinkCreateHandle.java:66) ~[?:?]
at org.apache.hudi.io.FlinkCreateHandle.<init>(FlinkCreateHandle.java:59) ~[?:?]
at org.apache.hudi.io.FlinkWriteHandleFactory$BaseCommitWriteHandleFactory.create(FlinkWriteHandleFactory.java:121) ~[?:?]
at org.apache.hudi.client.HoodieFlinkWriteClient.getOrCreateWriteHandle(HoodieFlinkWriteClient.java:459) ~[?:?]
at org.apache.hudi.client.HoodieFlinkWriteClient.access$000(HoodieFlinkWriteClient.java:75) ~[?:?]
at org.apache.hudi.client.HoodieFlinkWriteClient$AutoCloseableWriteHandle.<init>(HoodieFlinkWriteClient.java:515) ~[?:?]
at org.apache.hudi.client.HoodieFlinkWriteClient.lambda$bulkInsertPreppedRecords$6(HoodieFlinkWriteClient.java:259) ~[?:?]
at java.util.stream.ReferencePipeline$3$1.accept(Unknown Source) ~[?:?]
at java.util.HashMap$ValueSpliterator.forEachRemaining(Unknown Source) ~[?:?]
at java.util.stream.AbstractPipeline.copyInto(Unknown Source) ~[?:?]
at java.util.stream.AbstractPipeline.wrapAndCopyInto(Unknown Source) ~[?:?]
at java.util.stream.ReduceOps$ReduceTask.doLeaf(Unknown Source) ~[?:?]
at java.util.stream.ReduceOps$ReduceTask.doLeaf(Unknown Source) ~[?:?]
at java.util.stream.AbstractTask.compute(Unknown Source) ~[?:?]
at java.util.concurrent.CountedCompleter.exec(Unknown Source) ~[?:?]
at java.util.concurrent.ForkJoinTask.doExec(Unknown Source) ~[?:?]
at java.util.concurrent.ForkJoinTask.doInvoke(Unknown Source) ~[?:?]
at java.util.concurrent.ForkJoinTask.invoke(Unknown Source) ~[?:?]
at java.util.stream.ReduceOps$ReduceOp.evaluateParallel(Unknown Source) ~[?:?]
at java.util.stream.AbstractPipeline.evaluate(Unknown Source) ~[?:?]
at java.util.stream.ReferencePipeline.collect(Unknown Source) ~[?:?]
at org.apache.hudi.client.HoodieFlinkWriteClient.bulkInsertPreppedRecords(HoodieFlinkWriteClient.java:263) ~[?:?]
at org.apache.hudi.client.HoodieFlinkWriteClient.bulkInsertPreppedRecords(HoodieFlinkWriteClient.java:74) ~[?:?]
at org.apache.hudi.metadata.FlinkHoodieBackedTableMetadataWriter.commitInternal(FlinkHoodieBackedTableMetadataWriter.java:166) ~[?:?]
at org.apache.hudi.metadata.FlinkHoodieBackedTableMetadataWriter.bulkCommit(FlinkHoodieBackedTableMetadataWriter.java:118) ~[?:?]
at org.apache.hudi.metadata.HoodieBackedTableMetadataWriter.initializeFromFilesystem(HoodieBackedTableMetadataWriter.java:503) ~[?:?]
at org.apache.hudi.metadata.HoodieBackedTableMetadataWriter.initializeIfNeeded(HoodieBackedTableMetadataWriter.java:283) ~[?:?]
at org.apache.hudi.metadata.HoodieBackedTableMetadataWriter.<init>(HoodieBackedTableMetadataWriter.java:188) ~[?:?]
at org.apache.hudi.metadata.FlinkHoodieBackedTableMetadataWriter.<init>(FlinkHoodieBackedTableMetadataWriter.java:87) ~[?:?]
at org.apache.hudi.metadata.FlinkHoodieBackedTableMetadataWriter.create(FlinkHoodieBackedTableMetadataWriter.java:69) ~[?:?]
at org.apache.hudi.client.HoodieFlinkTableServiceClient.initMetadataWriter(HoodieFlinkTableServiceClient.java:183) ~[?:?]
at org.apache.hudi.client.HoodieFlinkTableServiceClient.initMetadataTable(HoodieFlinkTableServiceClient.java:196) ~[?:?]
at org.apache.hudi.client.HoodieFlinkWriteClient.initMetadataTable(HoodieFlinkWriteClient.java:317) ~[?:?]
at org.apache.hudi.sink.StreamWriteOperatorCoordinator.initMetadataTable(StreamWriteOperatorCoordinator.java:352) ~[?:?]
at org.apache.hudi.sink.StreamWriteOperatorCoordinator.start(StreamWriteOperatorCoordinator.java:197) ~[?:?]
at org.apache.flink.runtime.operators.coordination.OperatorCoordinatorHolder.start(OperatorCoordinatorHolder.java:204) ~[flink-dist-1.20.0.jar:1.20.0]
at org.apache.flink.runtime.scheduler.DefaultOperatorCoordinatorHandler.startOperatorCoordinators(DefaultOperatorCoordinatorHandler.java:173) ~[flink-dist-1.20.0.jar:1.20.0]
at org.apache.flink.runtime.scheduler.DefaultOperatorCoordinatorHandler.startAllOperatorCoordinators(DefaultOperatorCoordinatorHandler.java:85) ~[flink-dist-1.20.0.jar:1.20.0]
at org.apache.flink.runtime.scheduler.SchedulerBase.startScheduling(SchedulerBase.java:635) ~[flink-dist-1.20.0.jar:1.20.0]
at org.apache.flink.runtime.jobmaster.JobMaster.startScheduling(JobMaster.java:1220) ~[flink-dist-1.20.0.jar:1.20.0]
at org.apache.flink.runtime.jobmaster.JobMaster.startJobExecution(JobMaster.java:1137) ~[flink-dist-1.20.0.jar:1.20.0]
at org.apache.flink.runtime.jobmaster.JobMaster.onStart(JobMaster.java:460) ~[flink-dist-1.20.0.jar:1.20.0]
at org.apache.flink.runtime.rpc.RpcEndpoint.internalCallOnStart(RpcEndpoint.java:214) ~[flink-dist-1.20.0.jar:1.20.0]
at org.apache.flink.runtime.rpc.pekko.PekkoRpcActor$StoppedState.lambda$start$0(PekkoRpcActor.java:627) ~[flink-rpc-akka96a41d36-4f80-4a91-bda1-2d6c943ad3ba.jar:1.20.0]
at org.apache.flink.runtime.concurrent.ClassLoadingUtils.runWithContextClassLoader(ClassLoadingUtils.java:68) ~[flink-dist-1.20.0.jar:1.20.0]
at org.apache.flink.runtime.rpc.pekko.PekkoRpcActor$StoppedState.start(PekkoRpcActor.java:626) ~[flink-rpc-akka96a41d36-4f80-4a91-bda1-2d6c943ad3ba.jar:1.20.0]
at org.apache.flink.runtime.rpc.pekko.PekkoRpcActor.handleControlMessage(PekkoRpcActor.java:197) ~[flink-rpc-akka96a41d36-4f80-4a91-bda1-2d6c943ad3ba.jar:1.20.0]
at org.apache.pekko.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:33) ~[?:?]
at org.apache.pekko.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:29) ~[?:?]
at scala.PartialFunction.applyOrElse(PartialFunction.scala:127) ~[flink-scala_2.12-1.20.0.jar:1.20.0]
at scala.PartialFunction.applyOrElse$(PartialFunction.scala:126) ~[flink-scala_2.12-1.20.0.jar:1.20.0]
at org.apache.pekko.japi.pf.UnitCaseStatement.applyOrElse(CaseStatements.scala:29) ~[?:?]
at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:175) ~[flink-scala_2.12-1.20.0.jar:1.20.0]
... 14 more
Caused by: org.apache.hadoop.metrics2.MetricsException: Metrics source RegionServer,sub=IO already exists!
at org.apache.hadoop.metrics2.lib.DefaultMetricsSystem.newSourceName(DefaultMetricsSystem.java:144) ~[hadoop-common-2.10.2-khp-p9.jar:?]
at org.apache.hadoop.metrics2.lib.DefaultMetricsSystem.sourceName(DefaultMetricsSystem.java:117) ~[hadoop-common-2.10.2-khp-p9.jar:?]
at org.apache.hadoop.metrics2.impl.MetricsSystemImpl.register(MetricsSystemImpl.java:229) ~[hadoop-common-2.10.2-khp-p9.jar:?]
at org.apache.hudi.org.apache.hadoop.hbase.metrics.BaseSourceImpl.<init>(BaseSourceImpl.java:114) ~[?:?]
at org.apache.hudi.org.apache.hadoop.hbase.io.MetricsIOSourceImpl.<init>(MetricsIOSourceImpl.java:42) ~[?:?]
at org.apache.hudi.org.apache.hadoop.hbase.io.MetricsIOSourceImpl.<init>(MetricsIOSourceImpl.java:37) ~[?:?]
at org.apache.hudi.org.apache.hadoop.hbase.regionserver.MetricsRegionServerSourceFactoryImpl.createIO(MetricsRegionServerSourceFactoryImpl.java:95) ~[?:?]
at org.apache.hudi.org.apache.hadoop.hbase.io.MetricsIO.<init>(MetricsIO.java:32) ~[?:?]
at org.apache.hudi.org.apache.hadoop.hbase.io.hfile.HFile.<clinit>(HFile.java:172) ~[?:?]
at org.apache.hudi.io.hadoop.HoodieAvroHFileWriter.<init>(HoodieAvroHFileWriter.java:115) ~[?:?]
at org.apache.hudi.io.hadoop.HoodieAvroFileWriterFactory.newHFileFileWriter(HoodieAvroFileWriterFactory.java:108) ~[?:?]
at org.apache.hudi.io.storage.HoodieFileWriterFactory.getFileWriterByFormat(HoodieFileWriterFactory.java:70) ~[?:?]
at org.apache.hudi.io.storage.HoodieFileWriterFactory.getFileWriter(HoodieFileWriterFactory.java:53) ~[?:?]
at org.apache.hudi.io.HoodieCreateHandle.<init>(HoodieCreateHandle.java:106) ~[?:?]
at org.apache.hudi.io.HoodieCreateHandle.<init>(HoodieCreateHandle.java:82) ~[?:?]
at org.apache.hudi.io.FlinkCreateHandle.<init>(FlinkCreateHandle.java:66) ~[?:?]
at org.apache.hudi.io.FlinkCreateHandle.<init>(FlinkCreateHandle.java:59) ~[?:?]
at org.apache.hudi.io.FlinkWriteHandleFactory$BaseCommitWriteHandleFactory.create(FlinkWriteHandleFactory.java:121) ~[?:?]
at org.apache.hudi.client.HoodieFlinkWriteClient.getOrCreateWriteHandle(HoodieFlinkWriteClient.java:459) ~[?:?]
at org.apache.hudi.client.HoodieFlinkWriteClient.access$000(HoodieFlinkWriteClient.java:75) ~[?:?]
at org.apache.hudi.client.HoodieFlinkWriteClient$AutoCloseableWriteHandle.<init>(HoodieFlinkWriteClient.java:515) ~[?:?]
at org.apache.hudi.client.HoodieFlinkWriteClient.lambda$bulkInsertPreppedRecords$6(HoodieFlinkWriteClient.java:259) ~[?:?]
at java.util.stream.ReferencePipeline$3$1.accept(Unknown Source) ~[?:?]
at java.util.HashMap$ValueSpliterator.forEachRemaining(Unknown Source) ~[?:?]
at java.util.stream.AbstractPipeline.copyInto(Unknown Source) ~[?:?]
at java.util.stream.AbstractPipeline.wrapAndCopyInto(Unknown Source) ~[?:?]
at java.util.stream.ReduceOps$ReduceTask.doLeaf(Unknown Source) ~[?:?]
at java.util.stream.ReduceOps$ReduceTask.doLeaf(Unknown Source) ~[?:?]
at java.util.stream.AbstractTask.compute(Unknown Source) ~[?:?]
at java.util.concurrent.CountedCompleter.exec(Unknown Source) ~[?:?]
at java.util.concurrent.ForkJoinTask.doExec(Unknown Source) ~[?:?]
at java.util.concurrent.ForkJoinTask.doInvoke(Unknown Source) ~[?:?]
at java.util.concurrent.ForkJoinTask.invoke(Unknown Source) ~[?:?]
at java.util.stream.ReduceOps$ReduceOp.evaluateParallel(Unknown Source) ~[?:?]
at java.util.stream.AbstractPipeline.evaluate(Unknown Source) ~[?:?]
at java.util.stream.ReferencePipeline.collect(Unknown Source) ~[?:?]
at org.apache.hudi.client.HoodieFlinkWriteClient.bulkInsertPreppedRecords(HoodieFlinkWriteClient.java:263) ~[?:?]
at org.apache.hudi.client.HoodieFlinkWriteClient.bulkInsertPreppedRecords(HoodieFlinkWriteClient.java:74) ~[?:?]
at org.apache.hudi.metadata.FlinkHoodieBackedTableMetadataWriter.commitInternal(FlinkHoodieBackedTableMetadataWriter.java:166) ~[?:?]
at org.apache.hudi.metadata.FlinkHoodieBackedTableMetadataWriter.bulkCommit(FlinkHoodieBackedTableMetadataWriter.java:118) ~[?:?]
at org.apache.hudi.metadata.HoodieBackedTableMetadataWriter.initializeFromFilesystem(HoodieBackedTableMetadataWriter.java:503) ~[?:?]
at org.apache.hudi.metadata.HoodieBackedTableMetadataWriter.initializeIfNeeded(HoodieBackedTableMetadataWriter.java:283) ~[?:?]
at org.apache.hudi.metadata.HoodieBackedTableMetadataWriter.<init>(HoodieBackedTableMetadataWriter.java:188) ~[?:?]
at org.apache.hudi.metadata.FlinkHoodieBackedTableMetadataWriter.<init>(FlinkHoodieBackedTableMetadataWriter.java:87) ~[?:?]
at org.apache.hudi.metadata.FlinkHoodieBackedTableMetadataWriter.create(FlinkHoodieBackedTableMetadataWriter.java:69) ~[?:?]
at org.apache.hudi.client.HoodieFlinkTableServiceClient.initMetadataWriter(HoodieFlinkTableServiceClient.java:183) ~[?:?]
at org.apache.hudi.client.HoodieFlinkTableServiceClient.initMetadataTable(HoodieFlinkTableServiceClient.java:196) ~[?:?]
at org.apache.hudi.client.HoodieFlinkWriteClient.initMetadataTable(HoodieFlinkWriteClient.java:317) ~[?:?]
at org.apache.hudi.sink.StreamWriteOperatorCoordinator.initMetadataTable(StreamWriteOperatorCoordinator.java:352) ~[?:?]
at org.apache.hudi.sink.StreamWriteOperatorCoordinator.start(StreamWriteOperatorCoordinator.java:197) ~[?:?]
at org.apache.flink.runtime.operators.coordination.OperatorCoordinatorHolder.start(OperatorCoordinatorHolder.java:204) ~[flink-dist-1.20.0.jar:1.20.0]
at org.apache.flink.runtime.scheduler.DefaultOperatorCoordinatorHandler.startOperatorCoordinators(DefaultOperatorCoordinatorHandler.java:173) ~[flink-dist-1.20.0.jar:1.20.0]
at org.apache.flink.runtime.scheduler.DefaultOperatorCoordinatorHandler.startAllOperatorCoordinators(DefaultOperatorCoordinatorHandler.java:85) ~[flink-dist-1.20.0.jar:1.20.0]
at org.apache.flink.runtime.scheduler.SchedulerBase.startScheduling(SchedulerBase.java:635) ~[flink-dist-1.20.0.jar:1.20.0]
at org.apache.flink.runtime.jobmaster.JobMaster.startScheduling(JobMaster.java:1220) ~[flink-dist-1.20.0.jar:1.20.0]
at org.apache.flink.runtime.jobmaster.JobMaster.startJobExecution(JobMaster.java:1137) ~[flink-dist-1.20.0.jar:1.20.0]
at org.apache.flink.runtime.jobmaster.JobMaster.onStart(JobMaster.java:460) ~[flink-dist-1.20.0.jar:1.20.0]
at org.apache.flink.runtime.rpc.RpcEndpoint.internalCallOnStart(RpcEndpoint.java:214) ~[flink-dist-1.20.0.jar:1.20.0]
at org.apache.flink.runtime.rpc.pekko.PekkoRpcActor$StoppedState.lambda$start$0(PekkoRpcActor.java:627) ~[flink-rpc-akka96a41d36-4f80-4a91-bda1-2d6c943ad3ba.jar:1.20.0]
at org.apache.flink.runtime.concurrent.ClassLoadingUtils.runWithContextClassLoader(ClassLoadingUtils.java:68) ~[flink-dist-1.20.0.jar:1.20.0]
at org.apache.flink.runtime.rpc.pekko.PekkoRpcActor$StoppedState.start(PekkoRpcActor.java:626) ~[flink-rpc-akka96a41d36-4f80-4a91-bda1-2d6c943ad3ba.jar:1.20.0]
at org.apache.flink.runtime.rpc.pekko.PekkoRpcActor.handleControlMessage(PekkoRpcActor.java:197) ~[flink-rpc-akka96a41d36-4f80-4a91-bda1-2d6c943ad3ba.jar:1.20.0]
at org.apache.pekko.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:33) ~[?:?]
at org.apache.pekko.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:29) ~[?:?]
at scala.PartialFunction.applyOrElse(PartialFunction.scala:127) ~[flink-scala_2.12-1.20.0.jar:1.20.0]
at scala.PartialFunction.applyOrElse$(PartialFunction.scala:126) ~[flink-scala_2.12-1.20.0.jar:1.20.0]
at org.apache.pekko.japi.pf.UnitCaseStatement.applyOrElse(CaseStatements.scala:29) ~[?:?]
at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:175) ~[flink-scala_2.12-1.20.0.jar:1.20.0]
... 14 more
I want to use Flink session cluster mode rather than application mode.
You can disable the option metadata.enabled to work around the issue, but that would disable the metadata table and induce additional file listing cost on the query side.
The issue should be addressed in the 1.1 release, after we migrate to Hudi-native HFile writers.
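A minimal sketch of the workaround, assuming the Flink writer is configured through an options map as in the snippet above (the key "metadata.enabled" is what FlinkOptions.METADATA_ENABLED resolves to in hudi-flink; verify against your Hudi version):

```java
import java.util.HashMap;
import java.util.Map;

// Workaround sketch: disable the metadata table so the write path never
// reaches the shaded HBase HFile writer whose static initializer registers
// the conflicting "RegionServer,sub=IO" metrics source.
// Trade-off: queries fall back to direct file listing, which is slower.
class HudiSinkOptions {
    static Map<String, String> withMetadataDisabled(Map<String, String> base) {
        Map<String, String> options = new HashMap<>(base);
        options.put("metadata.enabled", "false");
        return options;
    }

    public static void main(String[] args) {
        Map<String, String> options = withMetadataDisabled(new HashMap<>());
        System.out.println(options.get("metadata.enabled")); // prints: false
    }
}
```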
@SML0127 Hi, here is a PR that fixes the problem: https://github.com/apache/hudi/pull/13901. You can give it a try.