[SUPPORT] NotSerializableException using SparkRDDWriteClient with OCC and DynamoDBBasedLockProvider
Describe the problem you faced
We are trying to delete records from a MOR table with the following write configs for locking and concurrency, and are running into java.io.NotSerializableException: org.apache.hudi.aws.transaction.lock.DynamoDBBasedLockProvider.
write concurrency/locking configs:
(hoodie.write.concurrency.mode,OPTIMISTIC_CONCURRENCY_CONTROL)
(hoodie.write.lock.conflict.resolution.strategy,org.apache.hudi.client.transaction.SimpleConcurrentFileWritesConflictResolutionStrategy)
(hoodie.write.lock.dynamodb.endpoint_url,http://localhost:8000)
(hoodie.write.lock.dynamodb.partition_key,users_changes-us-east-1-local)
(hoodie.write.lock.dynamodb.region,us-east-1)
(hoodie.write.lock.dynamodb.table,datalake-locks)
(hoodie.write.lock.provider,org.apache.hudi.aws.transaction.lock.DynamoDBBasedLockProvider)
(Note: the config above is taken from an integration test where we specify a local DynamoDB, but the same error happens with the production DynamoDB.)
delete code snippet:
val writeClient: SparkRDDWriteClient[_] = buildWriteClient(config, spark.sparkContext)
try {
  val deleteInstant = writeClient.startCommit()
  writeClient.delete(recordKeys, deleteInstant)
  ...
To Reproduce
- Build an RDD of recordKeys to delete
- Build a SparkRDDWriteClient with concurrency and locking configs as specified above
- execute
writeClient.delete(recordKeys, deleteInstant)
- observe
java.io.NotSerializableException: org.apache.hudi.aws.transaction.lock.DynamoDBBasedLockProvider
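For reference, here are those steps condensed into one sketch (illustrative only: config and buildWriteClient are placeholders for our helpers, buildWriteClient is assumed to apply the locking configs listed above, and the key values are made up):
import org.apache.hudi.client.SparkRDDWriteClient
import org.apache.hudi.common.model.HoodieKey
import org.apache.spark.api.java.JavaRDD
import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder().appName("hudi-delete-repro").getOrCreate()

// 1. Build an RDD of record keys to delete (a single hard-coded key here for illustration).
val recordKeys: JavaRDD[HoodieKey] =
  spark.sparkContext
    .parallelize(Seq(new HoodieKey("some-record-key", "env_id=1/week=2023-09-25")))
    .toJavaRDD()

// 2. Build a SparkRDDWriteClient with the concurrency and locking configs specified above.
val writeClient: SparkRDDWriteClient[_] = buildWriteClient(config, spark.sparkContext)

// 3. Execute the delete; on 0.13.0 this fails during the commit with
//    java.io.NotSerializableException: ...DynamoDBBasedLockProvider.
val deleteInstant = writeClient.startCommit()
writeClient.delete(recordKeys, deleteInstant)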
Expected behavior
The delete is successful.
Environment Description
- Hudi version : 0.13.0
- Spark version : 3.3.0
- Hive version : n/a
- Hadoop version : 3.2.1
- Storage (HDFS/S3/GCS..) : s3
- Running on Docker? (yes/no) : no
Additional context
We are using the write client to delete an RDD of record keys because the record keys were generated using UUID keys, which cannot be deleted using the DataFrame API. This was surfaced in this issue. It seems we did not test with full concurrency/locking configured at that time.
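Concretely, what we do is roughly the following (simplified sketch; tablePath, predicate and writeClient are placeholders): since the UUID record keys cannot be regenerated from the payload, we read them back from Hudi's metadata columns and pass them to the write client.
import org.apache.hudi.common.model.{HoodieKey, HoodieRecord}

// Read the existing keys back from the table's metadata columns...
val keysToDelete = spark.read.format("hudi").load(tablePath)
  .where(predicate) // predicate selecting the records to delete
  .select(HoodieRecord.RECORD_KEY_METADATA_FIELD, HoodieRecord.PARTITION_PATH_METADATA_FIELD)
  .rdd
  .map(row => new HoodieKey(row.getString(0), row.getString(1)))
  .toJavaRDD()

// ...and delete them through the write client rather than the DataFrame API.
writeClient.delete(keysToDelete, writeClient.startCommit())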
Stacktrace
org.apache.hudi.exception.HoodieUpsertException: Failed to delete for commit time 20230927230655498
org.apache.hudi.exception.HoodieUpsertException: Failed to delete for commit time 20230927230655498
at org.apache.hudi.table.action.commit.HoodieDeleteHelper.execute(HoodieDeleteHelper.java:117)
at org.apache.hudi.table.action.deltacommit.SparkDeleteDeltaCommitActionExecutor.execute(SparkDeleteDeltaCommitActionExecutor.java:45)
at org.apache.hudi.table.HoodieSparkMergeOnReadTable.delete(HoodieSparkMergeOnReadTable.java:105)
at org.apache.hudi.table.HoodieSparkMergeOnReadTable.delete(HoodieSparkMergeOnReadTable.java:80)
at org.apache.hudi.client.SparkRDDWriteClient.delete(SparkRDDWriteClient.java:243)
at com.heap.datalake.delete.UUIDRecordKeyDeleter$.deleteRecords(UUIDRecordKeyDeleter.scala:82)
at com.heap.datalake.delete.UUIDRecordKeyDeleter$.deleteRecords(UUIDRecordKeyDeleter.scala:210)
at com.heap.datalake.delete.UserPropertiesDeleter$.deleteUsers(UserPropertiesDeleter.scala:48)
at com.heap.datalake.delete.UserPropertiesDeleter$.run(UserPropertiesDeleter.scala:23)
at com.heap.datalake.delete.DeleteUsersApp$.$anonfun$run$2(DeleteUsersApp.scala:56)
at scala.util.Try$.apply(Try.scala:213)
at com.heap.datalake.delete.DeleteUsersApp$.run(DeleteUsersApp.scala:56)
at com.heap.datalake.delete.DeleteUsersApp$.$anonfun$main$2(DeleteUsersApp.scala:38)
at com.heap.datalake.delete.DeleteUsersApp$.$anonfun$main$2$adapted(DeleteUsersApp.scala:23)
at scala.util.Success.foreach(Try.scala:253)
at com.heap.datalake.delete.DeleteUsersApp$.main(DeleteUsersApp.scala:23)
at com.heap.datalake.delete.DeleteUsersApp.main(DeleteUsersApp.scala)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498)
at org.apache.spark.deploy.yarn.ApplicationMaster$$anon$2.run(ApplicationMaster.scala:742)
Caused by: org.apache.spark.SparkException: Task not serializable
at org.apache.spark.util.ClosureCleaner$.ensureSerializable(ClosureCleaner.scala:444)
at org.apache.spark.util.ClosureCleaner$.clean(ClosureCleaner.scala:416)
at org.apache.spark.util.ClosureCleaner$.clean(ClosureCleaner.scala:163)
at org.apache.spark.SparkContext.clean(SparkContext.scala:2493)
at org.apache.spark.SparkContext.runJob(SparkContext.scala:2268)
at org.apache.spark.SparkContext.runJob(SparkContext.scala:2294)
at org.apache.spark.rdd.RDD.$anonfun$collect$1(RDD.scala:1021)
at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112)
at org.apache.spark.rdd.RDD.withScope(RDD.scala:406)
at org.apache.spark.rdd.RDD.collect(RDD.scala:1020)
at org.apache.spark.api.java.JavaRDDLike.collect(JavaRDDLike.scala:362)
at org.apache.spark.api.java.JavaRDDLike.collect$(JavaRDDLike.scala:361)
at org.apache.spark.api.java.AbstractJavaRDDLike.collect(JavaRDDLike.scala:45)
at org.apache.hudi.data.HoodieJavaRDD.collectAsList(HoodieJavaRDD.java:163)
at org.apache.hudi.table.action.commit.BaseSparkCommitActionExecutor.setCommitMetadata(BaseSparkCommitActionExecutor.java:283)
at org.apache.hudi.table.action.commit.BaseCommitActionExecutor.autoCommit(BaseCommitActionExecutor.java:186)
at org.apache.hudi.table.action.commit.BaseCommitActionExecutor.commitOnAutoCommit(BaseCommitActionExecutor.java:174)
at org.apache.hudi.table.action.commit.BaseSparkCommitActionExecutor.updateIndexAndCommitIfNeeded(BaseSparkCommitActionExecutor.java:273)
at org.apache.hudi.table.action.commit.BaseSparkCommitActionExecutor.execute(BaseSparkCommitActionExecutor.java:178)
at org.apache.hudi.table.action.commit.BaseSparkCommitActionExecutor.execute(BaseSparkCommitActionExecutor.java:83)
at org.apache.hudi.table.action.commit.HoodieDeleteHelper.execute(HoodieDeleteHelper.java:103)
... 21 more
Caused by: java.io.NotSerializableException: org.apache.hudi.aws.transaction.lock.DynamoDBBasedLockProvider
Serialization stack:
- object not serializable (class: org.apache.hudi.aws.transaction.lock.DynamoDBBasedLockProvider, value: org.apache.hudi.aws.transaction.lock.DynamoDBBasedLockProvider@697af25)
- field (class: org.apache.hudi.client.transaction.lock.LockManager, name: lockProvider, type: interface org.apache.hudi.common.lock.LockProvider)
- object (class org.apache.hudi.client.transaction.lock.LockManager, org.apache.hudi.client.transaction.lock.LockManager@a109d65)
- field (class: org.apache.hudi.client.transaction.TransactionManager, name: lockManager, type: class org.apache.hudi.client.transaction.lock.LockManager)
- object (class org.apache.hudi.client.transaction.TransactionManager, org.apache.hudi.client.transaction.TransactionManager@157ce6e9)
- field (class: org.apache.hudi.table.action.commit.BaseCommitActionExecutor, name: txnManager, type: class org.apache.hudi.client.transaction.TransactionManager)
- object (class org.apache.hudi.table.action.deltacommit.SparkDeleteDeltaCommitActionExecutor, org.apache.hudi.table.action.deltacommit.SparkDeleteDeltaCommitActionExecutor@d63f085)
- element of array (index: 0)
- array (class [Ljava.lang.Object;, size 2)
- field (class: java.lang.invoke.SerializedLambda, name: capturedArgs, type: class [Ljava.lang.Object;)
- object (class java.lang.invoke.SerializedLambda, SerializedLambda[capturingClass=class org.apache.hudi.table.action.commit.BaseSparkCommitActionExecutor, functionalInterfaceMethod=org/apache/spark/api/java/function/Function2.call:(Ljava/lang/Object;Ljava/lang/Object;)Ljava/lang/Object;, implementation=invokeSpecial org/apache/hudi/table/action/commit/BaseSparkCommitActionExecutor.lambda$mapPartitionsAsRDD$a3ab3c4$1:(Lorg/apache/spark/Partitioner;Ljava/lang/Integer;Ljava/util/Iterator;)Ljava/util/Iterator;, instantiatedMethodType=(Ljava/lang/Integer;Ljava/util/Iterator;)Ljava/util/Iterator;, numCaptured=2])
- writeReplace data (class: java.lang.invoke.SerializedLambda)
- object (class org.apache.hudi.table.action.commit.BaseSparkCommitActionExecutor$$Lambda$5283/1488307125, org.apache.hudi.table.action.commit.BaseSparkCommitActionExecutor$$Lambda$5283/1488307125@467af2d8)
- element of array (index: 0)
- array (class [Ljava.lang.Object;, size 1)
- field (class: java.lang.invoke.SerializedLambda, name: capturedArgs, type: class [Ljava.lang.Object;)
- object (class java.lang.invoke.SerializedLambda, SerializedLambda[capturingClass=interface org.apache.spark.api.java.JavaRDDLike, functionalInterfaceMethod=scala/Function2.apply:(Ljava/lang/Object;Ljava/lang/Object;)Ljava/lang/Object;, implementation=invokeStatic org/apache/spark/api/java/JavaRDDLike.$anonfun$mapPartitionsWithIndex$1$adapted:(Lorg/apache/spark/api/java/function/Function2;Ljava/lang/Object;Lscala/collection/Iterator;)Lscala/collection/Iterator;, instantiatedMethodType=(Ljava/lang/Object;Lscala/collection/Iterator;)Lscala/collection/Iterator;, numCaptured=1])
- writeReplace data (class: java.lang.invoke.SerializedLambda)
- object (class org.apache.spark.api.java.JavaRDDLike$$Lambda$5284/280417748, org.apache.spark.api.java.JavaRDDLike$$Lambda$5284/280417748@1c7adacb)
- element of array (index: 0)
- array (class [Ljava.lang.Object;, size 1)
- field (class: java.lang.invoke.SerializedLambda, name: capturedArgs, type: class [Ljava.lang.Object;)
- object (class java.lang.invoke.SerializedLambda, SerializedLambda[capturingClass=class org.apache.spark.rdd.RDD, functionalInterfaceMethod=scala/Function3.apply:(Ljava/lang/Object;Ljava/lang/Object;Ljava/lang/Object;)Ljava/lang/Object;, implementation=invokeStatic org/apache/spark/rdd/RDD.$anonfun$mapPartitionsWithIndex$2$adapted:(Lscala/Function2;Lorg/apache/spark/TaskContext;Ljava/lang/Object;Lscala/collection/Iterator;)Lscala/collection/Iterator;, instantiatedMethodType=(Lorg/apache/spark/TaskContext;Ljava/lang/Object;Lscala/collection/Iterator;)Lscala/collection/Iterator;, numCaptured=1])
- writeReplace data (class: java.lang.invoke.SerializedLambda)
- object (class org.apache.spark.rdd.RDD$$Lambda$4810/1052938329, org.apache.spark.rdd.RDD$$Lambda$4810/1052938329@3c31aa92)
- field (class: org.apache.spark.rdd.MapPartitionsRDD, name: f, type: interface scala.Function3)
- object (class org.apache.spark.rdd.MapPartitionsRDD, MapPartitionsRDD[146] at mapPartitionsWithIndex at BaseSparkCommitActionExecutor.java:249)
- field (class: org.apache.spark.NarrowDependency, name: _rdd, type: class org.apache.spark.rdd.RDD)
- object (class org.apache.spark.OneToOneDependency, org.apache.spark.OneToOneDependency@373e82e8)
- writeObject data (class: scala.collection.immutable.List$SerializationProxy)
- object (class scala.collection.immutable.List$SerializationProxy, scala.collection.immutable.List$SerializationProxy@44652f9f)
- writeReplace data (class: scala.collection.immutable.List$SerializationProxy)
- object (class scala.collection.immutable.$colon$colon, List(org.apache.spark.OneToOneDependency@373e82e8))
- field (class: org.apache.spark.rdd.RDD, name: dependencies_, type: interface scala.collection.Seq)
- object (class org.apache.spark.rdd.MapPartitionsRDD, MapPartitionsRDD[147] at flatMap at BaseSparkCommitActionExecutor.java:255)
- field (class: org.apache.spark.NarrowDependency, name: _rdd, type: class org.apache.spark.rdd.RDD)
- object (class org.apache.spark.OneToOneDependency, org.apache.spark.OneToOneDependency@66f562d4)
- writeObject data (class: scala.collection.immutable.List$SerializationProxy)
- object (class scala.collection.immutable.List$SerializationProxy, scala.collection.immutable.List$SerializationProxy@d993b51)
- writeReplace data (class: scala.collection.immutable.List$SerializationProxy)
- object (class scala.collection.immutable.$colon$colon, List(org.apache.spark.OneToOneDependency@66f562d4))
- field (class: org.apache.spark.rdd.RDD, name: dependencies_, type: interface scala.collection.Seq)
- object (class org.apache.spark.rdd.MapPartitionsRDD, MapPartitionsRDD[148] at map at HoodieJavaRDD.java:111)
- element of array (index: 0)
- array (class [Ljava.lang.Object;, size 1)
- field (class: java.lang.invoke.SerializedLambda, name: capturedArgs, type: class [Ljava.lang.Object;)
- object (class java.lang.invoke.SerializedLambda, SerializedLambda[capturingClass=class org.apache.spark.rdd.RDD, functionalInterfaceMethod=scala/Function1.apply:(Ljava/lang/Object;)Ljava/lang/Object;, implementation=invokeStatic org/apache/spark/rdd/RDD.$anonfun$collect$2:(Lorg/apache/spark/rdd/RDD;Lscala/collection/Iterator;)Ljava/lang/Object;, instantiatedMethodType=(Lscala/collection/Iterator;)Ljava/lang/Object;, numCaptured=1])
- writeReplace data (class: java.lang.invoke.SerializedLambda)
- object (class org.apache.spark.rdd.RDD$$Lambda$3667/1073189054, org.apache.spark.rdd.RDD$$Lambda$3667/1073189054@770c17a4)
at org.apache.spark.serializer.SerializationDebugger$.improveException(SerializationDebugger.scala:41)
at org.apache.spark.serializer.JavaSerializationStream.writeObject(JavaSerializer.scala:49)
at org.apache.spark.serializer.JavaSerializerInstance.serialize(JavaSerializer.scala:115)
at org.apache.spark.util.ClosureCleaner$.ensureSerializable(ClosureCleaner.scala:441)
... 42 more
23/09/27 23:07:54 INFO SparkContext: Invoking stop() from shutdown hook
@ehurheap Did you try the same lock configuration with a normal insert on a test table to ensure the configurations are good?
Yes. I can write a dataframe to the same table, for example:
data.write
  .format("org.apache.hudi.Spark32PlusDefaultSource")
  .options(writeWithLocking)
  .mode("append")
  .save(tablePath)
where writeWithLocking options are:
(hoodie.bulkinsert.shuffle.parallelism,2)
(hoodie.bulkinsert.sort.mode,NONE)
(hoodie.clean.async,false)
(hoodie.clean.automatic,false)
(hoodie.cleaner.policy.failed.writes,LAZY)
(hoodie.combine.before.insert,false)
(hoodie.compact.inline,false)
(hoodie.compact.schedule.inline,false)
(hoodie.datasource.compaction.async.enable,false)
(hoodie.datasource.write.hive_style_partitioning,true)
(hoodie.datasource.write.keygenerator.class,org.apache.spark.sql.hudi.command.UuidKeyGenerator)
(hoodie.datasource.write.operation,bulk_insert)
(hoodie.datasource.write.partitionpath.field,env_id,week)
(hoodie.datasource.write.precombine.field,schematized_at)
(hoodie.datasource.write.recordkey.field,env_id,user_id)
(hoodie.datasource.write.row.writer.enable,false)
(hoodie.datasource.write.table.type,MERGE_ON_READ)
(hoodie.metadata.enable,false)
(hoodie.table.name,users_changes)
(hoodie.write.concurrency.mode,OPTIMISTIC_CONCURRENCY_CONTROL)
(hoodie.write.lock.dynamodb.endpoint_url,http://localhost:8000)
(hoodie.write.lock.dynamodb.partition_key,users_changes-us-east-1-local)
(hoodie.write.lock.dynamodb.region,us-east-1)
(hoodie.write.lock.dynamodb.table,datalake-locks)
(hoodie.write.lock.provider,org.apache.hudi.aws.transaction.lock.DynamoDBBasedLockProvider)
These locking configs are also used by our production ingestion, which writes to Hudi via Spark Structured Streaming without error.
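For context, that streaming write looks roughly like this (a sketch only: the source DataFrame, checkpoint location, and table path are placeholders, and writeWithLocking is the option map above):
changesStream // a streaming DataFrame produced by our ingestion pipeline
  .writeStream
  .format("hudi")
  .options(writeWithLocking)
  .option("checkpointLocation", "s3://<bucket>/checkpoints/users_changes")
  .outputMode("append")
  .start(tablePath)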
@ehurheap Sorry for the delay here. Is it possible to provide the full implementation of UUIDRecordKeyDeleter.scala?
object UUIDRecordKeyDeleter {
  private val log = loggerForClass(UUIDRecordKeyDeleter.getClass)

  def query(tablePath: String, predicate: Column, queryType: String)(implicit
      spark: SparkSession
  ): DataFrame = {
    spark.read
      .format("hudi")
      .option(QUERY_TYPE.key(), queryType)
      .option(HoodieMetadataConfig.ENABLE.key(), "false")
      .load(tablePath)
      .where(predicate)
  }

  // Computes record keys of the records matching the predicate
  def computeRecordKeys(
      tablePath: String,
      predicate: Column
  )(implicit
      spark: SparkSession
  ): JavaRDD[HoodieKey] = {
    implicit val hoodieKeyEncoder: Encoder[HoodieKey] =
      Encoders.bean(classOf[org.apache.hudi.common.model.HoodieKey])
    query(tablePath, predicate, QUERY_TYPE_SNAPSHOT_OPT_VAL)
      .select(HoodieRecord.RECORD_KEY_METADATA_FIELD, HoodieRecord.PARTITION_PATH_METADATA_FIELD)
      .map { row =>
        {
          val recordKey = row.getString(0)
          val partitionPath = row.getString(1)
          new org.apache.hudi.common.model.HoodieKey(recordKey, partitionPath)
        }
      }
      .toJavaRDD
  }
  // Deletes all records with the provided record keys without triggering compaction.
  def deleteRecords(
      config: WriteClientConfig,
      envConfig: EnvConfig,
      dataLakeRecord: DataLakeRecord[_, _],
      recordKeys: JavaRDD[HoodieKey]
  )(implicit
      spark: SparkSession
  ): DeleteStats = {
    var stats = DeleteStats()
    val writeClient =
      buildWriteClient(config, envConfig, dataLakeRecord, spark.sparkContext)
    var deleteInstant: String = ""
    try {
      deleteInstant = writeClient.startCommit()
      val statuses: mutable.Seq[WriteStatus] =
        writeClient.delete(recordKeys, deleteInstant).collect().asScala
      stats = stats.copy(
        totalDeleted = getTotalDeletesFromWriteStatuses(statuses),
        totalPartitionsDeleted = getPartitionsFromWriteStatuses(statuses).size
      )
    } catch {
      case t: Throwable =>
        logErrorAndExit(s"Delete operation failed for instant $deleteInstant due to ", t)
    } finally {
      log.info(s"Finished delete operation for instant $deleteInstant")
      writeClient.close()
    }
    stats
  }
  def compactReadOptimizedRecordsMatchingPredicate(
      config: WriteClientConfig,
      envConfig: EnvConfig,
      dataLakeRecord: DataLakeRecord[_, _],
      predicate: Column
  )(implicit spark: SparkSession): DeleteStats = {
    import spark.implicits._
    var stats = DeleteStats()
    if (config.scheduleCompaction) {
      val partitions = query(config.tablePath, predicate, QUERY_TYPE_READ_OPTIMIZED_OPT_VAL)
        .select(col(HoodieRecord.PARTITION_PATH_METADATA_FIELD))
        .distinct()
        .map(r => r.getString(0))
        .collect()
        .toList
      if (partitions.isEmpty) {
        log.warn(
          s"Skipping compaction on table ${dataLakeRecord.tableName} as no records were found"
        )
      } else {
        log.info(
          s"Compacting ${partitions.size} partition paths in table ${dataLakeRecord.tableName}: ${partitions.take(10).mkString(",")}..."
        )
        val writeClient =
          compactingWriteClient(config, dataLakeRecord.tableName, envConfig, partitions)
        // Retry any pending (requested/inflight) compactions targeting overlapping partitions.
        // This is essentially a recovery process - if a previous run failed after scheduling a compaction,
        // the file groups in that compaction plan will be excluded from the new scheduled plan and their compaction won't be retried.
        // The compaction execution uses locks therefore it is safe to execute the same compaction plan concurrently.
        // The lake API limits GDPR deletion jobs to one at a time per env_id, therefore such conflict is unlikely (but still possible with mutations).
        // todo: get stats from this compaction to aggregate into final stats
        if (config.executeCompaction) {
          CompactionHelper.executeCompactionsWithOverlappingPartitions(
            config.tablePath,
            partitions,
            writeClient
          )
        }
        try {
          log.info(s"""Scheduling compaction on table ${dataLakeRecord.tableName}:
                      | hoodie.compaction.strategy: ${writeClient.getConfig.getCompactionStrategy}
                      | "hoodie.compaction.include.partitions": ${writeClient.getConfig
            .getString(
              "hoodie.compaction.include.partitions"
            )
            .take(100)}...
                      | hoodie.compact.schedule.inline: ${writeClient.getConfig
            .scheduleInlineCompaction()}
                      | hoodie.compact.inline: ${writeClient.getConfig
            .inlineCompactionEnabled()}
                      | hoodie.write.lock.provider: ${writeClient.getConfig.getLockProviderClass}
                      | hoodie.metadata.enable: ${writeClient.getConfig.getMetadataConfig
            .enabled()}
                      |""".stripMargin)
          val instant = writeClient.scheduleCompaction(org.apache.hudi.common.util.Option.empty())
          if (instant.isPresent) {
            if (config.executeCompaction) {
              log.info(
                s"Running compaction on table ${dataLakeRecord.tableName} at instant: ${instant.get}"
              )
              val metadata = writeClient.compact(instant.get).getCommitMetadata
              stats = stats.copy(
                compactionDeletes = metadata.get.getTotalRecordsDeleted,
                compactionTotalFiles = metadata.get.getTotalLogFilesCompacted,
                compactionTotalPartitions = metadata.get.fetchTotalPartitionsWritten
              )
              log.info(
                s"Successfully completed compaction on table ${dataLakeRecord.tableName} at instant: ${instant.get}"
              )
            }
          } else {
            log.warn(
              s"Unable to schedule compaction on table ${dataLakeRecord.tableName}, see Hudi logs for reason."
            )
          }
        } catch {
          case e: Throwable =>
            logErrorAndExit(s"Compaction failed on table ${dataLakeRecord.tableName}!", e)
        } finally {
          writeClient.close()
        }
      }
    }
    stats
  }
  // Deletes all records matching the predicate and runs compaction afterwards.
  // Deletion and compaction are only executed if there are records matching the predicate.
  def deleteRecords(
      config: WriteClientConfig,
      envConfig: EnvConfig,
      dataLakeRecord: DataLakeRecord[_, _],
      predicate: Column
  )(implicit
      spark: SparkSession
  ): DeleteStats = {
    var stats = DeleteStats()
    val recordCount = query(config.tablePath, predicate, QUERY_TYPE_SNAPSHOT_OPT_VAL).count()
    stats = stats.copy(
      totalFoundBeforeDelete = recordCount
    )
    if (recordCount == 0) {
      log.warn(
        s"Skipping delete operation on table ${dataLakeRecord.tableName} as no records were found"
      )
      DeleteStats()
    } else {
      log.warn(s"Deleting $recordCount records from table ${dataLakeRecord.tableName}")
      val keysToDelete = computeRecordKeys(config.tablePath, predicate)
      val deleteStats = deleteRecords(config, envConfig, dataLakeRecord, keysToDelete)
      stats = stats.copy(
        totalDeleted = deleteStats.totalDeleted,
        totalPartitionsDeleted = deleteStats.totalPartitionsDeleted
      )
    }
    val compactionStats =
      compactReadOptimizedRecordsMatchingPredicate(config, envConfig, dataLakeRecord, predicate)
    stats = stats.copy(
      compactionDeletes = compactionStats.compactionDeletes,
      compactionTotalFiles = compactionStats.compactionTotalFiles,
      compactionTotalPartitions = compactionStats.compactionTotalPartitions
    )
    stats
  }
  def buildWriteClient(
      config: WriteClientConfig,
      envConfig: EnvConfig,
      datalakeRecord: DataLakeRecord[_, _],
      sparkContext: SparkContext
  ): SparkRDDWriteClient[_] = {
    val lockProperties = new Properties()
    val lockOptionsMap =
      WriterOptions.lockOptions(datalakeRecord.tableName, config.region, envConfig: EnvConfig)
    lockProperties.putAll(lockOptionsMap.asJava)
    val metricsProperties = new Properties()
    val metricsOptionsMap = metricsOptions(config.statsDHost, envConfig, datalakeRecord.tableName)
    metricsProperties.putAll(metricsOptionsMap.asJava)
    val writerConfig = HoodieWriteConfig
      .newBuilder()
      .withCompactionConfig(
        HoodieCompactionConfig
          .newBuilder()
          .withInlineCompaction(false)
          .withScheduleInlineCompaction(false)
          .withMaxNumDeltaCommitsBeforeCompaction(1)
          .build()
      )
      .withArchivalConfig(HoodieArchivalConfig.newBuilder().withAutoArchive(false).build())
      .withCleanConfig(HoodieCleanConfig.newBuilder().withAutoClean(false).build())
      .withMetadataConfig(HoodieMetadataConfig.newBuilder().enable(false).build())
      .withLockConfig(HoodieLockConfig.newBuilder().fromProperties(lockProperties).build())
      .withMetricsConfig(HoodieMetricsConfig.newBuilder().fromProperties(metricsProperties).build())
      .withDeleteParallelism(config.writeParallelism)
      .withPath(config.tablePath)
      .forTable(datalakeRecord.tableName)
      .build()
    val engineContext: HoodieEngineContext = new HoodieSparkEngineContext(
      JavaSparkContext.fromSparkContext(sparkContext)
    )
    new SparkRDDWriteClient(engineContext, writerConfig)
  }
}
@ehurheap I was trying to reproduce this and identify the reason for the exception, but I couldn't reproduce it since I was not able to run the code, and I couldn't pinpoint the exact reason either. It appears to happen only with this custom code.
This issue occurs because the DynamoDBBasedLockProvider object is being referenced somewhere inside an executor closure; the problem arises when the object is passed to the executors directly. You might want to consider creating the object exclusively within the executor code, if you can work out where that reference happens.
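To illustrate the general Spark behavior being described here (a generic sketch, not Hudi code: NonSerializableClient stands in for any non-serializable object such as a lock provider):
import org.apache.spark.sql.SparkSession

// Stand-in for any class that does not implement java.io.Serializable.
class NonSerializableClient {
  def lookup(key: String): String = key.toUpperCase
}

object ClosureExample {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().appName("closure-example").master("local[*]").getOrCreate()
    val keys = spark.sparkContext.parallelize(Seq("a", "b", "c"))

    // Fails: a driver-side instance captured by the lambda must be serialized with the task,
    // so Spark throws "Task not serializable" / java.io.NotSerializableException.
    // val client = new NonSerializableClient
    // keys.map(k => client.lookup(k)).collect()

    // Works: the object is created inside mapPartitions, i.e. on the executor, so it is never serialized.
    val result = keys.mapPartitions { iter =>
      val client = new NonSerializableClient
      iter.map(client.lookup)
    }.collect()

    println(result.mkString(","))
    spark.stop()
  }
}
In the failure above, the serialization stack shows the analogous capture: the commit executor's lambda drags in the TransactionManager, its LockManager, and therefore the non-serializable lockProvider field.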
@ad1happy2go - thanks for the update. We will take another look.
I have an integration test that reproduces this but I'll need to extract from our codebase and repackage it so it can run standalone. I'll update here when I have that in case it helps.
Hi @ad1happy2go, DynamoDBBasedLockProvider and HiveMetastoreBasedLockProvider have the same issue as https://issues.apache.org/jira/browse/HUDI-3638: task not serializable in the clean action.
@ad1happy2go here is a small repo (https://github.com/ehurheap/hudisupport) with code and instructions on how to reproduce this problem. It would be great if you could try it out and let me know if that reproduces it for you.
Sure. I will give it a try.