[SUPPORT] Hudi fails ACID verification test
Describe the problem you faced
For work, we needed concurrent read/write support for our data lake, which uses Spark. We were noticing some inconsistencies, so we wrote a test that verifies whether a solution like Hudi adheres to ACID. However, we found that Hudi fails this test.
Now, it may be that we've misconfigured Hudi, or that there is a mistake in the test code.
My question is whether someone could take a look at it and perhaps explain what is going wrong here.
To Reproduce
How to run the test and its findings are described in the repository's README, but here is a short rundown.
Steps to reproduce the behavior:
- Check out repo: hudi-acid-verification
- Start Docker if not already running
- Run the test TransactionManagerTest.java
- Observe that the writers break down and that very few transactions are processed.
Expected behavior
- I expect the writers not to break down
- I expect the full number of transactions to be executed
Environment Description
- Hudi version : 0.14.1
- Spark version : 3.4.2
- Hive version : 4.0.0-beta-1
- Hadoop version : 3.2.2
- Storage (HDFS/S3/GCS..) : NTFS (Windows), APFS (macOS) & HDFS
- Running on Docker? (yes/no) : No
Additional context
It's worth noting that other solutions, Iceberg and Delta Lake, have also been tested this way. Iceberg didn't pass the test either; Delta Lake did.
Stacktrace
24/05/07 21:49:38 ERROR TransactionWriter: Exception in writer.
org.example.writer.TransactionFailedException: org.apache.hudi.exception.HoodieRollbackException: Failed to rollback file:/tmp/lakehouse/concurrencytestdb.db/acid_verification commits 20240507214932607
at org.example.writer.TransactionWriter.wrapOrRethrowException(TransactionWriter.java:190)
at org.example.writer.TransactionWriter.tryTransaction(TransactionWriter.java:184)
at org.example.writer.TransactionWriter.updateTransaction(TransactionWriter.java:143)
at org.example.writer.TransactionWriter.lambda$handleTransaction$0(TransactionWriter.java:89)
at org.example.writer.TransactionWriter.withRetryOnException(TransactionWriter.java:109)
at org.example.writer.TransactionWriter.handleTransaction(TransactionWriter.java:83)
at org.example.writer.TransactionWriter.run(TransactionWriter.java:70)
Caused by: org.apache.hudi.exception.HoodieRollbackException: Failed to rollback file:/tmp/lakehouse/concurrencytestdb.db/acid_verification commits 20240507214932607
at org.apache.hudi.client.BaseHoodieTableServiceClient.rollback(BaseHoodieTableServiceClient.java:1065)
at org.apache.hudi.client.BaseHoodieTableServiceClient.rollback(BaseHoodieTableServiceClient.java:1012)
at org.apache.hudi.client.BaseHoodieTableServiceClient.rollbackFailedWrites(BaseHoodieTableServiceClient.java:940)
at org.apache.hudi.client.BaseHoodieTableServiceClient.rollbackFailedWrites(BaseHoodieTableServiceClient.java:922)
at org.apache.hudi.client.BaseHoodieTableServiceClient.rollbackFailedWrites(BaseHoodieTableServiceClient.java:917)
at org.apache.hudi.client.BaseHoodieWriteClient.lambda$startCommitWithTime$97cdbdca$1(BaseHoodieWriteClient.java:941)
at org.apache.hudi.common.util.CleanerUtils.rollbackFailedWrites(CleanerUtils.java:222)
at org.apache.hudi.client.BaseHoodieWriteClient.startCommitWithTime(BaseHoodieWriteClient.java:940)
at org.apache.hudi.client.BaseHoodieWriteClient.startCommitWithTime(BaseHoodieWriteClient.java:933)
at org.apache.hudi.HoodieSparkSqlWriterInternal.writeInternal(HoodieSparkSqlWriter.scala:501)
at org.apache.hudi.HoodieSparkSqlWriterInternal.write(HoodieSparkSqlWriter.scala:204)
at org.apache.hudi.HoodieSparkSqlWriter$.write(HoodieSparkSqlWriter.scala:121)
at org.apache.spark.sql.hudi.command.MergeIntoHoodieTableCommand.executeUpsert(MergeIntoHoodieTableCommand.scala:439)
at org.apache.spark.sql.hudi.command.MergeIntoHoodieTableCommand.run(MergeIntoHoodieTableCommand.scala:282)
at org.apache.spark.sql.execution.command.ExecutedCommandExec.sideEffectResult$lzycompute(commands.scala:75)
at org.apache.spark.sql.execution.command.ExecutedCommandExec.sideEffectResult(commands.scala:73)
at org.apache.spark.sql.execution.command.ExecutedCommandExec.executeCollect(commands.scala:84)
at org.apache.spark.sql.execution.QueryExecution$$anonfun$eagerlyExecuteCommands$1.$anonfun$applyOrElse$1(QueryExecution.scala:98)
at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$6(SQLExecution.scala:118)
at org.apache.spark.sql.execution.SQLExecution$.withSQLConfPropagated(SQLExecution.scala:195)
at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$1(SQLExecution.scala:103)
at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:827)
at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:65)
at org.apache.spark.sql.execution.QueryExecution$$anonfun$eagerlyExecuteCommands$1.applyOrElse(QueryExecution.scala:98)
at org.apache.spark.sql.execution.QueryExecution$$anonfun$eagerlyExecuteCommands$1.applyOrElse(QueryExecution.scala:94)
at org.apache.spark.sql.catalyst.trees.TreeNode.$anonfun$transformDownWithPruning$1(TreeNode.scala:512)
at org.apache.spark.sql.catalyst.trees.CurrentOrigin$.withOrigin(TreeNode.scala:104)
at org.apache.spark.sql.catalyst.trees.TreeNode.transformDownWithPruning(TreeNode.scala:512)
at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.org$apache$spark$sql$catalyst$plans$logical$AnalysisHelper$$super$transformDownWithPruning(LogicalPlan.scala:31)
at org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper.transformDownWithPruning(AnalysisHelper.scala:267)
at org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper.transformDownWithPruning$(AnalysisHelper.scala:263)
at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.transformDownWithPruning(LogicalPlan.scala:31)
at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.transformDownWithPruning(LogicalPlan.scala:31)
at org.apache.spark.sql.catalyst.trees.TreeNode.transformDown(TreeNode.scala:488)
at org.apache.spark.sql.execution.QueryExecution.eagerlyExecuteCommands(QueryExecution.scala:94)
at org.apache.spark.sql.execution.QueryExecution.commandExecuted$lzycompute(QueryExecution.scala:81)
at org.apache.spark.sql.execution.QueryExecution.commandExecuted(QueryExecution.scala:79)
at org.apache.spark.sql.Dataset.<init>(Dataset.scala:218)
at org.apache.spark.sql.Dataset$.$anonfun$ofRows$2(Dataset.scala:98)
at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:827)
at org.apache.spark.sql.Dataset$.ofRows(Dataset.scala:95)
at org.apache.spark.sql.SparkSession.$anonfun$sql$1(SparkSession.scala:640)
at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:827)
at org.apache.spark.sql.SparkSession.sql(SparkSession.scala:630)
at org.apache.spark.sql.SparkSession.sql(SparkSession.scala:671)
at org.example.writer.TransactionWriter.lambda$updateTransaction$2(TransactionWriter.java:160)
at org.example.writer.TransactionWriter.tryTransaction(TransactionWriter.java:181)
... 5 more
Caused by: org.apache.hudi.exception.HoodieRollbackException: Found commits after time :20240507214932607, please rollback greater commits first
at org.apache.hudi.table.action.rollback.BaseRollbackActionExecutor.validateRollbackCommitSequence(BaseRollbackActionExecutor.java:179)
at org.apache.hudi.table.action.rollback.BaseRollbackActionExecutor.doRollbackAndGetStats(BaseRollbackActionExecutor.java:218)
at org.apache.hudi.table.action.rollback.BaseRollbackActionExecutor.runRollback(BaseRollbackActionExecutor.java:111)
at org.apache.hudi.table.action.rollback.BaseRollbackActionExecutor.execute(BaseRollbackActionExecutor.java:138)
at org.apache.hudi.table.HoodieSparkCopyOnWriteTable.rollback(HoodieSparkCopyOnWriteTable.java:298)
at org.apache.hudi.client.BaseHoodieTableServiceClient.rollback(BaseHoodieTableServiceClient.java:1048)
... 51 more
24/05/07 21:49:38 INFO TransactionWriter: acid-writer-1 finished.
@matthijseikelenboom I don't see any lock-related configuration in your setup. I checked that you are using 2 parallel writers, so you need to configure a lock for writes. Hudi follows the OCC (optimistic concurrency control) principle. Check the multi-writer setup here: https://hudi.apache.org/docs/concurrency_control/#model-c-multi-writer
Let me know in case I am missing anything. Thanks a lot.
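For illustration, a minimal sketch of those multi-writer settings applied as write options (the dataSet, tableName, and tablePath names mirror the writer code shown later in this thread; the filesystem-based lock provider is one choice among several):

import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SaveMode;

// Sketch: enable optimistic concurrency control so two writers can commit
// to the same Hudi table without clobbering each other's instants.
static void writeWithOcc(Dataset<Row> dataSet, String tableName, String tablePath) {
    dataSet.write().format("hudi")
        .option("hoodie.table.name", tableName)
        // Multi-writer mode: take a lock and resolve conflicts at commit time.
        .option("hoodie.write.concurrency.mode", "optimistic_concurrency_control")
        // Lock provider shared by the writers; filesystem-based needs no external service.
        .option("hoodie.write.lock.provider",
                "org.apache.hudi.client.transaction.lock.FileSystemBasedLockProvider")
        // Required with OCC: clean up failed writes lazily rather than rolling back eagerly.
        .option("hoodie.cleaner.policy.failed.writes", "LAZY")
        .mode(SaveMode.Append)
        .save(tablePath);
}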
@ad1happy2go Ah yes, you're right. I seem to have forgotten to add the hudi-defaults.conf file to this project. I've added it to my repository and ran the test again. It gets further along, but still breaks down.
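For reference, the corresponding entries in hudi-defaults.conf would look roughly like this (a sketch only; the lock provider is assumed to be InProcessLockProvider at this point, consistent with the later finding that it had to be swapped out):

# Assumed contents; the actual file in the repository may differ.
hoodie.write.concurrency.mode=optimistic_concurrency_control
hoodie.write.lock.provider=org.apache.hudi.client.transaction.lock.InProcessLockProvider
hoodie.cleaner.policy.failed.writes=LAZY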
Stacktrace (Be warned, it's a big one):
ERROR! : Failed to upsert for commit time 20240508114518478
24/05/08 11:45:18 ERROR TransactionWriter: Exception in writer.
java.lang.RuntimeException: org.apache.hudi.exception.HoodieUpsertException: Failed to upsert for commit time 20240508114518478
at org.example.writer.TransactionWriter.wrapOrRethrowException(TransactionWriter.java:192)
at org.example.writer.TransactionWriter.tryTransaction(TransactionWriter.java:184)
at org.example.writer.TransactionWriter.updateTransaction(TransactionWriter.java:143)
at org.example.writer.TransactionWriter.lambda$handleTransaction$0(TransactionWriter.java:89)
at org.example.writer.TransactionWriter.withRetryOnException(TransactionWriter.java:109)
at org.example.writer.TransactionWriter.handleTransaction(TransactionWriter.java:83)
at org.example.writer.TransactionWriter.run(TransactionWriter.java:70)
Caused by: org.apache.hudi.exception.HoodieUpsertException: Failed to upsert for commit time 20240508114518478
at org.apache.hudi.table.action.commit.BaseWriteHelper.write(BaseWriteHelper.java:70)
at org.apache.hudi.table.action.commit.SparkUpsertCommitActionExecutor.execute(SparkUpsertCommitActionExecutor.java:44)
at org.apache.hudi.table.HoodieSparkCopyOnWriteTable.upsert(HoodieSparkCopyOnWriteTable.java:114)
at org.apache.hudi.table.HoodieSparkCopyOnWriteTable.upsert(HoodieSparkCopyOnWriteTable.java:103)
at org.apache.hudi.client.SparkRDDWriteClient.upsert(SparkRDDWriteClient.java:142)
at org.apache.hudi.DataSourceUtils.doWriteOperation(DataSourceUtils.java:224)
at org.apache.hudi.HoodieSparkSqlWriterInternal.liftedTree1$1(HoodieSparkSqlWriter.scala:504)
at org.apache.hudi.HoodieSparkSqlWriterInternal.writeInternal(HoodieSparkSqlWriter.scala:502)
at org.apache.hudi.HoodieSparkSqlWriterInternal.write(HoodieSparkSqlWriter.scala:204)
at org.apache.hudi.HoodieSparkSqlWriter$.write(HoodieSparkSqlWriter.scala:121)
at org.apache.spark.sql.hudi.command.MergeIntoHoodieTableCommand.executeUpsert(MergeIntoHoodieTableCommand.scala:439)
at org.apache.spark.sql.hudi.command.MergeIntoHoodieTableCommand.run(MergeIntoHoodieTableCommand.scala:282)
at org.apache.spark.sql.execution.command.ExecutedCommandExec.sideEffectResult$lzycompute(commands.scala:75)
at org.apache.spark.sql.execution.command.ExecutedCommandExec.sideEffectResult(commands.scala:73)
at org.apache.spark.sql.execution.command.ExecutedCommandExec.executeCollect(commands.scala:84)
at org.apache.spark.sql.execution.QueryExecution$$anonfun$eagerlyExecuteCommands$1.$anonfun$applyOrElse$1(QueryExecution.scala:98)
at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$6(SQLExecution.scala:118)
at org.apache.spark.sql.execution.SQLExecution$.withSQLConfPropagated(SQLExecution.scala:195)
at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$1(SQLExecution.scala:103)
at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:827)
at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:65)
at org.apache.spark.sql.execution.QueryExecution$$anonfun$eagerlyExecuteCommands$1.applyOrElse(QueryExecution.scala:98)
at org.apache.spark.sql.execution.QueryExecution$$anonfun$eagerlyExecuteCommands$1.applyOrElse(QueryExecution.scala:94)
at org.apache.spark.sql.catalyst.trees.TreeNode.$anonfun$transformDownWithPruning$1(TreeNode.scala:512)
at org.apache.spark.sql.catalyst.trees.CurrentOrigin$.withOrigin(TreeNode.scala:104)
at org.apache.spark.sql.catalyst.trees.TreeNode.transformDownWithPruning(TreeNode.scala:512)
at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.org$apache$spark$sql$catalyst$plans$logical$AnalysisHelper$$super$transformDownWithPruning(LogicalPlan.scala:31)
at org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper.transformDownWithPruning(AnalysisHelper.scala:267)
at org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper.transformDownWithPruning$(AnalysisHelper.scala:263)
at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.transformDownWithPruning(LogicalPlan.scala:31)
at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.transformDownWithPruning(LogicalPlan.scala:31)
at org.apache.spark.sql.catalyst.trees.TreeNode.transformDown(TreeNode.scala:488)
at org.apache.spark.sql.execution.QueryExecution.eagerlyExecuteCommands(QueryExecution.scala:94)
at org.apache.spark.sql.execution.QueryExecution.commandExecuted$lzycompute(QueryExecution.scala:81)
at org.apache.spark.sql.execution.QueryExecution.commandExecuted(QueryExecution.scala:79)
at org.apache.spark.sql.Dataset.<init>(Dataset.scala:218)
at org.apache.spark.sql.Dataset$.$anonfun$ofRows$2(Dataset.scala:98)
at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:827)
at org.apache.spark.sql.Dataset$.ofRows(Dataset.scala:95)
at org.apache.spark.sql.SparkSession.$anonfun$sql$1(SparkSession.scala:640)
at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:827)
at org.apache.spark.sql.SparkSession.sql(SparkSession.scala:630)
at org.apache.spark.sql.SparkSession.sql(SparkSession.scala:671)
at org.example.writer.TransactionWriter.lambda$updateTransaction$2(TransactionWriter.java:160)
at org.example.writer.TransactionWriter.tryTransaction(TransactionWriter.java:181)
... 5 more
Caused by: org.apache.spark.SparkException: Job aborted due to stage failure: Task 1 in stage 490.0 failed 1 times, most recent failure: Lost task 1.0 in stage 490.0 (TID 1367) (192.168.1.21 executor driver): com.esotericsoftware.kryo.KryoException: java.lang.IllegalArgumentException: Unable to create serializer "com.esotericsoftware.kryo.serializers.FieldSerializer" for class: java.util.concurrent.atomic.AtomicBoolean
Serialization trace:
isDeleteComputed (org.apache.spark.sql.hudi.command.payload.ExpressionPayload)
at com.esotericsoftware.kryo.serializers.ObjectField.write(ObjectField.java:101)
at com.esotericsoftware.kryo.serializers.FieldSerializer.write(FieldSerializer.java:508)
at com.esotericsoftware.kryo.Kryo.writeClassAndObject(Kryo.java:651)
at org.apache.hudi.common.model.HoodieAvroRecord.writeRecordPayload(HoodieAvroRecord.java:225)
at org.apache.hudi.common.model.HoodieAvroRecord.writeRecordPayload(HoodieAvroRecord.java:48)
at org.apache.hudi.common.model.HoodieRecord.write(HoodieRecord.java:356)
at com.esotericsoftware.kryo.serializers.DefaultSerializers$KryoSerializableSerializer.write(DefaultSerializers.java:514)
at com.esotericsoftware.kryo.serializers.DefaultSerializers$KryoSerializableSerializer.write(DefaultSerializers.java:512)
at com.esotericsoftware.kryo.Kryo.writeClassAndObject(Kryo.java:651)
at org.apache.spark.serializer.KryoSerializationStream.writeObject(KryoSerializer.scala:274)
at org.apache.spark.storage.memory.SerializedValuesHolder.storeValue(MemoryStore.scala:727)
at org.apache.spark.storage.memory.MemoryStore.putIterator(MemoryStore.scala:224)
at org.apache.spark.storage.memory.MemoryStore.putIteratorAsBytes(MemoryStore.scala:352)
at org.apache.spark.storage.BlockManager.$anonfun$doPutIterator$1(BlockManager.scala:1548)
at org.apache.spark.storage.BlockManager.org$apache$spark$storage$BlockManager$$doPut(BlockManager.scala:1458)
at org.apache.spark.storage.BlockManager.doPutIterator(BlockManager.scala:1522)
at org.apache.spark.storage.BlockManager.getOrElseUpdate(BlockManager.scala:1349)
at org.apache.spark.rdd.RDD.getOrCompute(RDD.scala:375)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:326)
at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:364)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:328)
at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:364)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:328)
at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:364)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:328)
at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:364)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:328)
at org.apache.spark.shuffle.ShuffleWriteProcessor.write(ShuffleWriteProcessor.scala:59)
at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:101)
at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:53)
at org.apache.spark.TaskContext.runTaskWithListeners(TaskContext.scala:161)
at org.apache.spark.scheduler.Task.run(Task.scala:139)
at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$3(Executor.scala:554)
at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1529)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:557)
at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1136)
at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:635)
at java.base/java.lang.Thread.run(Thread.java:840)
Caused by: java.lang.IllegalArgumentException: Unable to create serializer "com.esotericsoftware.kryo.serializers.FieldSerializer" for class: java.util.concurrent.atomic.AtomicBoolean
at com.esotericsoftware.kryo.factories.ReflectionSerializerFactory.makeSerializer(ReflectionSerializerFactory.java:65)
at com.esotericsoftware.kryo.factories.ReflectionSerializerFactory.makeSerializer(ReflectionSerializerFactory.java:43)
at com.esotericsoftware.kryo.Kryo.newDefaultSerializer(Kryo.java:396)
at com.twitter.chill.KryoBase.newDefaultSerializer(KryoBase.scala:62)
at com.esotericsoftware.kryo.Kryo.getDefaultSerializer(Kryo.java:380)
at com.esotericsoftware.kryo.util.DefaultClassResolver.registerImplicit(DefaultClassResolver.java:74)
at com.esotericsoftware.kryo.Kryo.getRegistration(Kryo.java:508)
at com.esotericsoftware.kryo.util.DefaultClassResolver.writeClass(DefaultClassResolver.java:97)
at com.esotericsoftware.kryo.Kryo.writeClass(Kryo.java:540)
at com.esotericsoftware.kryo.serializers.ObjectField.write(ObjectField.java:75)
... 41 more
Caused by: java.lang.reflect.InvocationTargetException
at jdk.internal.reflect.GeneratedConstructorAccessor37.newInstance(Unknown Source)
at java.base/jdk.internal.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45)
at java.base/java.lang.reflect.Constructor.newInstanceWithCaller(Constructor.java:499)
at java.base/java.lang.reflect.Constructor.newInstance(Constructor.java:480)
at com.esotericsoftware.kryo.factories.ReflectionSerializerFactory.makeSerializer(ReflectionSerializerFactory.java:51)
... 50 more
Caused by: java.lang.reflect.InaccessibleObjectException: Unable to make field private volatile int java.util.concurrent.atomic.AtomicBoolean.value accessible: module java.base does not "opens java.util.concurrent.atomic" to unnamed module @7c29daf3
at java.base/java.lang.reflect.AccessibleObject.checkCanSetAccessible(AccessibleObject.java:354)
at java.base/java.lang.reflect.AccessibleObject.checkCanSetAccessible(AccessibleObject.java:297)
at java.base/java.lang.reflect.Field.checkCanSetAccessible(Field.java:178)
at java.base/java.lang.reflect.Field.setAccessible(Field.java:172)
at com.esotericsoftware.kryo.serializers.FieldSerializer.buildValidFields(FieldSerializer.java:283)
at com.esotericsoftware.kryo.serializers.FieldSerializer.rebuildCachedFields(FieldSerializer.java:216)
at com.esotericsoftware.kryo.serializers.FieldSerializer.rebuildCachedFields(FieldSerializer.java:157)
at com.esotericsoftware.kryo.serializers.FieldSerializer.<init>(FieldSerializer.java:150)
at com.esotericsoftware.kryo.serializers.FieldSerializer.<init>(FieldSerializer.java:134)
at com.esotericsoftware.kryo.serializers.FieldSerializer.<init>(FieldSerializer.java:130)
... 55 more
Driver stacktrace:
at org.apache.spark.scheduler.DAGScheduler.failJobAndIndependentStages(DAGScheduler.scala:2785)
at org.apache.spark.scheduler.DAGScheduler.$anonfun$abortStage$2(DAGScheduler.scala:2721)
at org.apache.spark.scheduler.DAGScheduler.$anonfun$abortStage$2$adapted(DAGScheduler.scala:2720)
at scala.collection.mutable.ResizableArray.foreach(ResizableArray.scala:62)
at scala.collection.mutable.ResizableArray.foreach$(ResizableArray.scala:55)
at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:49)
at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:2720)
at org.apache.spark.scheduler.DAGScheduler.$anonfun$handleTaskSetFailed$1(DAGScheduler.scala:1206)
at org.apache.spark.scheduler.DAGScheduler.$anonfun$handleTaskSetFailed$1$adapted(DAGScheduler.scala:1206)
at scala.Option.foreach(Option.scala:407)
at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:1206)
at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:2984)
at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2923)
at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2912)
at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:49)
at org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:971)
at org.apache.spark.SparkContext.runJob(SparkContext.scala:2263)
at org.apache.spark.SparkContext.runJob(SparkContext.scala:2284)
at org.apache.spark.SparkContext.runJob(SparkContext.scala:2303)
at org.apache.spark.SparkContext.runJob(SparkContext.scala:2328)
at org.apache.spark.rdd.RDD.$anonfun$collect$1(RDD.scala:1019)
at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112)
at org.apache.spark.rdd.RDD.withScope(RDD.scala:405)
at org.apache.spark.rdd.RDD.collect(RDD.scala:1018)
at org.apache.spark.api.java.JavaRDDLike.collect(JavaRDDLike.scala:362)
at org.apache.spark.api.java.JavaRDDLike.collect$(JavaRDDLike.scala:361)
at org.apache.spark.api.java.AbstractJavaRDDLike.collect(JavaRDDLike.scala:45)
at org.apache.hudi.data.HoodieJavaRDD.collectAsList(HoodieJavaRDD.java:177)
at org.apache.hudi.index.simple.HoodieSimpleIndex.fetchRecordLocationsForAffectedPartitions(HoodieSimpleIndex.java:147)
at org.apache.hudi.index.simple.HoodieSimpleIndex.tagLocationInternal(HoodieSimpleIndex.java:118)
at org.apache.hudi.index.simple.HoodieSimpleIndex.tagLocation(HoodieSimpleIndex.java:91)
at org.apache.hudi.table.action.commit.HoodieWriteHelper.tag(HoodieWriteHelper.java:55)
at org.apache.hudi.table.action.commit.HoodieWriteHelper.tag(HoodieWriteHelper.java:37)
at org.apache.hudi.table.action.commit.BaseWriteHelper.write(BaseWriteHelper.java:59)
... 49 more
Caused by: com.esotericsoftware.kryo.KryoException: java.lang.IllegalArgumentException: Unable to create serializer "com.esotericsoftware.kryo.serializers.FieldSerializer" for class: java.util.concurrent.atomic.AtomicBoolean
Serialization trace:
isDeleteComputed (org.apache.spark.sql.hudi.command.payload.ExpressionPayload)
at com.esotericsoftware.kryo.serializers.ObjectField.write(ObjectField.java:101)
at com.esotericsoftware.kryo.serializers.FieldSerializer.write(FieldSerializer.java:508)
at com.esotericsoftware.kryo.Kryo.writeClassAndObject(Kryo.java:651)
at org.apache.hudi.common.model.HoodieAvroRecord.writeRecordPayload(HoodieAvroRecord.java:225)
at org.apache.hudi.common.model.HoodieAvroRecord.writeRecordPayload(HoodieAvroRecord.java:48)
at org.apache.hudi.common.model.HoodieRecord.write(HoodieRecord.java:356)
at com.esotericsoftware.kryo.serializers.DefaultSerializers$KryoSerializableSerializer.write(DefaultSerializers.java:514)
at com.esotericsoftware.kryo.serializers.DefaultSerializers$KryoSerializableSerializer.write(DefaultSerializers.java:512)
at com.esotericsoftware.kryo.Kryo.writeClassAndObject(Kryo.java:651)
at org.apache.spark.serializer.KryoSerializationStream.writeObject(KryoSerializer.scala:274)
at org.apache.spark.storage.memory.SerializedValuesHolder.storeValue(MemoryStore.scala:727)
at org.apache.spark.storage.memory.MemoryStore.putIterator(MemoryStore.scala:224)
at org.apache.spark.storage.memory.MemoryStore.putIteratorAsBytes(MemoryStore.scala:352)
at org.apache.spark.storage.BlockManager.$anonfun$doPutIterator$1(BlockManager.scala:1548)
at org.apache.spark.storage.BlockManager.org$apache$spark$storage$BlockManager$$doPut(BlockManager.scala:1458)
at org.apache.spark.storage.BlockManager.doPutIterator(BlockManager.scala:1522)
at org.apache.spark.storage.BlockManager.getOrElseUpdate(BlockManager.scala:1349)
at org.apache.spark.rdd.RDD.getOrCompute(RDD.scala:375)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:326)
at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:364)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:328)
at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:364)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:328)
at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:364)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:328)
at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:364)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:328)
at org.apache.spark.shuffle.ShuffleWriteProcessor.write(ShuffleWriteProcessor.scala:59)
at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:101)
at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:53)
at org.apache.spark.TaskContext.runTaskWithListeners(TaskContext.scala:161)
at org.apache.spark.scheduler.Task.run(Task.scala:139)
at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$3(Executor.scala:554)
at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1529)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:557)
at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1136)
at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:635)
at java.base/java.lang.Thread.run(Thread.java:840)
Caused by: java.lang.IllegalArgumentException: Unable to create serializer "com.esotericsoftware.kryo.serializers.FieldSerializer" for class: java.util.concurrent.atomic.AtomicBoolean
at com.esotericsoftware.kryo.factories.ReflectionSerializerFactory.makeSerializer(ReflectionSerializerFactory.java:65)
at com.esotericsoftware.kryo.factories.ReflectionSerializerFactory.makeSerializer(ReflectionSerializerFactory.java:43)
at com.esotericsoftware.kryo.Kryo.newDefaultSerializer(Kryo.java:396)
at com.twitter.chill.KryoBase.newDefaultSerializer(KryoBase.scala:62)
at com.esotericsoftware.kryo.Kryo.getDefaultSerializer(Kryo.java:380)
at com.esotericsoftware.kryo.util.DefaultClassResolver.registerImplicit(DefaultClassResolver.java:74)
at com.esotericsoftware.kryo.Kryo.getRegistration(Kryo.java:508)
at com.esotericsoftware.kryo.util.DefaultClassResolver.writeClass(DefaultClassResolver.java:97)
at com.esotericsoftware.kryo.Kryo.writeClass(Kryo.java:540)
at com.esotericsoftware.kryo.serializers.ObjectField.write(ObjectField.java:75)
... 41 more
Caused by: java.lang.reflect.InvocationTargetException
at jdk.internal.reflect.GeneratedConstructorAccessor37.newInstance(Unknown Source)
at java.base/jdk.internal.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45)
at java.base/java.lang.reflect.Constructor.newInstanceWithCaller(Constructor.java:499)
at java.base/java.lang.reflect.Constructor.newInstance(Constructor.java:480)
at com.esotericsoftware.kryo.factories.ReflectionSerializerFactory.makeSerializer(ReflectionSerializerFactory.java:51)
... 50 more
Caused by: java.lang.reflect.InaccessibleObjectException: Unable to make field private volatile int java.util.concurrent.atomic.AtomicBoolean.value accessible: module java.base does not "opens java.util.concurrent.atomic" to unnamed module @7c29daf3
at java.base/java.lang.reflect.AccessibleObject.checkCanSetAccessible(AccessibleObject.java:354)
at java.base/java.lang.reflect.AccessibleObject.checkCanSetAccessible(AccessibleObject.java:297)
at java.base/java.lang.reflect.Field.checkCanSetAccessible(Field.java:178)
at java.base/java.lang.reflect.Field.setAccessible(Field.java:172)
at com.esotericsoftware.kryo.serializers.FieldSerializer.buildValidFields(FieldSerializer.java:283)
at com.esotericsoftware.kryo.serializers.FieldSerializer.rebuildCachedFields(FieldSerializer.java:216)
at com.esotericsoftware.kryo.serializers.FieldSerializer.rebuildCachedFields(FieldSerializer.java:157)
at com.esotericsoftware.kryo.serializers.FieldSerializer.<init>(FieldSerializer.java:150)
at com.esotericsoftware.kryo.serializers.FieldSerializer.<init>(FieldSerializer.java:134)
at com.esotericsoftware.kryo.serializers.FieldSerializer.<init>(FieldSerializer.java:130)
... 55 more
24/05/08 11:45:18 INFO TransactionWriter: acid-writer-0 finished.
@matthijseikelenboom Looks like there are some library conflicts in the project. I need to reproduce it.
@matthijseikelenboom I noticed you are using Java 17. Hudi 0.14.1 doesn't support Java 17 yet; a newer Hudi version will.
Here is a reference to a similar Java 17 issue: https://github.com/EsotericSoftware/kryo/issues/885
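For reference, the root cause visible in your stacktrace is JDK 17's module encapsulation: Kryo needs reflective access to java.util.concurrent.atomic.AtomicBoolean, and java.base does not open that package to unnamed modules. The usual workaround for this class of error is to open the package to the test JVM, e.g. (an untested sketch here, and Hudi 0.14.1 still isn't supported on Java 17 regardless):

--add-opens=java.base/java.util.concurrent.atomic=ALL-UNNAMED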
Can you try with Java 8 once? Thanks.
Okay, yeah, sure. The original test was written with Java 11, but I updated to 17 because I figured why not, since Spark 3.4.2 supports it.
Is it known that Hudi (or Kryo) also doesn't work with Java 11, and is that why you suggest Java 8?
@ad1happy2go I've pushed a new branch to the repo where the project is downgraded to Java 8. When running the test there, the writers no longer seem to fail, but it still fails the verification test.
@matthijseikelenboom I tried to run it locally but am again seeing issues. We can connect to debug this; if you are on the Apache Hudi Slack, can you ping me ("Aditya Goenka")?
@matthijseikelenboom I was able to test successfully. There were two issues:
- InProcessLockProvider doesn't work across multiple writer processes, so use FileSystemBasedLockProvider in TransactionWriter.java, as in the snippet below:
dataSet.write().format("hudi")
    .option("hoodie.table.name", tableName)
    .option("hoodie.datasource.write.recordkey.field", "primaryKeyValue")
    .option("hoodie.datasource.write.partitionpath.field", "partitionKeyValue")
    .option("hoodie.datasource.write.precombine.field", "dataValue")
    // Filesystem-based lock, so separate writer processes can coordinate;
    // InProcessLockProvider only serializes writers within a single JVM.
    .option("hoodie.write.lock.provider", "org.apache.hudi.client.transaction.lock.FileSystemBasedLockProvider")
    .mode(SaveMode.Append)
    .save(tablePath);
- Along with the REFRESH in ReaderThread, also run MSCK REPAIR so newly added partitions are picked up:
session.sql("REFRESH TABLE " + fullyQualifiedTableName);
// Note the space after TABLE: without it the statement fails to parse.
session.sql("MSCK REPAIR TABLE " + fullyQualifiedTableName);
@matthijseikelenboom Please let us know if it works for you as well. Thanks.
Tested and verified. Closing the issue.
More info
The solution has been tested on:
- Java 8 ✅
- Java 11 ✅
- Java 17 ❌ (As of this moment, Hudi doesn't support this version)
Thanks @matthijseikelenboom for the update