hudi
[SUPPORT]java.util.ConcurrentModificationException: Cannot resolve conflicts for overlapping writes
Tips before filing an issue
- Have you gone through our FAQs?
- Join the mailing list to engage in conversations and get faster support at [email protected].
- If you have triaged this as a bug, then file an issue directly.
Describe the problem you faced

When I write a Hudi COW table to AWS S3 concurrently through the Spark API, the exception `java.util.ConcurrentModificationException: Cannot resolve conflicts for overlapping writes` is thrown. I use Hudi's ZookeeperBasedLockProvider for optimistic concurrency control (OCC).
To Reproduce
Steps to reproduce the behavior:
1. Run Hudi 0.11.0, writing concurrently to AWS S3.
2. Use the configuration below:

```scala
resultDF.write.format("hudi")
  .option(DataSourceWriteOptions.OPERATION.key(), DataSourceWriteOptions.UPSERT_OPERATION_OPT_VAL)
  .option(DataSourceWriteOptions.TABLE_TYPE.key(), DataSourceWriteOptions.COW_TABLE_TYPE_OPT_VAL)
  .option(DataSourceWriteOptions.RECORDKEY_FIELD.key(), "uq_id,_track_id,event,_flush_time")
  .option(DataSourceWriteOptions.PRECOMBINE_FIELD.key(), "process_time")
  .option(DataSourceWriteOptions.PARTITIONPATH_FIELD.key(), "p_day,p_hour,p_region,p_type")
  .option(DataSourceWriteOptions.KEYGENERATOR_CLASS_NAME.key(), classOf[ComplexKeyGenerator].getName)
  .option(HoodieWriteConfig.TBL_NAME.key(), sinkHudiTable)
  .option(DataSourceWriteOptions.HIVE_URL.key(), "")
  .option(DataSourceWriteOptions.HIVE_DATABASE.key(), "default")
  .option(DataSourceWriteOptions.HIVE_TABLE.key(), sinkHudiTable)
  .option(DataSourceWriteOptions.HIVE_PARTITION_FIELDS.key(), "p_day,p_hour,p_region,p_type")
  .option(DataSourceWriteOptions.HIVE_PARTITION_EXTRACTOR_CLASS.key(), classOf[MultiPartKeysValueExtractor].getName)
  .option(DataSourceWriteOptions.HIVE_SYNC_ENABLED.key(), "true")
  .option(HoodieIndexConfig.BLOOM_INDEX_UPDATE_PARTITION_PATH_ENABLE.key(), "true")
  .option(HoodieIndexConfig.INDEX_TYPE.key(), HoodieIndex.IndexType.GLOBAL_BLOOM.name())
  .option(HoodieCompactionConfig.CLEANER_POLICY.key(), HoodieCleaningPolicy.KEEP_LATEST_COMMITS.name)
  .option(HoodieCompactionConfig.ASYNC_CLEAN.key(), "true")
  .option(HoodieCompactionConfig.CLEANER_COMMITS_RETAINED.key(), "240")
  .option(HoodieCompactionConfig.MIN_COMMITS_TO_KEEP.key(), "250")
  .option(HoodieCompactionConfig.MAX_COMMITS_TO_KEEP.key(), "260")
  .option(HoodieCompactionConfig.FAILED_WRITES_CLEANER_POLICY.key(), HoodieFailedWritesCleaningPolicy.LAZY.name())
  .option(HoodieWriteConfig.WRITE_CONCURRENCY_MODE.key(), WriteConcurrencyMode.OPTIMISTIC_CONCURRENCY_CONTROL.name())
  .option("hoodie.write.lock.provider", "org.apache.hudi.client.transaction.lock.ZookeeperBasedLockProvider")
  .option("hoodie.write.lock.zookeeper.url", "")
  .option("hoodie.write.lock.zookeeper.port", "2181")
  .option("hoodie.write.lock.zookeeper.lock_key", sinkHudiTable)
  .option("hoodie.write.lock.zookeeper.base_path", "/hudi_multiwriter")
  .option("hoodie.insert.shuffle.parallelism", "2")
  .option("hoodie.upsert.shuffle.parallelism", "2")
  .mode(SaveMode.Append)
  .save(s3outPath)
```

3. Observe the exception:

Expected behavior
Exception in thread "main" org.apache.hudi.exception.HoodieWriteConflictException: java.util.ConcurrentModificationException: Cannot resolve conflicts for overlapping writes
Environment Description

- Hudi version : 0.11.0
- Spark version : 3.2.0
- Hive version : 3.1.2
- Hadoop version : 3.2.1
- Storage (HDFS/S3/GCS..) : S3
- Running on Docker? (yes/no) : no
Additional context

The AWS EMR version is 6.7.0.
Stacktrace
Exception in thread "main" org.apache.hudi.exception.HoodieWriteConflictException: java.util.ConcurrentModificationException: Cannot resolve conflicts for overlapping writes
at org.apache.hudi.client.transaction.SimpleConcurrentFileWritesConflictResolutionStrategy.resolveConflict(SimpleConcurrentFileWritesConflictResolutionStrategy.java:102)
at org.apache.hudi.client.utils.TransactionUtils.lambda$resolveWriteConflictIfAny$0(TransactionUtils.java:85)
at java.util.ArrayList$ArrayListSpliterator.forEachRemaining(ArrayList.java:1384)
at java.util.stream.Streams$ConcatSpliterator.forEachRemaining(Streams.java:742)
at java.util.stream.Streams$ConcatSpliterator.forEachRemaining(Streams.java:742)
at java.util.stream.ReferencePipeline$Head.forEach(ReferencePipeline.java:647)
at org.apache.hudi.client.utils.TransactionUtils.resolveWriteConflictIfAny(TransactionUtils.java:79)
at org.apache.hudi.client.SparkRDDWriteClient.preCommit(SparkRDDWriteClient.java:475)
at org.apache.hudi.client.BaseHoodieWriteClient.commitStats(BaseHoodieWriteClient.java:233)
at org.apache.hudi.client.SparkRDDWriteClient.commit(SparkRDDWriteClient.java:122)
at org.apache.hudi.HoodieSparkSqlWriter$.commitAndPerformPostOperations(HoodieSparkSqlWriter.scala:678)
at org.apache.hudi.HoodieSparkSqlWriter$.write(HoodieSparkSqlWriter.scala:313)
at org.apache.hudi.DefaultSource.createRelation(DefaultSource.scala:165)
at org.apache.spark.sql.execution.datasources.SaveIntoDataSourceCommand.run(SaveIntoDataSourceCommand.scala:45)
at org.apache.spark.sql.execution.command.ExecutedCommandExec.sideEffectResult$lzycompute(commands.scala:75)
at org.apache.spark.sql.execution.command.ExecutedCommandExec.sideEffectResult(commands.scala:73)
at org.apache.spark.sql.execution.command.ExecutedCommandExec.executeCollect(commands.scala:84)
at org.apache.spark.sql.execution.QueryExecution$$anonfun$eagerlyExecuteCommands$1.$anonfun$applyOrElse$1(QueryExecution.scala:115)
at org.apache.spark.sql.catalyst.QueryPlanningTracker$.withTracker(QueryPlanningTracker.scala:107)
at org.apache.spark.sql.execution.SQLExecution$.withTracker(SQLExecution.scala:232)
at org.apache.spark.sql.execution.SQLExecution$.executeQuery$1(SQLExecution.scala:110)
at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$6(SQLExecution.scala:135)
at org.apache.spark.sql.catalyst.QueryPlanningTracker$.withTracker(QueryPlanningTracker.scala:107)
at org.apache.spark.sql.execution.SQLExecution$.withTracker(SQLExecution.scala:232)
at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$5(SQLExecution.scala:135)
at org.apache.spark.sql.execution.SQLExecution$.withSQLConfPropagated(SQLExecution.scala:253)
at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$1(SQLExecution.scala:134)
at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:775)
at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:68)
at org.apache.spark.sql.execution.QueryExecution$$anonfun$eagerlyExecuteCommands$1.applyOrElse(QueryExecution.scala:112)
at org.apache.spark.sql.execution.QueryExecution$$anonfun$eagerlyExecuteCommands$1.applyOrElse(QueryExecution.scala:108)
at org.apache.spark.sql.catalyst.trees.TreeNode.$anonfun$transformDownWithPruning$1(TreeNode.scala:519)
at org.apache.spark.sql.catalyst.trees.CurrentOrigin$.withOrigin(TreeNode.scala:83)
at org.apache.spark.sql.catalyst.trees.TreeNode.transformDownWithPruning(TreeNode.scala:519)
at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.org$apache$spark$sql$catalyst$plans$logical$AnalysisHelper$$super$transformDownWithPruning(LogicalPlan.scala:30)
at org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper.transformDownWithPruning(AnalysisHelper.scala:267)
at org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper.transformDownWithPruning$(AnalysisHelper.scala:263)
at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.transformDownWithPruning(LogicalPlan.scala:30)
at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.transformDownWithPruning(LogicalPlan.scala:30)
at org.apache.spark.sql.catalyst.trees.TreeNode.transformDown(TreeNode.scala:495)
at org.apache.spark.sql.execution.QueryExecution.eagerlyExecuteCommands(QueryExecution.scala:108)
at org.apache.spark.sql.execution.QueryExecution.commandExecuted$lzycompute(QueryExecution.scala:95)
at org.apache.spark.sql.execution.QueryExecution.commandExecuted(QueryExecution.scala:93)
at org.apache.spark.sql.execution.QueryExecution.assertCommandExecuted(QueryExecution.scala:136)
at org.apache.spark.sql.DataFrameWriter.runCommand(DataFrameWriter.scala:848)
at org.apache.spark.sql.DataFrameWriter.saveToV1Source(DataFrameWriter.scala:382)
at org.apache.spark.sql.DataFrameWriter.saveInternal(DataFrameWriter.scala:303)
at org.apache.spark.sql.DataFrameWriter.save(DataFrameWriter.scala:239)
at EtlHudi2hudi.writeData(EtlHudi2hudi.scala:113)
at EtlHudi2hudi.run(EtlHudi2hudi.scala:53)
at EtlHudi2hudi$.main(EtlHudi2hudi.scala:32)
at EtlHudi2hudi.main(EtlHudi2hudi.scala)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498)
at org.apache.spark.deploy.JavaMainApplication.start(SparkApplication.scala:52)
at org.apache.spark.deploy.SparkSubmit.org$apache$spark$deploy$SparkSubmit$$runMain(SparkSubmit.scala:1000)
at org.apache.spark.deploy.SparkSubmit.doRunMain$1(SparkSubmit.scala:180)
at org.apache.spark.deploy.SparkSubmit.submit(SparkSubmit.scala:203)
at org.apache.spark.deploy.SparkSubmit.doSubmit(SparkSubmit.scala:90)
at org.apache.spark.deploy.SparkSubmit$$anon$2.doSubmit(SparkSubmit.scala:1089)
at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:1098)
at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala)
Caused by: java.util.ConcurrentModificationException: Cannot resolve conflicts for overlapping writes
... 64 more
Can your job recover automatically from the failure?
In my case, this exception is not the root exception, so when it happens my job keeps running and I cannot catch the failure of the job. Do you have any other suggestions? Thanks very much.
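One way to keep the scheduler from missing the failure is to catch the conflict at the driver and either retry or exit non-zero explicitly. A minimal sketch, not Hudi API: `write_fn` stands in for the actual datasource write, and `HoodieWriteConflictException` here is a Python stand-in for the JVM exception of the same name.

```python
class HoodieWriteConflictException(Exception):
    """Python stand-in for org.apache.hudi.exception.HoodieWriteConflictException."""


def run_batch(write_fn, max_retries=3):
    """Run a Hudi write, retrying on OCC conflicts.

    Re-raises after the last attempt so the driver exits non-zero and a
    scheduler (e.g. Azkaban) marks the job failed instead of leaving it
    hanging. Returns the number of attempts that were needed.
    """
    for attempt in range(1, max_retries + 1):
        try:
            write_fn()  # e.g. resultDF.write.format("hudi")...save(path)
            return attempt
        except HoodieWriteConflictException:
            if attempt == max_retries:
                # Surface the failure instead of swallowing it.
                raise
```

Since OCC conflicts are expected under contention, a bounded retry like this is the usual pattern; the retry count and backoff are up to the job.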
This should be expected behavior when multiple writers write records into the same file group. What behavior do you want? @maikouliujian
In my case, when this exception happens my job does not fail but keeps running, and it never finishes. So how can I know whether the job ran correctly?
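For context on why the conflict is "expected": the SimpleConcurrentFileWritesConflictResolutionStrategy seen in the stack trace declares a conflict whenever the committing writer and a concurrently completed commit touched a common file group. The core check can be sketched as a set intersection (an illustration of the idea, not the real implementation):

```python
def has_write_conflict(current_file_ids, concurrent_file_ids):
    """Return True if two commits touched at least one common file group.

    Mirrors the idea behind Hudi's
    SimpleConcurrentFileWritesConflictResolutionStrategy: overlapping
    file IDs between two concurrent commits cannot be resolved, so the
    later commit is aborted with HoodieWriteConflictException.
    """
    return bool(set(current_file_ids) & set(concurrent_file_ids))
```

With a GLOBAL_BLOOM index and hourly upserts across many partitions, overlapping file groups between concurrent writers are quite likely, which is why the conflicts show up regularly.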
How are you writing to Hudi? Can you give us a reproducible script? Is it spark-datasource, Spark streaming, DeltaStreamer, or spark-sql, at least from spark-shell? When you are using the spark-ds writer, we know the command fails on conflicts.
Looks like it's a Spark datasource write. How do you claim that the job does not fail? Are you executing it from spark-shell, and is the command to write to Hudi just stuck?
I tried multi-writers from two different spark-shells, and one of them fails while writing to Hudi.
scala> df2.write.format("hudi").
| options(getQuickstartWriteConfigs).
| option(PRECOMBINE_FIELD_OPT_KEY, "ts").
| option(RECORDKEY_FIELD_OPT_KEY, "uuid").
| option(PARTITIONPATH_FIELD_OPT_KEY, "partitionpath").
| option(TABLE_NAME, tableName).
| option("hoodie.write.concurrency.mode","optimistic_concurrency_control").
| option("hoodie.cleaner.policy.failed.writes","LAZY").
| option("hoodie.write.lock.provider","org.apache.hudi.client.transaction.lock.ZookeeperBasedLockProvider").
| option("hoodie.write.lock.zookeeper.url","localhost:2181").
| option("hoodie.write.lock.zookeeper.port","2181").
| option("hoodie.write.lock.zookeeper.lock_key","locks").
| option("hoodie.write.lock.zookeeper.base_path","/tmp/locks/.lock").
| mode(Append).
| save(basePath)
warning: there was one deprecation warning; re-run with -deprecation for details
[Stage 14:> (0 + 3) / 3]# WARNING: Unable to attach Serviceability Agent. Unable to attach even with module exceptions: [org.apache.hudi.org.openjdk.jol.vm.sa.SASupportException: Sense failed., org.apache.hudi.org.openjdk.jol.vm.sa.SASupportException: Sense failed., org.apache.hudi.org.openjdk.jol.vm.sa.SASupportException: Sense failed.]
23/01/23 10:00:20 WARN MetricsConfig: Cannot locate configuration: tried hadoop-metrics2-hbase.properties,hadoop-metrics2.properties
org.apache.hudi.exception.HoodieWriteConflictException: java.util.ConcurrentModificationException: Cannot resolve conflicts for overlapping writes
at org.apache.hudi.client.transaction.SimpleConcurrentFileWritesConflictResolutionStrategy.resolveConflict(SimpleConcurrentFileWritesConflictResolutionStrategy.java:102)
at org.apache.hudi.client.utils.TransactionUtils.lambda$resolveWriteConflictIfAny$0(TransactionUtils.java:85)
at java.util.ArrayList$ArrayListSpliterator.forEachRemaining(ArrayList.java:1382)
at java.util.stream.Streams$ConcatSpliterator.forEachRemaining(Streams.java:742)
at java.util.stream.Streams$ConcatSpliterator.forEachRemaining(Streams.java:742)
at java.util.stream.ReferencePipeline$Head.forEach(ReferencePipeline.java:580)
at org.apache.hudi.client.utils.TransactionUtils.resolveWriteConflictIfAny(TransactionUtils.java:79)
at org.apache.hudi.client.SparkRDDWriteClient.preCommit(SparkRDDWriteClient.java:491)
at org.apache.hudi.client.BaseHoodieWriteClient.commitStats(BaseHoodieWriteClient.java:234)
at org.apache.hudi.client.SparkRDDWriteClient.commit(SparkRDDWriteClient.java:126)
at org.apache.hudi.HoodieSparkSqlWriter$.commitAndPerformPostOperations(HoodieSparkSqlWriter.scala:698)
at org.apache.hudi.HoodieSparkSqlWriter$.write(HoodieSparkSqlWriter.scala:343)
at org.apache.hudi.DefaultSource.createRelation(DefaultSource.scala:145)
at org.apache.spark.sql.execution.datasources.SaveIntoDataSourceCommand.run(SaveIntoDataSourceCommand.scala:45)
at org.apache.spark.sql.execution.command.ExecutedCommandExec.sideEffectResult$lzycompute(commands.scala:70)
at org.apache.spark.sql.execution.command.ExecutedCommandExec.sideEffectResult(commands.scala:68)
at org.apache.spark.sql.execution.command.ExecutedCommandExec.doExecute(commands.scala:86)
at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:131)
at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:127)
at org.apache.spark.sql.execution.SparkPlan$$anonfun$executeQuery$1.apply(SparkPlan.scala:155)
at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
at org.apache.spark.sql.execution.SparkPlan.executeQuery(SparkPlan.scala:152)
at org.apache.spark.sql.execution.SparkPlan.execute(SparkPlan.scala:127)
at org.apache.spark.sql.execution.QueryExecution.toRdd$lzycompute(QueryExecution.scala:83)
at org.apache.spark.sql.execution.QueryExecution.toRdd(QueryExecution.scala:81)
at org.apache.spark.sql.DataFrameWriter$$anonfun$runCommand$1.apply(DataFrameWriter.scala:696)
at org.apache.spark.sql.DataFrameWriter$$anonfun$runCommand$1.apply(DataFrameWriter.scala:696)
at org.apache.spark.sql.execution.SQLExecution$$anonfun$withNewExecutionId$1.apply(SQLExecution.scala:80)
at org.apache.spark.sql.execution.SQLExecution$.withSQLConfPropagated(SQLExecution.scala:127)
at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:75)
at org.apache.spark.sql.DataFrameWriter.runCommand(DataFrameWriter.scala:696)
at org.apache.spark.sql.DataFrameWriter.saveToV1Source(DataFrameWriter.scala:305)
at org.apache.spark.sql.DataFrameWriter.save(DataFrameWriter.scala:291)
at org.apache.spark.sql.DataFrameWriter.save(DataFrameWriter.scala:249)
... 75 elided
Caused by: java.util.ConcurrentModificationException: Cannot resolve conflicts for overlapping writes
... 109 more
scala>
The write to Hudi fails and the next command prompt is seen.
Excerpt from my other shell, which succeeded:
scala> df2.write.format("hudi").
| options(getQuickstartWriteConfigs).
| option(PRECOMBINE_FIELD_OPT_KEY, "ts").
| option(RECORDKEY_FIELD_OPT_KEY, "uuid").
| option(PARTITIONPATH_FIELD_OPT_KEY, "partitionpath").
| option(TABLE_NAME, tableName).
| option("hoodie.write.concurrency.mode","optimistic_concurrency_control").
| option("hoodie.cleaner.policy.failed.writes","LAZY").
| option("hoodie.write.lock.provider","org.apache.hudi.client.transaction.lock.ZookeeperBasedLockProvider").
| option("hoodie.write.lock.zookeeper.url","localhost:2181").
| option("hoodie.write.lock.zookeeper.port","2181").
| option("hoodie.write.lock.zookeeper.lock_key","locks").
| option("hoodie.write.lock.zookeeper.base_path","/tmp/locks/.lock").
| mode(Append).
| save(basePath)
warning: one deprecation; for details, enable `:setting -deprecation' or `:replay -deprecation'
# WARNING: Unable to attach Serviceability Agent. Unable to attach even with module exceptions: [org.apache.hudi.org.openjdk.jol.vm.sa.SASupportException: Sense failed., org.apache.hudi.org.openjdk.jol.vm.sa.SASupportException: Sense failed., org.apache.hudi.org.openjdk.jol.vm.sa.SASupportException: Sense failed.]
23/01/23 10:00:19 WARN MetricsConfig: Cannot locate configuration: tried hadoop-metrics2-hbase.properties,hadoop-metrics2.properties
scala>
If you can provide us with a reproducible script, that would be nice. As of now, it's not reproducible from our end.
@maikouliujian: any updates, please?
How are you writing to Hudi? Can you give us a reproducible script? Is it spark-datasource, Spark streaming, DeltaStreamer, or spark-sql, at least from spark-shell? When you are using the spark-ds writer, we know the command fails on conflicts.
In my case, I run my job with the spark-ds writer.
Looks like it's a Spark datasource write. How do you claim that the job does not fail? Are you executing it from spark-shell, and is the command to write to Hudi just stuck?
In my case, my job is scheduled hourly by Azkaban. When multiple jobs run, the Azkaban job does not fail but keeps running. When I look at the detailed log, the exception happens. Why doesn't my Azkaban job fail?
I tried multi-writers from two different spark-shells, and one of them fails while writing to Hudi. (Transcript quoted above.)
In Hudi 0.11.0, is multi-writer not supported with the Spark datasource?
I have the same issue running multi-writer in a local console as well.

```python
"""
Install https://dlcdn.apache.org/spark/spark-3.3.1/spark-3.3.1-bin-hadoop2.tgz
hadoop2.7: https://github.com/soumilshah1995/winutils/blob/master/hadoop-2.7.7/bin/winutils.exe

pyspark --packages org.apache.hudi:hudi-spark3.3-bundle_2.12:0.12.1 --conf 'spark.serializer=org.apache.spark.serializer.KryoSerializer'

Set the environment variables SPARK_HOME and HADOOP_HOME, and add to PATH:
%HADOOP_HOME%\bin
%SPARK_HOME%\bin

Complete tutorials on Hudi:
https://github.com/soumilshah1995/Insert-Update-Read-Write-SnapShot-Time-Travel-incremental-Query-on-APache-Hudi-transacti/blob/main/hudi%20(1).ipynb
"""
import os
import sys
import uuid
import datetime
from functools import reduce

import findspark
import pyspark
from pyspark import SparkConf, SparkContext
from pyspark.sql import SparkSession
from pyspark.sql.functions import *
from pyspark.sql.types import *
from faker import Faker

time = datetime.datetime.now()
time = time.strftime("YMD%Y%m%dHHMMSSms%H%M%S%f")

SUBMIT_ARGS = "--packages org.apache.hudi:hudi-spark3.3-bundle_2.12:0.12.1 pyspark-shell"
os.environ["PYSPARK_SUBMIT_ARGS"] = SUBMIT_ARGS
os.environ['PYSPARK_PYTHON'] = sys.executable
os.environ['PYSPARK_DRIVER_PYTHON'] = sys.executable

findspark.init()

spark = SparkSession.builder \
    .config('spark.serializer', 'org.apache.spark.serializer.KryoSerializer') \
    .config('className', 'org.apache.hudi') \
    .config('spark.sql.hive.convertMetastoreParquet', 'false') \
    .config('spark.sql.extensions', 'org.apache.spark.sql.hudi.HoodieSparkSessionExtension') \
    .config('spark.sql.warehouse.dir', 'file:///C:/tmp/spark_warehouse') \
    .getOrCreate()

faker = Faker()


class DataGenerator(object):

    @staticmethod
    def get_data():
        return [
            (
                x,
                faker.name(),
                faker.random_element(elements=('IT', 'HR', 'Sales', 'Marketing')),
                faker.random_element(elements=('CA', 'NY', 'TX', 'FL', 'IL', 'RJ')),
                faker.random_int(min=10000, max=150000),
                faker.random_int(min=18, max=60),
                faker.random_int(min=0, max=100000),
                faker.unix_time()
            ) for x in range(5)
        ]


data = DataGenerator.get_data()
columns = ["emp_id", "employee_name", "department", "state", "salary", "age", "bonus", "ts"]
spark_df = spark.createDataFrame(data=data, schema=columns)
print(spark_df.show())

db_name = "hudidb"
table_name = "hudi_table"
recordkey = 'emp_id'
precombine = 'ts'
path = "file:///C:/tmp/spark_warehouse"
method = 'upsert'
table_type = "COPY_ON_WRITE"

hudi_options = {
    'hoodie.table.name': table_name,
    'hoodie.datasource.write.recordkey.field': 'emp_id',
    'hoodie.datasource.write.table.name': table_name,
    'hoodie.datasource.write.operation': 'upsert',
    'hoodie.datasource.write.precombine.field': 'ts',
    'hoodie.upsert.shuffle.parallelism': 2,
    'hoodie.insert.shuffle.parallelism': 2,
    'hoodie.schema.on.read.enable': 'true',  # for changing column names
    'hoodie.write.concurrency.mode': 'optimistic_concurrency_control',  # added for zookeeper to deal with multiple source writes
    'hoodie.cleaner.policy.failed.writes': 'LAZY',
    # 'hoodie.write.lock.provider': 'org.apache.hudi.client.transaction.lock.FileSystemBasedLockProvider',
    'hoodie.write.lock.provider': 'org.apache.hudi.client.transaction.lock.ZookeeperBasedLockProvider',
    'hoodie.write.lock.zookeeper.url': 'localhost',
    'hoodie.write.lock.zookeeper.port': '2181',
    'hoodie.write.lock.zookeeper.lock_key': 'my_lock',
    'hoodie.write.lock.zookeeper.base_path': '/hudi_locks',
}

print("*" * 55)
print("over-write")
print("*" * 55)

spark_df.write.format("hudi") \
    .options(**hudi_options) \
    .mode("overwrite") \
    .save(path)

print("*" * 55)
print("READ")
print("*" * 55)

read_df = spark.read.format("hudi").load(path)
print(read_df.show())

impleDataUpd = [
    (6, "This is APPEND4", "Sales", "RJ", 81000, 30, 23000, 827307999),
    (7, "This is APPEND4", "Engineering", "RJ", 79000, 53, 15000, 1627694678),
]
columns = ["emp_id", "employee_name", "department", "state", "salary", "age", "bonus", "ts"]
usr_up_df = spark.createDataFrame(data=impleDataUpd, schema=columns)
usr_up_df.write.format("hudi").options(**hudi_options).mode("append").save(path)

print("*" * 55)
print("READ")
print("*" * 55)

read_df = spark.read.format("hudi").load(path)
print(read_df.show())
```
When I run the above code in two separate notebooks to simulate the multi-writer process, the error occurs, just like what @maikouliujian faced.
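What the two-notebook experiment exercises can be sketched as a toy optimistic-concurrency protocol (no Hudi involved, purely illustrative): both writers snapshot the same table state, and whichever commits second fails the pre-commit check exactly like the reported exception.

```python
import threading


class ToyTable:
    """Toy optimistic-concurrency table: a commit succeeds only if the
    table version is unchanged since the writer took its snapshot."""

    def __init__(self):
        self.version = 0
        self.lock = threading.Lock()

    def begin(self):
        return self.version  # snapshot taken at the start of the write

    def commit(self, snapshot_version):
        # The ZooKeeper/DynamoDB lock plays this mutex's role in Hudi:
        # it serializes the conflict check + commit, it does not prevent
        # two writers from having written overlapping file groups.
        with self.lock:
            if self.version != snapshot_version:
                raise RuntimeError("Cannot resolve conflicts for overlapping writes")
            self.version += 1


table = ToyTable()
snap_a = table.begin()
snap_b = table.begin()   # second notebook starts before the first commits
table.commit(snap_a)     # first writer wins
try:
    table.commit(snap_b)  # second writer conflicts, like the report above
    outcome = "ok"
except RuntimeError:
    outcome = "conflict"
```

So with OCC the lock provider being configured correctly does not make concurrent overlapping upserts succeed; it only guarantees that exactly one of them commits and the other fails cleanly.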
@tomyanth I tried to run with the 0.13.0 version and I didn't have any issues like the spark-shell job getting stuck. Can you try 0.13.0 and check whether you still face the issue?
Try using DynamoDB as the lock table:
DYNAMODB_LOCK_TABLE_NAME = 'hudi-lock-table'
curr_session = boto3.session.Session()
curr_region = curr_session.region_name
def upsert_hudi_table(glue_database, table_name,
record_id, precomb_key, table_type, spark_df,
enable_partition, enable_cleaner, enable_hive_sync, enable_dynamodb_lock,
use_sql_transformer, sql_transformer_query,
target_path, index_type, method='upsert'):
"""
Upserts a dataframe into a Hudi table.
Args:
glue_database (str): The name of the glue database.
table_name (str): The name of the Hudi table.
record_id (str): The name of the field in the dataframe that will be used as the record key.
precomb_key (str): The name of the field in the dataframe that will be used for pre-combine.
table_type (str): The Hudi table type (e.g., COPY_ON_WRITE, MERGE_ON_READ).
spark_df (pyspark.sql.DataFrame): The dataframe to upsert.
enable_partition (bool): Whether or not to enable partitioning.
enable_cleaner (bool): Whether or not to enable data cleaning.
enable_hive_sync (bool): Whether or not to enable syncing with Hive.
use_sql_transformer (bool): Whether or not to use SQL to transform the dataframe before upserting.
sql_transformer_query (str): The SQL query to use for data transformation.
target_path (str): The path to the target Hudi table.
method (str): The Hudi write method to use (default is 'upsert').
index_type : BLOOM or GLOBAL_BLOOM
Returns:
None
"""
# These are the basic settings for the Hudi table
hudi_final_settings = {
    "hoodie.table.name": table_name,
    "hoodie.datasource.write.table.type": table_type,
    "hoodie.datasource.write.operation": method,
    "hoodie.datasource.write.recordkey.field": record_id,
    "hoodie.datasource.write.precombine.field": precomb_key,
}
# These settings enable syncing with Hive
hudi_hive_sync_settings = {
    "hoodie.parquet.compression.codec": "gzip",
    "hoodie.datasource.hive_sync.enable": "true",
    "hoodie.datasource.hive_sync.database": glue_database,
    "hoodie.datasource.hive_sync.table": table_name,
    "hoodie.datasource.hive_sync.partition_extractor_class": "org.apache.hudi.hive.MultiPartKeysValueExtractor",
    "hoodie.datasource.hive_sync.use_jdbc": "false",
    "hoodie.datasource.hive_sync.mode": "hms",
}
# These settings enable automatic cleaning of old data
hudi_cleaner_options = {
    "hoodie.clean.automatic": "true",
    "hoodie.clean.async": "true",
    "hoodie.cleaner.policy": "KEEP_LATEST_FILE_VERSIONS",
    "hoodie.cleaner.fileversions.retained": "3",
    # The original key had a stray "hoodie-conf " prefix, which Hudi would ignore
    "hoodie.cleaner.parallelism": "200",
    "hoodie.cleaner.commits.retained": "5",
}
# These settings enable partitioning of the data
partition_settings = {
    "hoodie.datasource.write.partitionpath.field": args["PARTITON_FIELDS"],
    "hoodie.datasource.hive_sync.partition_fields": args["PARTITON_FIELDS"],
    "hoodie.datasource.write.hive_style_partitioning": "true",
}
# Index settings for Hudi
hudi_index_settings = {
    "hoodie.index.type": index_type,  # Specify the index type for Hudi
}
# DynamoDB-based lock settings for optimistic concurrency control
hudi_dynamo_db_based_lock = {
    "hoodie.write.concurrency.mode": "optimistic_concurrency_control",
    "hoodie.cleaner.policy.failed.writes": "LAZY",
    "hoodie.write.lock.provider": "org.apache.hudi.aws.transaction.lock.DynamoDBBasedLockProvider",
    "hoodie.write.lock.dynamodb.table": DYNAMODB_LOCK_TABLE_NAME,
    "hoodie.write.lock.dynamodb.partition_key": "tablename",
    "hoodie.write.lock.dynamodb.region": curr_region,
    "hoodie.write.lock.dynamodb.endpoint_url": "dynamodb.{0}.amazonaws.com".format(curr_region),
    "hoodie.write.lock.dynamodb.billing_mode": "PAY_PER_REQUEST",
}
# File sizing settings
hudi_file_size = {
    "hoodie.parquet.max.file.size": 512 * 1024 * 1024,  # 512 MB
    "hoodie.parquet.small.file.limit": 104857600,  # 100 MB
}

def is_enabled(flag):
    """Treat the string "True"/"true" and the boolean True as enabled."""
    return str(flag).lower() == "true"

# Add the index and file-size settings to the final settings dictionary
hudi_final_settings.update(hudi_index_settings)
hudi_final_settings.update(hudi_file_size)
# If partitioning is enabled, add the partition settings to the final settings
if is_enabled(enable_partition):
    hudi_final_settings.update(partition_settings)
# If the DynamoDB-based lock is enabled, use DynamoDB as the lock table
if is_enabled(enable_dynamodb_lock):
    hudi_final_settings.update(hudi_dynamo_db_based_lock)
# If data cleaning is enabled, add the cleaner options to the final settings
if is_enabled(enable_cleaner):
    hudi_final_settings.update(hudi_cleaner_options)
# If Hive syncing is enabled, add the Hive sync settings to the final settings
if is_enabled(enable_hive_sync):
    hudi_final_settings.update(hudi_hive_sync_settings)
# If there is data to write, apply any SQL transformation and write to the target path
if spark_df.count() > 0:
    if is_enabled(use_sql_transformer):
        spark_df.createOrReplaceTempView("temp")
        spark_df = spark.sql(sql_transformer_query)
    # Replace null values in all columns with a default value; a single na.fill
    # covers every column, so the original per-column loop was unnecessary
    spark_df = spark_df.na.fill("n/a")
    spark_df.show()
    spark_df.write.format("hudi") \
        .options(**hudi_final_settings) \
        .mode("append") \
        .save(target_path)
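Since the OCC conflict surfaces as a failed Spark job rather than an automatic retry, one workaround is to retry the write at the application level. A minimal, hypothetical sketch (the `write_fn` callable and retry parameters are assumptions, not part of the original Glue job):

```python
import time

def write_with_retries(write_fn, max_attempts=5, backoff_seconds=2):
    """Retry a Hudi write when OCC rejects it due to overlapping writes.

    write_fn is a zero-argument callable wrapping the spark_df.write...save()
    call; in PySpark the conflict surfaces as a Py4JJavaError whose message
    contains 'ConcurrentModificationException'.
    """
    for attempt in range(1, max_attempts + 1):
        try:
            return write_fn()
        except Exception as exc:
            # Re-raise anything that is not an OCC conflict, or the final failure
            if "ConcurrentModificationException" not in str(exc) or attempt == max_attempts:
                raise
            # Back off before re-attempting the commit, so competing writers
            # have a chance to finish their inflight commits first
            time.sleep(backoff_seconds * attempt)
```

This only papers over conflicts; as discussed further down in this thread, reducing overlap between writers (disjoint partitions, sequenced jobs) is the more robust fix.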
You can use DynamoDB lock tables:
import boto3

DYNAMODB_LOCK_TABLE_NAME = 'hudi-lock-table'
curr_session = boto3.session.Session()
curr_region = curr_session.region_name
hudi_dynamo_db_based_lock = {
    'hoodie.write.concurrency.mode': 'optimistic_concurrency_control',
    'hoodie.cleaner.policy.failed.writes': 'LAZY',
    'hoodie.write.lock.provider': 'org.apache.hudi.aws.transaction.lock.DynamoDBBasedLockProvider',
    'hoodie.write.lock.dynamodb.table': DYNAMODB_LOCK_TABLE_NAME,
    'hoodie.write.lock.dynamodb.partition_key': 'tablename',
    'hoodie.write.lock.dynamodb.region': curr_region,
    'hoodie.write.lock.dynamodb.endpoint_url': 'dynamodb.{0}.amazonaws.com'.format(curr_region),
    'hoodie.write.lock.dynamodb.billing_mode': 'PAY_PER_REQUEST',
}
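These lock settings are simply merged into whatever options the writer already passes to Hudi. A minimal sketch with illustrative placeholder values (the table name, region, and `writer_options` below are assumptions, not taken from the comment above):

```python
# Placeholder lock settings mirroring the comment above; the table name and
# region are illustrative values, not from the original post
lock_settings = {
    "hoodie.write.concurrency.mode": "optimistic_concurrency_control",
    "hoodie.cleaner.policy.failed.writes": "LAZY",
    "hoodie.write.lock.provider": "org.apache.hudi.aws.transaction.lock.DynamoDBBasedLockProvider",
    "hoodie.write.lock.dynamodb.table": "hudi-lock-table",
    "hoodie.write.lock.dynamodb.partition_key": "tablename",
    "hoodie.write.lock.dynamodb.region": "us-east-1",
}
# Hypothetical base writer options; in the Glue job above this is hudi_final_settings
writer_options = {"hoodie.table.name": "my_table"}
writer_options.update(lock_settings)
# writer_options can now be passed via .options(**writer_options) on the Hudi write
```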
We are encountering the same issue. After using DynamoDB as the lock table, we still see this error: java.util.ConcurrentModificationException: Cannot resolve conflicts for overlapping writes
What I observed:
- I have 4 EMR Spark clusters that write to the same table. One by one, they fail with the above error. When I look at the DynamoDB lock history, I see locks constantly being acquired and released.
- The DynamoDB lock is at the table level, not the file level, so two Hudi jobs can still attempt to write to the same files, and one of them fails. If several concurrent jobs write to the same files at the same time, they can enter a failure storm that fails everything unless you set a very high retry threshold.
Hello @Jason-liujc @maikouliujian
I am seeing the exact same error, and I am also using the DynamoDB lock the same way as in the last comment. Were you able to figure out a workaround, or anything else that fixes this issue? I am facing something very similar.
The main thing we did was to change our Hudi table structure to avoid concurrent writes to the same partition as much as possible (batching workloads together, sequencing the jobs, etc.).
For us, the DynamoDB lock provider wasn't able to do any write retries, so it just fails the Spark job. We increased the YARN and Spark retry counts so the retry happens automatically on the cluster side.
@Jason-liujc Can we just increase yarn.resourcemanager.am.max-attempts to a higher number, so that the Hudi job is retried if it fails with java.util.ConcurrentModificationException: Cannot resolve conflicts for overlapping writes?
I heard that the DynamoDB lock provider doesn't work with retries, but ZooKeeper does? If anyone has knowledge about this, would you mind sharing it here?
Talking about this: https://github.com/apache/hudi/issues/9728#issuecomment-1765275004
I can't speak to the official guidance from Hudi at the moment (it seems they will roll out the non-blocking concurrent-write feature in version 1.0+).
We had to increase yarn.resourcemanager.am.max-attempts and spark.yarn.maxAppAttempts (the Spark-specific config) to retry more, and we reorganized our tables to reduce concurrent writes. Any other lock provider wasn't an option for us since we run different jobs from different clusters.
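For reference, a small sketch of how those two retry settings relate (the attempt count is illustrative; note that spark.yarn.maxAppAttempts is a per-application flag, while yarn.resourcemanager.am.max-attempts is a cluster-wide ceiling set in yarn-site.xml, not via --conf):

```python
# Illustrative per-application retry flag for spark-submit; the effective number
# of attempts is min(spark.yarn.maxAppAttempts, yarn.resourcemanager.am.max-attempts)
spark_retry_conf = {"spark.yarn.maxAppAttempts": "4"}
flags = " ".join("--conf {0}={1}".format(k, v) for k, v in spark_retry_conf.items())
print(flags)
```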
I have a similar issue when trying to perform clustering (as a separate process) while streaming writes happen at the same time. Even after providing a lock provider (ZooKeeper) running on the same cluster, why does this behaviour happen?
Job1:
spark-submit \
--class org.apache.hudi.utilities.HoodieClusteringJob \
--conf spark.executor.cores=2 \
--conf spark.executor.memory=4g \
--conf spark.driver.cores=3 \
--conf spark.driver.memory=4g \
--conf spark.executor.instances=2 \
--deploy-mode cluster \
--master yarn \
--conf spark.yarn.maxAppAttempts=2 \
--conf spark.dynamicAllocation.enabled=true \
--conf spark.dynamicAllocation.maxExecutors=3 \
--conf spark.network.timeout=300s \
--jars /usr/lib/hudi/hudi-spark-bundle.jar \
/usr/lib/hudi/hudi-utilities-slim-bundle.jar \
--mode scheduleAndExecute \
--base-path <s3_path> \
--table-name <table_name> \
--spark-memory 4g \
--hoodie-conf hoodie.write.concurrency.mode=optimistic_concurrency_control \
--hoodie-conf hoodie.write.lock.provider=org.apache.hudi.client.transaction.lock.ZookeeperBasedLockProvider \
--hoodie-conf hoodie.write.lock.zookeeper.url=<emr_master_dns> \
--hoodie-conf hoodie.write.lock.zookeeper.port=2181 \
--hoodie-conf hoodie.write.lock.zookeeper.base_path=/hudi_locks \
--hoodie-conf hoodie.write.lock.zookeeper.lock_key=<table_name> \
--hoodie-conf hoodie.clustering.plan.strategy.target.file.max.bytes=1073741824 \
--hoodie-conf hoodie.clustering.plan.strategy.small.file.limit=629145600 \
--hoodie-conf hoodie.clustering.execution.strategy.class=org.apache.hudi.client.clustering.run.strategy.SparkSortAndSizeExecutionStrategy \
--hoodie-conf hoodie.clustering.updates.strategy=org.apache.hudi.client.clustering.update.strategy.SparkAllowUpdateStrategy
Job 2: (Hudi Config)
DataSourceWriteOptions.OPERATION.key -> DataSourceWriteOptions.UPSERT_OPERATION_OPT_VAL,
HoodieWriteConfig.TBL_NAME.key -> <table_name>,
DataSourceWriteOptions.TABLE_TYPE.key -> "COPY_ON_WRITE",
HoodieWriteConfig.WRITE_CONCURRENCY_MODE.key -> WriteConcurrencyMode.OPTIMISTIC_CONCURRENCY_CONTROL.name(),
HoodieCleanConfig.FAILED_WRITES_CLEANER_POLICY.key -> HoodieFailedWritesCleaningPolicy.LAZY.name(),
HoodieLockConfig.LOCK_PROVIDER_CLASS_NAME.key -> classOf[ZookeeperBasedLockProvider].getName,
HoodieLockConfig.ZK_CONNECT_URL.key -> <emr_master_dns>,
HoodieLockConfig.ZK_PORT.key -> "2181",
HoodieLockConfig.ZK_BASE_PATH.key -> "/hudi_locks",
HoodieLockConfig.ZK_LOCK_KEY.key -> <table_name>,
DataSourceWriteOptions.RECORDKEY_FIELD.key -> Seq("col3", "col4").mkString(","),
DataSourceWriteOptions.PARTITIONPATH_FIELD.key -> Seq("col1", "col2").mkString(","),
DataSourceWriteOptions.PRECOMBINE_FIELD.key -> Seq("ts").mkString(","),
DataSourceWriteOptions.KEYGENERATOR_CLASS_NAME.key -> classOf[ComplexKeyGenerator].getName,
// Metadata config
HoodieMetadataConfig.ENABLE.key -> "true",
DataSourceWriteOptions.HIVE_STYLE_PARTITIONING.key -> "true",
// Table services config
// Cleaning config
HoodieCleanConfig.ASYNC_CLEAN.key -> "false",
// Clustering config
HoodieClusteringConfig.ASYNC_CLUSTERING_ENABLE.key -> "true",
HoodieClusteringConfig.INLINE_CLUSTERING.key -> "false"
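One way to reduce this particular conflict is to avoid having two separate writers at all and let the ingest job run clustering inline instead of via a standalone HoodieClusteringJob. A hedged sketch of the relevant options (the commit count is illustrative, and unlike the async setup above, inline clustering blocks ingestion while it runs):

```python
# Illustrative inline-clustering options for the ingest writer itself, so only
# one process holds the table lock; values are assumptions, not from this thread
inline_clustering_opts = {
    "hoodie.clustering.inline": "true",
    "hoodie.clustering.inline.max.commits": "4",  # trigger clustering every 4 commits
    "hoodie.clustering.async.enabled": "false",   # no separate async clustering writer
}
```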
I'm seeing this with Flink as well, using the Hive metastore for locking. The worst part is that it seems to have wiped my table in the process!
Do you have multiple writers? Did you configure the lock provider explicitly?
Yes, this seems to happen with multiple writers. I do specify the lock config:
'compaction.delta_commits'='2',
'connector' = 'hudi',
'hive_sync.db'='{hudi_db}',
'hive_sync.enable'='true',
'hive_sync.jdbcurl'='{hive_jdbc_url}',
'hive_sync.metastore.uris'='{hive_thrift_url}',
'hive_sync.mode'='hms',
'hive_sync.password'='',
'hive_sync.table'='{hudi_table}',
'hive_sync.username'='hive',
'hoodie.cleaner.policy.failed.writes' = 'LAZY',
'hoodie.database.name'='{hudi_db}',
'hoodie.datasource.write.recordkey.field'='name',
'hoodie.embed.timeline.server'='false',
'hoodie.parquet.compression.codec' = 'snappy',
'hoodie.payload.event.time.field' = 'event_time',
'hoodie.payload.ordering.field' = 'event_time',
'hoodie.table.name'='{hudi_table}',
'hoodie.table.precombine.field' = 'event_time',
'hoodie.table.recordkey.fields' = 'name',
'hoodie.write.concurrency.mode' = 'optimistic_concurrency_control',
'hoodie.write.lock.hivemetastore.database'='{hudi_db}',
'hoodie.write.lock.hivemetastore.table'='{hudi_table}',
'hoodie.write.lock.provider' = 'org.apache.hudi.hive.transaction.lock.HiveMetastoreBasedLockProvider',
'path' = '{hudi_table_path}',
'precombine.field'='event_time',
'read.streaming.enabled'='true',
'read.streaming.skip_compaction'='true',
'read.tasks'='4',
'recordkey.field'='name',
'table.type' = 'MERGE_ON_READ',
'write.operation' = 'upsert',
'write.precombine.field' = 'event_time',
'write.recordkey.field'='name',
'write.tasks'='4'
You are using OCC. Does the conflict really happen or not? If it happens, the job needs a retry to make the commit succeed.
We are facing the same issue using OCC mode with DynamoDB-based locking; the mechanism works fine with ZooKeeper but not with DynamoDB. Any heads-up on this issue?