
[SUPPORT] NotSerializableException using SparkRDDWriteClient with OCC and DynamoDBBasedLockProvider

Open ehurheap opened this issue 1 year ago • 9 comments

Describe the problem you faced

We are trying to delete records from a MOR table with the following write configs for locking and concurrency, and we are running into java.io.NotSerializableException: org.apache.hudi.aws.transaction.lock.DynamoDBBasedLockProvider

write concurrency/locking configs:

(hoodie.write.concurrency.mode,OPTIMISTIC_CONCURRENCY_CONTROL)
(hoodie.write.lock.conflict.resolution.strategy,org.apache.hudi.client.transaction.SimpleConcurrentFileWritesConflictResolutionStrategy)
(hoodie.write.lock.dynamodb.endpoint_url,http://localhost:8000)
(hoodie.write.lock.dynamodb.partition_key,users_changes-us-east-1-local)
(hoodie.write.lock.dynamodb.region,us-east-1)
(hoodie.write.lock.dynamodb.table,datalake-locks)
(hoodie.write.lock.provider,org.apache.hudi.aws.transaction.lock.DynamoDBBasedLockProvider)

(Note: the config above is taken from an integration test where we point at a local DynamoDB, but the same error happens against the production DynamoDB.)

delete code snippet:

val writeClient: SparkRDDWriteClient[_] = buildWriteClient(config, spark.sparkContext)
try {
  val deleteInstant = writeClient.startCommit()
  writeClient.delete(recordKeys, deleteInstant)
  ...

To Reproduce

  1. Build an RDD of recordKeys to delete.
  2. Build a SparkRDDWriteClient with the concurrency and locking configs specified above.
  3. Execute writeClient.delete(recordKeys, deleteInstant) (a minimal end-to-end sketch follows this list).
  4. Observe java.io.NotSerializableException: org.apache.hudi.aws.transaction.lock.DynamoDBBasedLockProvider.
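
A minimal end-to-end sketch of these steps, assembled from the snippets in this issue — the table path, table name, and record key below are placeholders, and it assumes the hudi-aws module providing DynamoDBBasedLockProvider is on the classpath and DynamoDB is reachable at the configured endpoint:

import java.util.Properties
import org.apache.hudi.client.SparkRDDWriteClient
import org.apache.hudi.client.common.HoodieSparkEngineContext
import org.apache.hudi.common.model.{HoodieKey, WriteConcurrencyMode}
import org.apache.hudi.config.{HoodieLockConfig, HoodieWriteConfig}
import org.apache.spark.api.java.JavaSparkContext
import org.apache.spark.sql.SparkSession

object DeleteRepro {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().appName("hudi-delete-repro").getOrCreate()
    val jsc = JavaSparkContext.fromSparkContext(spark.sparkContext)

    // Locking configs from above.
    val lockProps = new Properties()
    lockProps.put("hoodie.write.lock.provider",
      "org.apache.hudi.aws.transaction.lock.DynamoDBBasedLockProvider")
    lockProps.put("hoodie.write.lock.dynamodb.table", "datalake-locks")
    lockProps.put("hoodie.write.lock.dynamodb.partition_key", "users_changes-us-east-1-local")
    lockProps.put("hoodie.write.lock.dynamodb.region", "us-east-1")
    lockProps.put("hoodie.write.lock.dynamodb.endpoint_url", "http://localhost:8000")

    val writeConfig = HoodieWriteConfig.newBuilder()
      .withPath("s3://bucket/path/to/users_changes") // placeholder
      .forTable("users_changes")
      .withWriteConcurrencyMode(WriteConcurrencyMode.OPTIMISTIC_CONCURRENCY_CONTROL)
      .withLockConfig(HoodieLockConfig.newBuilder().fromProperties(lockProps).build())
      .build()

    // A single placeholder key; in the real job this RDD is computed from the table.
    val keys = jsc.parallelize(java.util.Collections.singletonList(
      new HoodieKey("some-uuid", "env_id=1/week=2023-09-25")))

    val writeClient = new SparkRDDWriteClient(new HoodieSparkEngineContext(jsc), writeConfig)
    try {
      val deleteInstant = writeClient.startCommit()
      writeClient.delete(keys, deleteInstant) // fails with NotSerializableException under these configs
    } finally {
      writeClient.close()
    }
  }
}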

Expected behavior

The delete is successful.

Environment Description

Hudi version : 0.13.0
Spark version : 3.3.0
Hive version : n/a
Hadoop version : 3.2.1
Storage (HDFS/S3/GCS..) : s3
Running on Docker? (yes/no) : no

Additional context

We are using the write client to delete an RDD of recordKeys because the record keys were generated using UUID keys, which cannot be deleted using the DataFrame API. This was surfaced in this issue. It seems we did not test with full concurrency/locking configured at that time.

Stacktrace

org.apache.hudi.exception.HoodieUpsertException: Failed to delete for commit time 20230927230655498
org.apache.hudi.exception.HoodieUpsertException: Failed to delete for commit time 20230927230655498
        at org.apache.hudi.table.action.commit.HoodieDeleteHelper.execute(HoodieDeleteHelper.java:117)
        at org.apache.hudi.table.action.deltacommit.SparkDeleteDeltaCommitActionExecutor.execute(SparkDeleteDeltaCommitActionExecutor.java:45)
        at org.apache.hudi.table.HoodieSparkMergeOnReadTable.delete(HoodieSparkMergeOnReadTable.java:105)
        at org.apache.hudi.table.HoodieSparkMergeOnReadTable.delete(HoodieSparkMergeOnReadTable.java:80)
        at org.apache.hudi.client.SparkRDDWriteClient.delete(SparkRDDWriteClient.java:243)
        at com.heap.datalake.delete.UUIDRecordKeyDeleter$.deleteRecords(UUIDRecordKeyDeleter.scala:82)
        at com.heap.datalake.delete.UUIDRecordKeyDeleter$.deleteRecords(UUIDRecordKeyDeleter.scala:210)
        at com.heap.datalake.delete.UserPropertiesDeleter$.deleteUsers(UserPropertiesDeleter.scala:48)
        at com.heap.datalake.delete.UserPropertiesDeleter$.run(UserPropertiesDeleter.scala:23)
        at com.heap.datalake.delete.DeleteUsersApp$.$anonfun$run$2(DeleteUsersApp.scala:56)
        at scala.util.Try$.apply(Try.scala:213)
        at com.heap.datalake.delete.DeleteUsersApp$.run(DeleteUsersApp.scala:56)
        at com.heap.datalake.delete.DeleteUsersApp$.$anonfun$main$2(DeleteUsersApp.scala:38)
        at com.heap.datalake.delete.DeleteUsersApp$.$anonfun$main$2$adapted(DeleteUsersApp.scala:23)
        at scala.util.Success.foreach(Try.scala:253)
        at com.heap.datalake.delete.DeleteUsersApp$.main(DeleteUsersApp.scala:23)
        at com.heap.datalake.delete.DeleteUsersApp.main(DeleteUsersApp.scala)
        at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
        at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
        at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
        at java.lang.reflect.Method.invoke(Method.java:498)
        at org.apache.spark.deploy.yarn.ApplicationMaster$$anon$2.run(ApplicationMaster.scala:742)
Caused by: org.apache.spark.SparkException: Task not serializable
        at org.apache.spark.util.ClosureCleaner$.ensureSerializable(ClosureCleaner.scala:444)
        at org.apache.spark.util.ClosureCleaner$.clean(ClosureCleaner.scala:416)
        at org.apache.spark.util.ClosureCleaner$.clean(ClosureCleaner.scala:163)
        at org.apache.spark.SparkContext.clean(SparkContext.scala:2493)
        at org.apache.spark.SparkContext.runJob(SparkContext.scala:2268)
        at org.apache.spark.SparkContext.runJob(SparkContext.scala:2294)
        at org.apache.spark.rdd.RDD.$anonfun$collect$1(RDD.scala:1021)
        at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
        at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112)
        at org.apache.spark.rdd.RDD.withScope(RDD.scala:406)
        at org.apache.spark.rdd.RDD.collect(RDD.scala:1020)
        at org.apache.spark.api.java.JavaRDDLike.collect(JavaRDDLike.scala:362)
        at org.apache.spark.api.java.JavaRDDLike.collect$(JavaRDDLike.scala:361)
        at org.apache.spark.api.java.AbstractJavaRDDLike.collect(JavaRDDLike.scala:45)
        at org.apache.hudi.data.HoodieJavaRDD.collectAsList(HoodieJavaRDD.java:163)
        at org.apache.hudi.table.action.commit.BaseSparkCommitActionExecutor.setCommitMetadata(BaseSparkCommitActionExecutor.java:283)
        at org.apache.hudi.table.action.commit.BaseCommitActionExecutor.autoCommit(BaseCommitActionExecutor.java:186)
        at org.apache.hudi.table.action.commit.BaseCommitActionExecutor.commitOnAutoCommit(BaseCommitActionExecutor.java:174)
        at org.apache.hudi.table.action.commit.BaseSparkCommitActionExecutor.updateIndexAndCommitIfNeeded(BaseSparkCommitActionExecutor.java:273)
        at org.apache.hudi.table.action.commit.BaseSparkCommitActionExecutor.execute(BaseSparkCommitActionExecutor.java:178)
        at org.apache.hudi.table.action.commit.BaseSparkCommitActionExecutor.execute(BaseSparkCommitActionExecutor.java:83)
        at org.apache.hudi.table.action.commit.HoodieDeleteHelper.execute(HoodieDeleteHelper.java:103)
        ... 21 more
Caused by: java.io.NotSerializableException: org.apache.hudi.aws.transaction.lock.DynamoDBBasedLockProvider
Serialization stack:
        - object not serializable (class: org.apache.hudi.aws.transaction.lock.DynamoDBBasedLockProvider, value: org.apache.hudi.aws.transaction.lock.DynamoDBBasedLockProvider@697af25)
        - field (class: org.apache.hudi.client.transaction.lock.LockManager, name: lockProvider, type: interface org.apache.hudi.common.lock.LockProvider)
        - object (class org.apache.hudi.client.transaction.lock.LockManager, org.apache.hudi.client.transaction.lock.LockManager@a109d65)
        - field (class: org.apache.hudi.client.transaction.TransactionManager, name: lockManager, type: class org.apache.hudi.client.transaction.lock.LockManager)
        - object (class org.apache.hudi.client.transaction.TransactionManager, org.apache.hudi.client.transaction.TransactionManager@157ce6e9)
        - field (class: org.apache.hudi.table.action.commit.BaseCommitActionExecutor, name: txnManager, type: class org.apache.hudi.client.transaction.TransactionManager)
        - object (class org.apache.hudi.table.action.deltacommit.SparkDeleteDeltaCommitActionExecutor, org.apache.hudi.table.action.deltacommit.SparkDeleteDeltaCommitActionExecutor@d63f085)
        - element of array (index: 0)
        - array (class [Ljava.lang.Object;, size 2)
        - field (class: java.lang.invoke.SerializedLambda, name: capturedArgs, type: class [Ljava.lang.Object;)
        - object (class java.lang.invoke.SerializedLambda, SerializedLambda[capturingClass=class org.apache.hudi.table.action.commit.BaseSparkCommitActionExecutor, functionalInterfaceMethod=org/apache/spark/api/java/functi
on/Function2.call:(Ljava/lang/Object;Ljava/lang/Object;)Ljava/lang/Object;, implementation=invokeSpecial org/apache/hudi/table/action/commit/BaseSparkCommitActionExecutor.lambda$mapPartitionsAsRDD$a3ab3c4$1:(Lorg/apache/sp
ark/Partitioner;Ljava/lang/Integer;Ljava/util/Iterator;)Ljava/util/Iterator;, instantiatedMethodType=(Ljava/lang/Integer;Ljava/util/Iterator;)Ljava/util/Iterator;, numCaptured=2])
        - writeReplace data (class: java.lang.invoke.SerializedLambda)
        - object (class org.apache.hudi.table.action.commit.BaseSparkCommitActionExecutor$$Lambda$5283/1488307125, org.apache.hudi.table.action.commit.BaseSparkCommitActionExecutor$$Lambda$5283/1488307125@467af2d8)
        - element of array (index: 0)
        - array (class [Ljava.lang.Object;, size 1)
        - field (class: java.lang.invoke.SerializedLambda, name: capturedArgs, type: class [Ljava.lang.Object;)
        - object (class java.lang.invoke.SerializedLambda, SerializedLambda[capturingClass=interface org.apache.spark.api.java.JavaRDDLike, functionalInterfaceMethod=scala/Function2.apply:(Ljava/lang/Object;Ljava/lang/Obje
ct;)Ljava/lang/Object;, implementation=invokeStatic org/apache/spark/api/java/JavaRDDLike.$anonfun$mapPartitionsWithIndex$1$adapted:(Lorg/apache/spark/api/java/function/Function2;Ljava/lang/Object;Lscala/collection/Iterato
r;)Lscala/collection/Iterator;, instantiatedMethodType=(Ljava/lang/Object;Lscala/collection/Iterator;)Lscala/collection/Iterator;, numCaptured=1])
        - writeReplace data (class: java.lang.invoke.SerializedLambda)
        - object (class org.apache.spark.api.java.JavaRDDLike$$Lambda$5284/280417748, org.apache.spark.api.java.JavaRDDLike$$Lambda$5284/280417748@1c7adacb)
        - element of array (index: 0)
        - array (class [Ljava.lang.Object;, size 1)
        - field (class: java.lang.invoke.SerializedLambda, name: capturedArgs, type: class [Ljava.lang.Object;)
        - object (class java.lang.invoke.SerializedLambda, SerializedLambda[capturingClass=class org.apache.spark.rdd.RDD, functionalInterfaceMethod=scala/Function3.apply:(Ljava/lang/Object;Ljava/lang/Object;Ljava/lang/Obj
ect;)Ljava/lang/Object;, implementation=invokeStatic org/apache/spark/rdd/RDD.$anonfun$mapPartitionsWithIndex$2$adapted:(Lscala/Function2;Lorg/apache/spark/TaskContext;Ljava/lang/Object;Lscala/collection/Iterator;)Lscala/c
ollection/Iterator;, instantiatedMethodType=(Lorg/apache/spark/TaskContext;Ljava/lang/Object;Lscala/collection/Iterator;)Lscala/collection/Iterator;, numCaptured=1])
        - writeReplace data (class: java.lang.invoke.SerializedLambda)
        - object (class org.apache.spark.rdd.RDD$$Lambda$4810/1052938329, org.apache.spark.rdd.RDD$$Lambda$4810/1052938329@3c31aa92)
        - field (class: org.apache.spark.rdd.MapPartitionsRDD, name: f, type: interface scala.Function3)
        - object (class org.apache.spark.rdd.MapPartitionsRDD, MapPartitionsRDD[146] at mapPartitionsWithIndex at BaseSparkCommitActionExecutor.java:249)
        - field (class: org.apache.spark.NarrowDependency, name: _rdd, type: class org.apache.spark.rdd.RDD)
        - object (class org.apache.spark.OneToOneDependency, org.apache.spark.OneToOneDependency@373e82e8)
        - writeObject data (class: scala.collection.immutable.List$SerializationProxy)
        - object (class scala.collection.immutable.List$SerializationProxy, scala.collection.immutable.List$SerializationProxy@44652f9f)
        - writeReplace data (class: scala.collection.immutable.List$SerializationProxy)
        - object (class scala.collection.immutable.$colon$colon, List(org.apache.spark.OneToOneDependency@373e82e8))
        - field (class: org.apache.spark.rdd.RDD, name: dependencies_, type: interface scala.collection.Seq)
        - object (class org.apache.spark.rdd.MapPartitionsRDD, MapPartitionsRDD[147] at flatMap at BaseSparkCommitActionExecutor.java:255)
        - field (class: org.apache.spark.NarrowDependency, name: _rdd, type: class org.apache.spark.rdd.RDD)
        - object (class org.apache.spark.OneToOneDependency, org.apache.spark.OneToOneDependency@66f562d4)
        - writeObject data (class: scala.collection.immutable.List$SerializationProxy)
        - object (class scala.collection.immutable.List$SerializationProxy, scala.collection.immutable.List$SerializationProxy@d993b51)
        - writeReplace data (class: scala.collection.immutable.List$SerializationProxy)
        - object (class scala.collection.immutable.$colon$colon, List(org.apache.spark.OneToOneDependency@66f562d4))
        - field (class: org.apache.spark.rdd.RDD, name: dependencies_, type: interface scala.collection.Seq)
        - object (class org.apache.spark.rdd.MapPartitionsRDD, MapPartitionsRDD[148] at map at HoodieJavaRDD.java:111)
        - element of array (index: 0)
        - array (class [Ljava.lang.Object;, size 1)
        - field (class: java.lang.invoke.SerializedLambda, name: capturedArgs, type: class [Ljava.lang.Object;)
        - object (class java.lang.invoke.SerializedLambda, SerializedLambda[capturingClass=class org.apache.spark.rdd.RDD, functionalInterfaceMethod=scala/Function1.apply:(Ljava/lang/Object;)Ljava/lang/Object;, implementat
ion=invokeStatic org/apache/spark/rdd/RDD.$anonfun$collect$2:(Lorg/apache/spark/rdd/RDD;Lscala/collection/Iterator;)Ljava/lang/Object;, instantiatedMethodType=(Lscala/collection/Iterator;)Ljava/lang/Object;, numCaptured=1]
)
        - writeReplace data (class: java.lang.invoke.SerializedLambda)
        - object (class org.apache.spark.rdd.RDD$$Lambda$3667/1073189054, org.apache.spark.rdd.RDD$$Lambda$3667/1073189054@770c17a4)
        at org.apache.spark.serializer.SerializationDebugger$.improveException(SerializationDebugger.scala:41)
        at org.apache.spark.serializer.JavaSerializationStream.writeObject(JavaSerializer.scala:49)
        at org.apache.spark.serializer.JavaSerializerInstance.serialize(JavaSerializer.scala:115)
        at org.apache.spark.util.ClosureCleaner$.ensureSerializable(ClosureCleaner.scala:441)
        ... 42 more
23/09/27 23:07:54 INFO SparkContext: Invoking stop() from shutdown hook

ehurheap avatar Sep 29 '23 18:09 ehurheap

@ehurheap Did you try the same lock configuration with a normal insert on a test table, to ensure the configurations are good?

ad1happy2go avatar Oct 03 '23 08:10 ad1happy2go

Yes. I can write a dataframe to the same table, for example:

    data.write
      .format("org.apache.hudi.Spark32PlusDefaultSource")
      .options(writeWithLocking)
      .mode("append")
      .save(tablePath)

where writeWithLocking options are:

(hoodie.bulkinsert.shuffle.parallelism,2)
(hoodie.bulkinsert.sort.mode,NONE)
(hoodie.clean.async,false)
(hoodie.clean.automatic,false)
(hoodie.cleaner.policy.failed.writes,LAZY)
(hoodie.combine.before.insert,false)
(hoodie.compact.inline,false)
(hoodie.compact.schedule.inline,false)
(hoodie.datasource.compaction.async.enable,false)
(hoodie.datasource.write.hive_style_partitioning,true)
(hoodie.datasource.write.keygenerator.class,org.apache.spark.sql.hudi.command.UuidKeyGenerator)
(hoodie.datasource.write.operation,bulk_insert)
(hoodie.datasource.write.partitionpath.field,env_id,week)
(hoodie.datasource.write.precombine.field,schematized_at)
(hoodie.datasource.write.recordkey.field,env_id,user_id)
(hoodie.datasource.write.row.writer.enable,false)
(hoodie.datasource.write.table.type,MERGE_ON_READ)
(hoodie.metadata.enable,false)
(hoodie.table.name,users_changes)
(hoodie.write.concurrency.mode,OPTIMISTIC_CONCURRENCY_CONTROL)
(hoodie.write.lock.dynamodb.endpoint_url,http://localhost:8000)
(hoodie.write.lock.dynamodb.partition_key,users_changes-us-east-1-local)
(hoodie.write.lock.dynamodb.region,us-east-1)
(hoodie.write.lock.dynamodb.table,datalake-locks)
(hoodie.write.lock.provider,org.apache.hudi.aws.transaction.lock.DynamoDBBasedLockProvider)

These locking configs are also in our production ingestion which writes to hudi using spark structured streaming without error.
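
For reference, that streaming write looks roughly like the following (streamingDf, the checkpoint location, and the trigger interval are illustrative placeholders, not our exact production job):

import org.apache.spark.sql.streaming.Trigger

// writeWithLocking is the same option map shown above; paths are placeholders.
val query = streamingDf.writeStream
  .format("hudi")
  .options(writeWithLocking)
  .outputMode("append")
  .option("checkpointLocation", "s3://bucket/checkpoints/users_changes")
  .trigger(Trigger.ProcessingTime("1 minute"))
  .start(tablePath)

query.awaitTermination()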

ehurheap avatar Oct 03 '23 17:10 ehurheap

@ehurheap Sorry for the delay here. Is it possible to provide the full implementation of UUIDRecordKeyDeleter.scala?

ad1happy2go avatar Oct 10 '23 09:10 ad1happy2go

object UUIDRecordKeyDeleter {
  private val log = loggerForClass(UUIDRecordKeyDeleter.getClass)

  def query(tablePath: String, predicate: Column, queryType: String)(implicit
    spark: SparkSession
  ): DataFrame = {
    spark.read
      .format("hudi")
      .option(QUERY_TYPE.key(), queryType)
      .option(HoodieMetadataConfig.ENABLE.key(), "false")
      .load(tablePath)
      .where(predicate)
  }

  // Computes record keys of the records matching the predicate
  def computeRecordKeys(
    tablePath: String,
    predicate: Column
  )(implicit
    spark: SparkSession
  ): JavaRDD[HoodieKey] = {
    implicit val hoodieKeyEncoder: Encoder[HoodieKey] =
      Encoders.bean(classOf[org.apache.hudi.common.model.HoodieKey])
    query(tablePath, predicate, QUERY_TYPE_SNAPSHOT_OPT_VAL)
      .select(HoodieRecord.RECORD_KEY_METADATA_FIELD, HoodieRecord.PARTITION_PATH_METADATA_FIELD)
      .map { row =>
        {
          val recordKey = row.getString(0)
          val partitionPath = row.getString(1)
          new org.apache.hudi.common.model.HoodieKey(recordKey, partitionPath)
        }
      }
      .toJavaRDD
  }

  // Deletes all records with the provided record keys without triggering compaction.
  def deleteRecords(
    config: WriteClientConfig,
    envConfig: EnvConfig,
    dataLakeRecord: DataLakeRecord[_, _],
    recordKeys: JavaRDD[HoodieKey]
  )(implicit
    spark: SparkSession
  ): DeleteStats = {
    var stats = DeleteStats()
    val writeClient =
      buildWriteClient(config, envConfig, dataLakeRecord, spark.sparkContext)
    var deleteInstant: String = ""

    try {
      deleteInstant = writeClient.startCommit()
      val statuses: mutable.Seq[WriteStatus] =
        writeClient.delete(recordKeys, deleteInstant).collect().asScala
      stats = stats.copy(
        totalDeleted = getTotalDeletesFromWriteStatuses(statuses),
        totalPartitionsDeleted = getPartitionsFromWriteStatuses(statuses).size
      )
    } catch {
      case t: Throwable =>
        logErrorAndExit(s"Delete operation failed for instant $deleteInstant due to ", t)
    } finally {
      log.info(s"Finished delete operation for instant $deleteInstant")
      writeClient.close()
    }
    stats
  }

  def compactReadOptimizedRecordsMatchingPredicate(
    config: WriteClientConfig,
    envConfig: EnvConfig,
    dataLakeRecord: DataLakeRecord[_, _],
    predicate: Column
  )(implicit spark: SparkSession): DeleteStats = {
    import spark.implicits._
    var stats = DeleteStats()
    if (config.scheduleCompaction) {
      val partitions = query(config.tablePath, predicate, QUERY_TYPE_READ_OPTIMIZED_OPT_VAL)
        .select(col(HoodieRecord.PARTITION_PATH_METADATA_FIELD))
        .distinct()
        .map(r => r.getString(0))
        .collect()
        .toList

      if (partitions.isEmpty) {
        log.warn(
          s"Skipping compaction on table ${dataLakeRecord.tableName} as no records were found"
        )
      } else {
        log.info(
          s"Compacting ${partitions.size} partition paths in table ${dataLakeRecord.tableName}: ${partitions.take(10).mkString(",")}..."
        )

        val writeClient =
          compactingWriteClient(config, dataLakeRecord.tableName, envConfig, partitions)

        // Retry any pending (requested/inflight) compactions targeting overlapping partitions.
        // This is essentially a recovery process - if a previous run failed after scheduling a compaction,
        // the file groups in that compaction plan will be excluded from the new scheduled plan and their compaction won't be retried.
        // The compaction execution uses locks therefore it is safe to execute the same compaction plan concurrently.
        // The lake API limits GDPR deletion jobs to one at a time per env_id, therefore such conflict is unlikely (but still possible with mutations).
        // todo: get stats from this compaction to aggregate into final stats
        if (config.executeCompaction) {
          CompactionHelper.executeCompactionsWithOverlappingPartitions(
            config.tablePath,
            partitions,
            writeClient
          )
        }

        try {
          log.info(s"""Scheduling compaction on table ${dataLakeRecord.tableName}:
               |                 hoodie.compaction.strategy: ${writeClient.getConfig.getCompactionStrategy}
               |     "hoodie.compaction.include.partitions": ${writeClient.getConfig
                       .getString(
                         "hoodie.compaction.include.partitions"
                       )
                       .take(100)}...
               |             hoodie.compact.schedule.inline: ${writeClient.getConfig
                       .scheduleInlineCompaction()}
               |                      hoodie.compact.inline: ${writeClient.getConfig
                       .inlineCompactionEnabled()}
               |                 hoodie.write.lock.provider: ${writeClient.getConfig.getLockProviderClass}
               |                     hoodie.metadata.enable: ${writeClient.getConfig.getMetadataConfig
                       .enabled()}
               |""".stripMargin)
          val instant = writeClient.scheduleCompaction(org.apache.hudi.common.util.Option.empty())
          if (instant.isPresent) {
            if (config.executeCompaction) {
              log.info(
                s"Running compaction on table ${dataLakeRecord.tableName} at instant: ${instant.get}"
              )
              val metadata = writeClient.compact(instant.get).getCommitMetadata
              stats = stats.copy(
                compactionDeletes = metadata.get.getTotalRecordsDeleted,
                compactionTotalFiles = metadata.get.getTotalLogFilesCompacted,
                compactionTotalPartitions = metadata.get.fetchTotalPartitionsWritten
              )
              log.info(
                s"Successfully completed compaction on table ${dataLakeRecord.tableName} at instant: ${instant.get}"
              )
            }
          } else {
            log.warn(
              s"Unable to schedule compaction on table ${dataLakeRecord.tableName}, see Hudi logs for reason."
            )
          }
        } catch {
          case e: Throwable =>
            logErrorAndExit(s"Compaction failed on table ${dataLakeRecord.tableName}!", e)
        } finally {
          writeClient.close()
        }
      }
    }
    stats
  }

  // Deletes all records matching the predicate and runs compaction afterwards.
  // Deletion and compaction are only executed if there are records matching the predicate.
  def deleteRecords(
    config: WriteClientConfig,
    envConfig: EnvConfig,
    dataLakeRecord: DataLakeRecord[_, _],
    predicate: Column
  )(implicit
    spark: SparkSession
  ): DeleteStats = {
    var stats = DeleteStats()
    val recordCount = query(config.tablePath, predicate, QUERY_TYPE_SNAPSHOT_OPT_VAL).count()
    stats = stats.copy(
      totalFoundBeforeDelete = recordCount
    )
    if (recordCount == 0) {
      log.warn(
        s"Skipping delete operation on table ${dataLakeRecord.tableName} as no records were found"
      )
      DeleteStats()
    } else {
      log.warn(s"Deleting $recordCount records from table ${dataLakeRecord.tableName}")
      val keysToDelete = computeRecordKeys(config.tablePath, predicate)
      val deleteStats = deleteRecords(config, envConfig, dataLakeRecord, keysToDelete)
      stats = stats.copy(
        totalDeleted = deleteStats.totalDeleted,
        totalPartitionsDeleted = deleteStats.totalPartitionsDeleted
      )
    }
    val compactionStats =
      compactReadOptimizedRecordsMatchingPredicate(config, envConfig, dataLakeRecord, predicate)
    stats = stats.copy(
      compactionDeletes = compactionStats.compactionDeletes,
      compactionTotalFiles = compactionStats.compactionTotalFiles,
      compactionTotalPartitions = compactionStats.compactionTotalPartitions
    )
    stats
  }

  def buildWriteClient(
    config: WriteClientConfig,
    envConfig: EnvConfig,
    datalakeRecord: DataLakeRecord[_, _],
    sparkContext: SparkContext
  ): SparkRDDWriteClient[_] = {
    val lockProperties = new Properties()
    val lockOptionsMap =
      WriterOptions.lockOptions(datalakeRecord.tableName, config.region, envConfig: EnvConfig)
    lockProperties.putAll(lockOptionsMap.asJava)

    val metricsProperties = new Properties()
    val metricsOptionsMap = metricsOptions(config.statsDHost, envConfig, datalakeRecord.tableName)
    metricsProperties.putAll(metricsOptionsMap.asJava)

    val writerConfig = HoodieWriteConfig
      .newBuilder()
      .withCompactionConfig(
        HoodieCompactionConfig
          .newBuilder()
          .withInlineCompaction(false)
          .withScheduleInlineCompaction(false)
          .withMaxNumDeltaCommitsBeforeCompaction(1)
          .build()
      )
      .withArchivalConfig(HoodieArchivalConfig.newBuilder().withAutoArchive(false).build())
      .withCleanConfig(HoodieCleanConfig.newBuilder().withAutoClean(false).build())
      .withMetadataConfig(HoodieMetadataConfig.newBuilder().enable(false).build())
      .withLockConfig(HoodieLockConfig.newBuilder().fromProperties(lockProperties).build())
      .withMetricsConfig(HoodieMetricsConfig.newBuilder().fromProperties(metricsProperties).build())
      .withDeleteParallelism(config.writeParallelism)
      .withPath(config.tablePath)
      .forTable(datalakeRecord.tableName)
      .build()
    val engineContext: HoodieEngineContext = new HoodieSparkEngineContext(
      JavaSparkContext.fromSparkContext(sparkContext)
    )
    new SparkRDDWriteClient(engineContext, writerConfig)
  }
  
}

ehurheap avatar Oct 11 '23 00:10 ehurheap

@ehurheap I was trying to reproduce and identify the reason for this exception. I couldn't reproduce it, as I was not able to run the code, and I also couldn't identify the exact reason. It is happening only with this custom code.

This issue occurs because the DynamoDBBasedLockProvider object ends up referenced inside code that runs on an executor. The problem arises when the object is passed in directly from the driver. You might consider creating the object exclusively within the executor code, if you can figure out where to do that.
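
As a general illustration of that pattern (the lock provider itself is instantiated by Hudi internals, so this is the generic Spark idiom rather than a direct fix), a non-serializable resource can be constructed inside the partition function on the executors instead of being captured from the driver. SomeNonSerializableClient below is a hypothetical stand-in:

// Hypothetical example: SomeNonSerializableClient stands in for any resource
// that cannot be serialized and shipped from the driver.
val results = rdd.mapPartitions { records =>
  // Constructed on the executor, once per partition; nothing non-serializable
  // is captured in the closure.
  val client = new SomeNonSerializableClient()
  val out = records.map(r => client.process(r)).toVector // force evaluation before closing
  client.close()
  out.iterator
}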

ad1happy2go avatar Oct 24 '23 08:10 ad1happy2go

@ad1happy2go - thanks for the update. We will take another look.

I have an integration test that reproduces this, but I'll need to extract it from our codebase and repackage it so it can run standalone. I'll update here when I have that, in case it helps.

ehurheap avatar Oct 24 '23 14:10 ehurheap

Hi @ad1happy2go, DynamoDBBasedLockProvider and HiveMetastoreBasedLockProvider have the same issue as https://issues.apache.org/jira/browse/HUDI-3638: task not serializable in the clean action.

chym1303 avatar Nov 16 '23 07:11 chym1303

@ad1happy2go here is a small repo with code and instructions on how to reproduce this problem. It would be great if you could try it out and let me know if that reproduced it for you.

ehurheap avatar Dec 22 '23 23:12 ehurheap

Sure. I will give it a try.

ad1happy2go avatar Dec 23 '23 01:12 ad1happy2go