
[BUG] S3 Deltastreamer: Block has already been inflated

Open dyang108 opened this issue 2 years ago • 25 comments

Describe the problem you faced

Deltastreamer with write output to S3 exits unexpectedly when running in continuous mode.

To Reproduce

Steps to reproduce the behavior: I ran the following:

/etc/spark/bin/spark-submit --conf -Dconfig.file=/service.conf,spark.executor.extraJavaOptions=-Dlog4j.debug=true --class org.apache.hudi.utilities.deltastreamer.HoodieDeltaStreamer --jars /etc/spark/work-dir/* /etc/spark/work-dir/hudi-utilities-bundle_2.11-0.12.0.jar --props /mnt/mesos/sandbox/kafka-source.properties --schemaprovider-class org.apache.hudi.utilities.schema.SchemaRegistryProvider --source-class org.apache.hudi.utilities.sources.AvroKafkaSource --target-base-path "s3a://strava.scratch/tmp/derick/hudi" --target-table "aligned_activities" --op "UPSERT" --source-ordering-field "ts" --table-type "COPY_ON_WRITE" --source-limit 100 --continuous

The /etc/spark/work-dir/ directory looks like this: aws-java-sdk-bundle-1.12.283.jar, hadoop-aws-2.6.5.jar, hudi-utilities-bundle_2.11-0.12.0.jar, scala-library-2.11.12.jar, spark-streaming-kafka-0-10_2.11-2.4.8.jar

Expected behavior

I don't expect there to be issues on compaction here.

Environment Description

  • Hudi version : 0.12.0 (also tried 0.11.1)

  • Spark version : 2.4.8

  • Hive version :

  • Hadoop version : 2.6.5

  • Storage (HDFS/S3/GCS..) : S3

  • Running on Docker? (yes/no) : Yes, docker on Mesos

I'm reading from an Avro Kafka topic.

Additional context


Reading Avro record from Kafka

hoodie.datasource.write.recordkey.field=activityId
auto.offset.reset=latest
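
In addition to the two properties above, a source like this typically needs at least the following in kafka-source.properties (a minimal sketch; the broker, topic, and schema-registry values below are placeholders, not the exact ones used here):

bootstrap.servers=<kafka-broker>:9092
hoodie.deltastreamer.source.kafka.topic=<avro-topic>
hoodie.deltastreamer.schemaprovider.registry.url=http://<schema-registry>:8081/subjects/<avro-topic>-value/versions/latest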

Stacktrace

22/08/17 23:07:26 ERROR HoodieAsyncService: Service shutdown with error
java.util.concurrent.ExecutionException: org.apache.hudi.exception.HoodieUpsertException: Failed to upsert for commit time 20220817230714888
    at java.util.concurrent.CompletableFuture.reportGet(CompletableFuture.java:357)
    at java.util.concurrent.CompletableFuture.get(CompletableFuture.java:1908)
    at org.apache.hudi.async.HoodieAsyncService.waitForShutdown(HoodieAsyncService.java:103)
    at org.apache.hudi.utilities.deltastreamer.HoodieDeltaStreamer.lambda$sync$1(HoodieDeltaStreamer.java:190)
    at org.apache.hudi.common.util.Option.ifPresent(Option.java:97)
    at org.apache.hudi.utilities.deltastreamer.HoodieDeltaStreamer.sync(HoodieDeltaStreamer.java:187)
    at org.apache.hudi.utilities.deltastreamer.HoodieDeltaStreamer.main(HoodieDeltaStreamer.java:557)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:498)
    at org.apache.spark.deploy.JavaMainApplication.start(SparkApplication.scala:52)
    at org.apache.spark.deploy.SparkSubmit.org$apache$spark$deploy$SparkSubmit$$runMain(SparkSubmit.scala:849)
    at org.apache.spark.deploy.SparkSubmit.doRunMain$1(SparkSubmit.scala:167)
    at org.apache.spark.deploy.SparkSubmit.submit(SparkSubmit.scala:195)
    at org.apache.spark.deploy.SparkSubmit.doSubmit(SparkSubmit.scala:86)
    at org.apache.spark.deploy.SparkSubmit$$anon$2.doSubmit(SparkSubmit.scala:924)
    at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:933)
    at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala)
Caused by: org.apache.hudi.exception.HoodieUpsertException: Failed to upsert for commit time 20220817230714888
    at org.apache.hudi.table.action.commit.BaseWriteHelper.write(BaseWriteHelper.java:64)
    at org.apache.hudi.table.action.commit.SparkUpsertCommitActionExecutor.execute(SparkUpsertCommitActionExecutor.java:45)
    at org.apache.hudi.table.HoodieSparkCopyOnWriteTable.upsert(HoodieSparkCopyOnWriteTable.java:113)
    at org.apache.hudi.table.HoodieSparkCopyOnWriteTable.upsert(HoodieSparkCopyOnWriteTable.java:97)
    at org.apache.hudi.client.SparkRDDWriteClient.upsert(SparkRDDWriteClient.java:155)
    at org.apache.hudi.utilities.deltastreamer.DeltaSync.writeToSink(DeltaSync.java:588)
    at org.apache.hudi.utilities.deltastreamer.DeltaSync.syncOnce(DeltaSync.java:335)
    at org.apache.hudi.utilities.deltastreamer.HoodieDeltaStreamer$DeltaSyncService.lambda$startService$0(HoodieDeltaStreamer.java:687)
    at java.util.concurrent.CompletableFuture$AsyncSupply.run(CompletableFuture.java:1604)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
    at java.lang.Thread.run(Thread.java:750)
Caused by: org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 7.0 failed 1 times, most recent failure: Lost task 0.0 in stage 7.0 (TID 7, localhost, executor driver): org.apache.hudi.exception.HoodieMetadataException: Failed to retrieve files in partition s3a://strava.scratch/tmp/derick/hudi/__HIVE_DEFAULT_PARTITION__ from metadata
    at org.apache.hudi.metadata.BaseTableMetadata.getAllFilesInPartition(BaseTableMetadata.java:137)
    at org.apache.hudi.metadata.HoodieMetadataFileSystemView.listPartition(HoodieMetadataFileSystemView.java:65)
    at org.apache.hudi.common.table.view.AbstractTableFileSystemView.lambda$ensurePartitionLoadedCorrectly$9(AbstractTableFileSystemView.java:305)
    at java.util.concurrent.ConcurrentHashMap.computeIfAbsent(ConcurrentHashMap.java:1660)
    at org.apache.hudi.common.table.view.AbstractTableFileSystemView.ensurePartitionLoadedCorrectly(AbstractTableFileSystemView.java:296)
    at org.apache.hudi.common.table.view.AbstractTableFileSystemView.getLatestBaseFilesBeforeOrOn(AbstractTableFileSystemView.java:517)
    at org.apache.hudi.common.table.view.PriorityBasedFileSystemView.execute(PriorityBasedFileSystemView.java:103)
    at org.apache.hudi.common.table.view.PriorityBasedFileSystemView.getLatestBaseFilesBeforeOrOn(PriorityBasedFileSystemView.java:144)
    at org.apache.hudi.index.HoodieIndexUtils.getLatestBaseFilesForPartition(HoodieIndexUtils.java:69)
    at org.apache.hudi.index.HoodieIndexUtils.lambda$getLatestBaseFilesForAllPartitions$ff6885d8$1(HoodieIndexUtils.java:89)
    at org.apache.hudi.client.common.HoodieSparkEngineContext.lambda$flatMap$7d470b86$1(HoodieSparkEngineContext.java:137)
    at org.apache.spark.api.java.JavaRDDLike$$anonfun$fn$1$1.apply(JavaRDDLike.scala:125)
    at org.apache.spark.api.java.JavaRDDLike$$anonfun$fn$1$1.apply(JavaRDDLike.scala:125)
    at scala.collection.Iterator$$anon$12.nextCur(Iterator.scala:435)
    at scala.collection.Iterator$$anon$12.hasNext(Iterator.scala:441)
    at scala.collection.Iterator$class.foreach(Iterator.scala:891)
    at scala.collection.AbstractIterator.foreach(Iterator.scala:1334)
    at scala.collection.generic.Growable$class.$plus$plus$eq(Growable.scala:59)
    at scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:104)
    at scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:48)
    at scala.collection.TraversableOnce$class.to(TraversableOnce.scala:310)
    at scala.collection.AbstractIterator.to(Iterator.scala:1334)
    at scala.collection.TraversableOnce$class.toBuffer(TraversableOnce.scala:302)
    at scala.collection.AbstractIterator.toBuffer(Iterator.scala:1334)
    at scala.collection.TraversableOnce$class.toArray(TraversableOnce.scala:289)
    at scala.collection.AbstractIterator.toArray(Iterator.scala:1334)
    at org.apache.spark.rdd.RDD$$anonfun$collect$1$$anonfun$13.apply(RDD.scala:945)
    at org.apache.spark.rdd.RDD$$anonfun$collect$1$$anonfun$13.apply(RDD.scala:945)
    at org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:2101)
    at org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:2101)
    at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:90)
    at org.apache.spark.scheduler.Task.run(Task.scala:121)
    at org.apache.spark.executor.Executor$TaskRunner$$anonfun$10.apply(Executor.scala:408)
    at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1360)
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:414)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
    at java.lang.Thread.run(Thread.java:750)
Caused by: org.apache.hudi.exception.HoodieException: Exception when reading log file 
    at org.apache.hudi.common.table.log.AbstractHoodieLogRecordReader.scanInternal(AbstractHoodieLogRecordReader.java:352)
    at org.apache.hudi.common.table.log.AbstractHoodieLogRecordReader.scan(AbstractHoodieLogRecordReader.java:192)
    at org.apache.hudi.common.table.log.HoodieMergedLogRecordScanner.performScan(HoodieMergedLogRecordScanner.java:110)
    at org.apache.hudi.common.table.log.HoodieMergedLogRecordScanner.<init>(HoodieMergedLogRecordScanner.java:103)
    at org.apache.hudi.metadata.HoodieMetadataMergedLogRecordReader.<init>(HoodieMetadataMergedLogRecordReader.java:63)
    at org.apache.hudi.metadata.HoodieMetadataMergedLogRecordReader.<init>(HoodieMetadataMergedLogRecordReader.java:51)
    at org.apache.hudi.metadata.HoodieMetadataMergedLogRecordReader$Builder.build(HoodieMetadataMergedLogRecordReader.java:230)
    at org.apache.hudi.metadata.HoodieBackedTableMetadata.getLogRecordScanner(HoodieBackedTableMetadata.java:508)
    at org.apache.hudi.metadata.HoodieBackedTableMetadata.getLogRecordScanner(HoodieBackedTableMetadata.java:470)
    at org.apache.hudi.metadata.HoodieBackedTableMetadata.openReaders(HoodieBackedTableMetadata.java:416)
    at org.apache.hudi.metadata.HoodieBackedTableMetadata.lambda$getOrCreateReaders$11(HoodieBackedTableMetadata.java:402)
    at java.util.concurrent.ConcurrentHashMap.computeIfAbsent(ConcurrentHashMap.java:1660)
    at org.apache.hudi.metadata.HoodieBackedTableMetadata.getOrCreateReaders(HoodieBackedTableMetadata.java:402)
    at org.apache.hudi.metadata.HoodieBackedTableMetadata.lambda$getRecordsByKeys$1(HoodieBackedTableMetadata.java:211)
    at java.util.HashMap.forEach(HashMap.java:1290)
    at org.apache.hudi.metadata.HoodieBackedTableMetadata.getRecordsByKeys(HoodieBackedTableMetadata.java:209)
    at org.apache.hudi.metadata.HoodieBackedTableMetadata.getRecordByKey(HoodieBackedTableMetadata.java:141)
    at org.apache.hudi.metadata.BaseTableMetadata.fetchAllFilesInPartition(BaseTableMetadata.java:312)
    at org.apache.hudi.metadata.BaseTableMetadata.getAllFilesInPartition(BaseTableMetadata.java:135)
    ... 37 more
Caused by: java.lang.IllegalStateException: Block has already been inflated
    at org.apache.hudi.common.util.ValidationUtils.checkState(ValidationUtils.java:67)
    at org.apache.hudi.common.table.log.block.HoodieLogBlock.inflate(HoodieLogBlock.java:267)
    at org.apache.hudi.common.table.log.block.HoodieLogBlock.inflate(HoodieLogBlock.java:278)
    at org.apache.hudi.common.table.log.block.HoodieDeleteBlock.getRecordsToDelete(HoodieDeleteBlock.java:83)
    at org.apache.hudi.common.table.log.AbstractHoodieLogRecordReader.processQueuedBlocksForInstant(AbstractHoodieLogRecordReader.java:473)
    at org.apache.hudi.common.table.log.AbstractHoodieLogRecordReader.scanInternal(AbstractHoodieLogRecordReader.java:343)
    ... 55 more

Driver stacktrace:
    at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1889)
    at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1877)
    at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1876)
    at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
    at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:48)
    at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1876)
    at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:926)
    at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:926)
    at scala.Option.foreach(Option.scala:257)
    at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:926)
    at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:2110)
    at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2059)
    at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2048)
    at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:49)
    at org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:737)
    at org.apache.spark.SparkContext.runJob(SparkContext.scala:2061)
    at org.apache.spark.SparkContext.runJob(SparkContext.scala:2082)
    at org.apache.spark.SparkContext.runJob(SparkContext.scala:2101)
    at org.apache.spark.SparkContext.runJob(SparkContext.scala:2126)
    at org.apache.spark.rdd.RDD$$anonfun$collect$1.apply(RDD.scala:945)
    at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
    at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112)
    at org.apache.spark.rdd.RDD.withScope(RDD.scala:363)
    at org.apache.spark.rdd.RDD.collect(RDD.scala:944)
    at org.apache.spark.api.java.JavaRDDLike$class.collect(JavaRDDLike.scala:361)
    at org.apache.spark.api.java.AbstractJavaRDDLike.collect(JavaRDDLike.scala:45)
    at org.apache.hudi.client.common.HoodieSparkEngineContext.flatMap(HoodieSparkEngineContext.java:137)
    at org.apache.hudi.index.HoodieIndexUtils.getLatestBaseFilesForAllPartitions(HoodieIndexUtils.java:87)
    at org.apache.hudi.index.simple.HoodieSimpleIndex.fetchRecordLocationsForAffectedPartitions(HoodieSimpleIndex.java:144)
    at org.apache.hudi.index.simple.HoodieSimpleIndex.tagLocationInternal(HoodieSimpleIndex.java:113)
    at org.apache.hudi.index.simple.HoodieSimpleIndex.tagLocation(HoodieSimpleIndex.java:91)
    at org.apache.hudi.table.action.commit.HoodieWriteHelper.tag(HoodieWriteHelper.java:49)
    at org.apache.hudi.table.action.commit.HoodieWriteHelper.tag(HoodieWriteHelper.java:32)
    at org.apache.hudi.table.action.commit.BaseWriteHelper.write(BaseWriteHelper.java:53)
    ... 11 more
Caused by: org.apache.hudi.exception.HoodieMetadataException: Failed to retrieve files in partition s3a://strava.scratch/tmp/derick/hudi/__HIVE_DEFAULT_PARTITION__ from metadata
    at org.apache.hudi.metadata.BaseTableMetadata.getAllFilesInPartition(BaseTableMetadata.java:137)
    at org.apache.hudi.metadata.HoodieMetadataFileSystemView.listPartition(HoodieMetadataFileSystemView.java:65)
    at org.apache.hudi.common.table.view.AbstractTableFileSystemView.lambda$ensurePartitionLoadedCorrectly$9(AbstractTableFileSystemView.java:305)
    at java.util.concurrent.ConcurrentHashMap.computeIfAbsent(ConcurrentHashMap.java:1660)
    at org.apache.hudi.common.table.view.AbstractTableFileSystemView.ensurePartitionLoadedCorrectly(AbstractTableFileSystemView.java:296)
    at org.apache.hudi.common.table.view.AbstractTableFileSystemView.getLatestBaseFilesBeforeOrOn(AbstractTableFileSystemView.java:517)
    at org.apache.hudi.common.table.view.PriorityBasedFileSystemView.execute(PriorityBasedFileSystemView.java:103)
    at org.apache.hudi.common.table.view.PriorityBasedFileSystemView.getLatestBaseFilesBeforeOrOn(PriorityBasedFileSystemView.java:144)
    at org.apache.hudi.index.HoodieIndexUtils.getLatestBaseFilesForPartition(HoodieIndexUtils.java:69)
    at org.apache.hudi.index.HoodieIndexUtils.lambda$getLatestBaseFilesForAllPartitions$ff6885d8$1(HoodieIndexUtils.java:89)
    at org.apache.hudi.client.common.HoodieSparkEngineContext.lambda$flatMap$7d470b86$1(HoodieSparkEngineContext.java:137)
    at org.apache.spark.api.java.JavaRDDLike$$anonfun$fn$1$1.apply(JavaRDDLike.scala:125)
    at org.apache.spark.api.java.JavaRDDLike$$anonfun$fn$1$1.apply(JavaRDDLike.scala:125)
    at scala.collection.Iterator$$anon$12.nextCur(Iterator.scala:435)
    at scala.collection.Iterator$$anon$12.hasNext(Iterator.scala:441)
    at scala.collection.Iterator$class.foreach(Iterator.scala:891)
    at scala.collection.AbstractIterator.foreach(Iterator.scala:1334)
    at scala.collection.generic.Growable$class.$plus$plus$eq(Growable.scala:59)
    at scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:104)
    at scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:48)
    at scala.collection.TraversableOnce$class.to(TraversableOnce.scala:310)
    at scala.collection.AbstractIterator.to(Iterator.scala:1334)
    at scala.collection.TraversableOnce$class.toBuffer(TraversableOnce.scala:302)
    at scala.collection.AbstractIterator.toBuffer(Iterator.scala:1334)
    at scala.collection.TraversableOnce$class.toArray(TraversableOnce.scala:289)
    at scala.collection.AbstractIterator.toArray(Iterator.scala:1334)
    at org.apache.spark.rdd.RDD$$anonfun$collect$1$$anonfun$13.apply(RDD.scala:945)
    at org.apache.spark.rdd.RDD$$anonfun$collect$1$$anonfun$13.apply(RDD.scala:945)
    at org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:2101)
    at org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:2101)
    at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:90)
    at org.apache.spark.scheduler.Task.run(Task.scala:121)
    at org.apache.spark.executor.Executor$TaskRunner$$anonfun$10.apply(Executor.scala:408)
    at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1360)
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:414)
    ... 3 more
Caused by: org.apache.hudi.exception.HoodieException: Exception when reading log file 
    at org.apache.hudi.common.table.log.AbstractHoodieLogRecordReader.scanInternal(AbstractHoodieLogRecordReader.java:352)
    at org.apache.hudi.common.table.log.AbstractHoodieLogRecordReader.scan(AbstractHoodieLogRecordReader.java:192)
    at org.apache.hudi.common.table.log.HoodieMergedLogRecordScanner.performScan(HoodieMergedLogRecordScanner.java:110)
    at org.apache.hudi.common.table.log.HoodieMergedLogRecordScanner.<init>(HoodieMergedLogRecordScanner.java:103)
    at org.apache.hudi.metadata.HoodieMetadataMergedLogRecordReader.<init>(HoodieMetadataMergedLogRecordReader.java:63)
    at org.apache.hudi.metadata.HoodieMetadataMergedLogRecordReader.<init>(HoodieMetadataMergedLogRecordReader.java:51)
    at org.apache.hudi.metadata.HoodieMetadataMergedLogRecordReader$Builder.build(HoodieMetadataMergedLogRecordReader.java:230)
    at org.apache.hudi.metadata.HoodieBackedTableMetadata.getLogRecordScanner(HoodieBackedTableMetadata.java:508)
    at org.apache.hudi.metadata.HoodieBackedTableMetadata.getLogRecordScanner(HoodieBackedTableMetadata.java:470)
    at org.apache.hudi.metadata.HoodieBackedTableMetadata.openReaders(HoodieBackedTableMetadata.java:416)
    at org.apache.hudi.metadata.HoodieBackedTableMetadata.lambda$getOrCreateReaders$11(HoodieBackedTableMetadata.java:402)
    at java.util.concurrent.ConcurrentHashMap.computeIfAbsent(ConcurrentHashMap.java:1660)
    at org.apache.hudi.metadata.HoodieBackedTableMetadata.getOrCreateReaders(HoodieBackedTableMetadata.java:402)
    at org.apache.hudi.metadata.HoodieBackedTableMetadata.lambda$getRecordsByKeys$1(HoodieBackedTableMetadata.java:211)
    at java.util.HashMap.forEach(HashMap.java:1290)
    at org.apache.hudi.metadata.HoodieBackedTableMetadata.getRecordsByKeys(HoodieBackedTableMetadata.java:209)
    at org.apache.hudi.metadata.HoodieBackedTableMetadata.getRecordByKey(HoodieBackedTableMetadata.java:141)
    at org.apache.hudi.metadata.BaseTableMetadata.fetchAllFilesInPartition(BaseTableMetadata.java:312)
    at org.apache.hudi.metadata.BaseTableMetadata.getAllFilesInPartition(BaseTableMetadata.java:135)
    ... 37 more
Caused by: java.lang.IllegalStateException: Block has already been inflated
    at org.apache.hudi.common.util.ValidationUtils.checkState(ValidationUtils.java:67)
    at org.apache.hudi.common.table.log.block.HoodieLogBlock.inflate(HoodieLogBlock.java:267)
    at org.apache.hudi.common.table.log.block.HoodieLogBlock.inflate(HoodieLogBlock.java:278)
    at org.apache.hudi.common.table.log.block.HoodieDeleteBlock.getRecordsToDelete(HoodieDeleteBlock.java:83)
    at org.apache.hudi.common.table.log.AbstractHoodieLogRecordReader.processQueuedBlocksForInstant(AbstractHoodieLogRecordReader.java:473)
    at org.apache.hudi.common.table.log.AbstractHoodieLogRecordReader.scanInternal(AbstractHoodieLogRecordReader.java:343)
    ... 55 more
22/08/17 23:07:26 INFO DeltaSync: Shutting down embedded timeline server
22/08/17 23:07:26 INFO EmbeddedTimelineService: Closing Timeline server
22/08/17 23:07:26 INFO TimelineService: Closing Timeline Service
22/08/17 23:07:26 INFO Javalin: Stopping Javalin ...
22/08/17 23:07:26 INFO SparkUI: Stopped Spark web UI at http://d16171598c10:8090
22/08/17 23:07:26 ERROR Javalin: Javalin failed to stop gracefully
java.lang.InterruptedException
    at java.util.concurrent.locks.AbstractQueuedSynchronizer.tryAcquireSharedNanos(AbstractQueuedSynchronizer.java:1326)
    at java.util.concurrent.CountDownLatch.await(CountDownLatch.java:277)
    at org.apache.hudi.org.eclipse.jetty.server.AbstractConnector.doStop(AbstractConnector.java:333)
    at org.apache.hudi.org.eclipse.jetty.server.AbstractNetworkConnector.doStop(AbstractNetworkConnector.java:88)
    at org.apache.hudi.org.eclipse.jetty.server.ServerConnector.doStop(ServerConnector.java:248)
    at org.apache.hudi.org.eclipse.jetty.util.component.AbstractLifeCycle.stop(AbstractLifeCycle.java:89)
    at org.apache.hudi.org.eclipse.jetty.server.Server.doStop(Server.java:450)
    at org.apache.hudi.org.eclipse.jetty.util.component.AbstractLifeCycle.stop(AbstractLifeCycle.java:89)
    at io.javalin.Javalin.stop(Javalin.java:195)
    at org.apache.hudi.timeline.service.TimelineService.close(TimelineService.java:325)
    at org.apache.hudi.client.embedded.EmbeddedTimelineService.stop(EmbeddedTimelineService.java:141)
    at org.apache.hudi.utilities.deltastreamer.DeltaSync.close(DeltaSync.java:905)
    at org.apache.hudi.utilities.deltastreamer.HoodieDeltaStreamer$DeltaSyncService.close(HoodieDeltaStreamer.java:831)
    at org.apache.hudi.common.util.Option.ifPresent(Option.java:97)
    at org.apache.hudi.utilities.deltastreamer.HoodieDeltaStreamer.onDeltaSyncShutdown(HoodieDeltaStreamer.java:223)
    at org.apache.hudi.async.HoodieAsyncService.lambda$shutdownCallback$0(HoodieAsyncService.java:171)
    at java.util.concurrent.CompletableFuture.uniWhenComplete(CompletableFuture.java:774)
    at java.util.concurrent.CompletableFuture$UniWhenComplete.tryFire(CompletableFuture.java:750)
    at java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:488)
    at java.util.concurrent.CompletableFuture$AsyncSupply.run(CompletableFuture.java:1609)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
    at java.lang.Thread.run(Thread.java:750)
22/08/17 23:07:26 INFO Javalin: Javalin has stopped
22/08/17 23:07:26 INFO TimelineService: Closed Timeline Service
22/08/17 23:07:26 INFO EmbeddedTimelineService: Closed Timeline server
22/08/17 23:07:26 INFO MapOutputTrackerMasterEndpoint: MapOutputTrackerMasterEndpoint stopped!
22/08/17 23:07:26 INFO MemoryStore: MemoryStore cleared
22/08/17 23:07:26 INFO BlockManager: BlockManager stopped
22/08/17 23:07:26 INFO BlockManagerMaster: BlockManagerMaster stopped
22/08/17 23:07:26 INFO OutputCommitCoordinator$OutputCommitCoordinatorEndpoint: OutputCommitCoordinator stopped!
22/08/17 23:07:26 INFO SparkContext: Successfully stopped SparkContext

dyang108 avatar Aug 17 '22 23:08 dyang108

@yihua can you take a look? Seems like a metadata-related issue.

rmahindra123 avatar Aug 23 '22 08:08 rmahindra123

Update: I got it working on an older version of Hudi, 0.10.1, so this seems like a regression.

dyang108 avatar Aug 23 '22 21:08 dyang108

@dyang108 : is this happening infrequently, or is your pipeline just stuck? We have unit tests and integration tests for the metadata table and we haven't hit this issue yet; trying to gauge what's different in your environment or setup. It looks like we are just trying to read records from the metadata table, nothing fancy.

nsivabalan avatar Sep 23 '22 22:09 nsivabalan

This happened consistently with the command above every time I ran it on Hudi version 0.12.0 (I also tried 0.11.1).

The pipeline failed and exited when I saw this issue.

dyang108 avatar Sep 23 '22 22:09 dyang108

We have the same stacktrace when running on Hudi 0.11.0, Spark 3.2.1, EMR 6.7. We have the metadata service enabled and our Spark Structured Streaming query fails each time. This is a COW table.

kasured avatar Sep 24 '22 01:09 kasured

@nsivabalan What might be a general workaround in that situation to unblock the processing? Of course it depends on the root cause, but would deleting and recreating the metadata table from hudi-cli help? Another option might be to disable metadata on the current table and proceed.

kasured avatar Sep 24 '22 03:09 kasured

got it. did you mean, you are using EMR's spark or oss spark? I understand its EMR cluster.

nsivabalan avatar Sep 26 '22 19:09 nsivabalan

@kasured to unblock the processing, could you try disabling and deleting the metadata table by setting hoodie.metadata.enable=false in Hudi configs? This automatically deletes the metadata table after a few commits.
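
For HoodieDeltaStreamer this can be set either in the properties file or on the command line, e.g. --hoodie-conf hoodie.metadata.enable=false.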

yihua avatar Sep 26 '22 19:09 yihua

@yihua Yes, that helped. However, I assume the same can be done with hudi-cli, as I wrote before: metadata delete and metadata create.

@nsivabalan Yes, we are using the Amazon bundle for Spark 3.2.1, which is provided by EMR 6.7.

kasured avatar Sep 27 '22 02:09 kasured

Yes, you are right; you can do this via hudi-cli as well.
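
A typical hudi-cli sequence for this would look roughly like the following (the table path is a placeholder):

    connect --path s3a://<bucket>/<table-base-path>
    metadata delete
    metadata create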

nsivabalan avatar Sep 30 '22 03:09 nsivabalan

Since we could not reproduce this with OSS Spark, can you reach out to AWS support? CC @umehrot2 @rahil-c: have you folks seen this issue before? It seems like a simple read from the metadata table is failing with EMR Spark.

nsivabalan avatar Sep 30 '22 03:09 nsivabalan

I saw this issue with Spark on Mesos (on EC2), not EMR Spark.

dyang108 avatar Sep 30 '22 16:09 dyang108

Hi,

Is there any resolution for this issue yet, or any idea of which release it will be fixed in? I am also facing the same issue. My test case is very simple: reload the same file twice:

  1. With hoodie.datasource.write.operation = BulkInsert
  2. With hoodie.datasource.write.operation = Upsert

When metadata is enabled, Bulk Insert works fine, but Upsert aborts with "Caused by: java.lang.IllegalStateException: Block has already been inflated". When metadata is disabled (hoodie.metadata.enable = false), the Upsert works fine.
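
A minimal sketch of this two-step reproduction with the Spark datasource is below (not my exact job; the table name, key field, ordering field, and paths are placeholders):

    import org.apache.spark.sql.Dataset;
    import org.apache.spark.sql.Row;
    import org.apache.spark.sql.SaveMode;
    import org.apache.spark.sql.SparkSession;

    public class ReloadSameFileTwice {
      public static void main(String[] args) {
        SparkSession spark = SparkSession.builder().appName("hudi-reload-twice").getOrCreate();
        String basePath = "s3a://<bucket>/tmp/hudi_test";                              // placeholder target path
        Dataset<Row> df = spark.read().parquet("s3a://<bucket>/input/data.parquet");   // placeholder input file

        // 1. First load: bulk_insert with the metadata table enabled -- this step works fine.
        df.write().format("hudi")
            .option("hoodie.table.name", "test_table")
            .option("hoodie.datasource.write.recordkey.field", "id")                   // placeholder key field
            .option("hoodie.datasource.write.precombine.field", "ts")                  // placeholder ordering field
            .option("hoodie.datasource.write.operation", "bulk_insert")
            .option("hoodie.metadata.enable", "true")
            .mode(SaveMode.Overwrite)
            .save(basePath);

        // 2. Second load of the same file as an upsert -- this is the step that aborts with
        //    "Block has already been inflated" unless hoodie.metadata.enable=false.
        df.write().format("hudi")
            .option("hoodie.table.name", "test_table")
            .option("hoodie.datasource.write.recordkey.field", "id")
            .option("hoodie.datasource.write.precombine.field", "ts")
            .option("hoodie.datasource.write.operation", "upsert")
            .option("hoodie.metadata.enable", "true")
            .mode(SaveMode.Append)
            .save(basePath);

        spark.stop();
      }
    }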

My test cases mostly depend on the metadata table, so I need it to be enabled. Please let me know if there is any other workaround.

Thank you !

gunjanchaudhary87 avatar Jan 25 '23 09:01 gunjanchaudhary87

cc @nsivabalan to look into this issue, thanks.

danny0405 avatar Jan 26 '23 06:01 danny0405

Is a fix for this issue planned to be included in 0.13.0 or a 0.12.x patch release?

jschuck9 avatar Feb 07 '23 16:02 jschuck9

I got the same issue. When I excluded hudi-common from hudi-aws, it worked successfully.

wyb199701 avatar Apr 14 '23 08:04 wyb199701

Hit this when using Flink 1.16 and Hudi bdb50ddccc9631317dfb06a06abc38cbd3714ce8 on EKS. The metadata table was enabled.

2023-04-19 23:55:23
org.apache.hudi.exception.HoodieMetadataException: Failed to retrieve files in partition s3a://path-to-data/ from metadata
    at org.apache.hudi.metadata.BaseTableMetadata.getAllFilesInPartition(BaseTableMetadata.java:152)
    at org.apache.hudi.metadata.HoodieMetadataFileSystemView.listPartition(HoodieMetadataFileSystemView.java:69)
    at org.apache.hudi.common.table.view.AbstractTableFileSystemView.lambda$ensurePartitionLoadedCorrectly$16(AbstractTableFileSystemView.java:428)
    at java.base/java.util.concurrent.ConcurrentHashMap.computeIfAbsent(Unknown Source)
    at org.apache.hudi.common.table.view.AbstractTableFileSystemView.ensurePartitionLoadedCorrectly(AbstractTableFileSystemView.java:419)
    at org.apache.hudi.common.table.view.AbstractTableFileSystemView.getLatestMergedFileSlicesBeforeOrOn(AbstractTableFileSystemView.java:854)
    at org.apache.hudi.common.table.view.PriorityBasedFileSystemView.execute(PriorityBasedFileSystemView.java:104)
    at org.apache.hudi.common.table.view.PriorityBasedFileSystemView.getLatestMergedFileSlicesBeforeOrOn(PriorityBasedFileSystemView.java:195)
    at org.apache.hudi.sink.partitioner.profile.DeltaWriteProfile.smallFilesProfile(DeltaWriteProfile.java:62)
    at org.apache.hudi.sink.partitioner.profile.WriteProfile.getSmallFiles(WriteProfile.java:191)
    at org.apache.hudi.sink.partitioner.BucketAssigner.getSmallFileAssign(BucketAssigner.java:179)
    at org.apache.hudi.sink.partitioner.BucketAssigner.addInsert(BucketAssigner.java:137)
    at org.apache.hudi.sink.partitioner.BucketAssignFunction.getNewRecordLocation(BucketAssignFunction.java:215)
    at org.apache.hudi.sink.partitioner.BucketAssignFunction.processRecord(BucketAssignFunction.java:200)
    at org.apache.hudi.sink.partitioner.BucketAssignFunction.processElement(BucketAssignFunction.java:162)
    at org.apache.flink.streaming.api.operators.KeyedProcessOperator.processElement(KeyedProcessOperator.java:83)
    at org.apache.flink.streaming.runtime.tasks.OneInputStreamTask$StreamTaskNetworkOutput.emitRecord(OneInputStreamTask.java:233)
    at org.apache.flink.streaming.runtime.io.AbstractStreamTaskNetworkInput.processElement(AbstractStreamTaskNetworkInput.java:134)
    at org.apache.flink.streaming.runtime.io.AbstractStreamTaskNetworkInput.emitNext(AbstractStreamTaskNetworkInput.java:105)
    at org.apache.flink.streaming.runtime.io.StreamOneInputProcessor.processInput(StreamOneInputProcessor.java:65)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.processInput(StreamTask.java:542)
    at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:231)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(StreamTask.java:831)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:780)
    at org.apache.flink.runtime.taskmanager.Task.runWithSystemExitMonitoring(Task.java:935)
    at org.apache.flink.runtime.taskmanager.Task.restoreAndInvoke(Task.java:914)
    at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:728)
    at org.apache.flink.runtime.taskmanager.Task.run(Task.java:550)
    at java.base/java.lang.Thread.run(Unknown Source)
Caused by: org.apache.hudi.exception.HoodieException: Exception when reading log file
    at org.apache.hudi.common.table.log.AbstractHoodieLogRecordReader.scanInternalV1(AbstractHoodieLogRecordReader.java:375)
    at org.apache.hudi.common.table.log.AbstractHoodieLogRecordReader.scanInternal(AbstractHoodieLogRecordReader.java:222)
    at org.apache.hudi.common.table.log.HoodieMergedLogRecordScanner.performScan(HoodieMergedLogRecordScanner.java:199)
    at org.apache.hudi.common.table.log.HoodieMergedLogRecordScanner.<init>(HoodieMergedLogRecordScanner.java:115)
    at org.apache.hudi.common.table.log.HoodieMergedLogRecordScanner.<init>(HoodieMergedLogRecordScanner.java:74)
    at org.apache.hudi.common.table.log.HoodieMergedLogRecordScanner$Builder.build(HoodieMergedLogRecordScanner.java:465)
    at org.apache.hudi.metadata.HoodieMetadataLogRecordReader$Builder.build(HoodieMetadataLogRecordReader.java:218)
    at org.apache.hudi.metadata.HoodieBackedTableMetadata.getLogRecordScanner(HoodieBackedTableMetadata.java:539)
    at org.apache.hudi.metadata.HoodieBackedTableMetadata.openReaders(HoodieBackedTableMetadata.java:440)
    at org.apache.hudi.metadata.HoodieBackedTableMetadata.getOrCreateReaders(HoodieBackedTableMetadata.java:425)
    at org.apache.hudi.metadata.HoodieBackedTableMetadata.lambda$getRecordsByKeys$3(HoodieBackedTableMetadata.java:239)
    at java.base/java.util.HashMap.forEach(Unknown Source)
    at org.apache.hudi.metadata.HoodieBackedTableMetadata.getRecordsByKeys(HoodieBackedTableMetadata.java:237)
    at org.apache.hudi.metadata.HoodieBackedTableMetadata.getRecordByKey(HoodieBackedTableMetadata.java:152)
    at org.apache.hudi.metadata.BaseTableMetadata.fetchAllFilesInPartition(BaseTableMetadata.java:339)
    at org.apache.hudi.metadata.BaseTableMetadata.getAllFilesInPartition(BaseTableMetadata.java:150)
    ... 28 more
Caused by: java.lang.IllegalStateException: Block has already been inflated
    at org.apache.hudi.common.util.ValidationUtils.checkState(ValidationUtils.java:76)
    at org.apache.hudi.common.table.log.block.HoodieLogBlock.inflate(HoodieLogBlock.java:276)
    at org.apache.hudi.common.table.log.block.HoodieLogBlock.inflate(HoodieLogBlock.java:287)
    at org.apache.hudi.common.table.log.block.HoodieDataBlock.readRecordsFromBlockPayload(HoodieDataBlock.java:166)
    at org.apache.hudi.common.table.log.block.HoodieDataBlock.getRecordIterator(HoodieDataBlock.java:128)
    at org.apache.hudi.common.table.log.AbstractHoodieLogRecordReader.getRecordsIterator(AbstractHoodieLogRecordReader.java:807)
    at org.apache.hudi.common.table.log.AbstractHoodieLogRecordReader.processDataBlock(AbstractHoodieLogRecordReader.java:630)
    at org.apache.hudi.common.table.log.AbstractHoodieLogRecordReader.processQueuedBlocksForInstant(AbstractHoodieLogRecordReader.java:674)
    at org.apache.hudi.common.table.log.AbstractHoodieLogRecordReader.scanInternalV1(AbstractHoodieLogRecordReader.java:366)

xccui avatar Apr 20 '23 04:04 xccui

I don't know the detailed logic here, but apparently recursively invoking inflate() when hitting an IOException will cause the state check to fail.

(screenshot of the inflate() implementation omitted)
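
The shape of the code is roughly like this standalone sketch (an illustration of the pattern based on the stack trace, not the actual HoodieLogBlock source):

    import java.io.IOException;

    // Simplified standalone illustration: the IOException handler retries by re-entering inflate(),
    // but the content buffer was already allocated, so the "already inflated" guard fires and the
    // original I/O error (e.g. an S3 read failure) is swallowed.
    public class InflateRetryDemo {
      private byte[] content;             // null until the block has been "inflated"
      private final int blockSize = 1024; // placeholder size

      void inflate() {
        if (content != null) {
          throw new IllegalStateException("Block has already been inflated");
        }
        try {
          content = new byte[blockSize];  // buffer is allocated before the read...
          readFully(content);             // ...so a failure here leaves `content` non-null
        } catch (IOException e) {
          // Retrying by calling inflate() again trips the guard above: the caller sees
          // "Block has already been inflated" instead of the underlying IOException.
          inflate();
        }
      }

      private void readFully(byte[] buf) throws IOException {
        throw new IOException("simulated log-file read failure");
      }

      public static void main(String[] args) {
        new InflateRetryDemo().inflate(); // throws IllegalStateException, masking the IOException
      }
    }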

xccui avatar Apr 20 '23 04:04 xccui

@kasured to unblock the processing, could you try disabling and deleting the metadata table by setting hoodie.metadata.enable=false in Hudi configs? This automatically deletes the metadata table after a few commits.

Right, this seems obviously flawed: it is hiding the actual IOException and instead throwing an irrelevant "Block has already been inflated" error.

zinking avatar Jun 16 '23 02:06 zinking

@zinking Can you file a fix for it?

danny0405 avatar Jun 16 '23 02:06 danny0405

Hey folks,

This issue (https://gist.github.com/envomp/268bdd35a3b3399db59583c0e159c229#file-cover-logs) seems to be masking the real underlying issue, which in our case was: https://gist.github.com/envomp/268bdd35a3b3399db59583c0e159c229#file-actual-logs

That, in turn, was caused by TIMELINE_SERVER_BASED marker types not working when using Spark Structured Streaming. The workaround was to disable the metadata table.

envomp avatar Oct 09 '23 08:10 envomp

@envomp Are you setting fs.s3a.connection.maximum to a higher value? That might fix the connection timeout issue.

ad1happy2go avatar Oct 09 '23 09:10 ad1happy2go

Hey @ad1happy2go

We have the following s3a configurations:

spark.hadoop.fs.s3a.path.style.access: true
spark.hadoop.fs.s3a.threads.max: 64
spark.hadoop.fs.s3a.connection.maximum: 1024
spark.hadoop.fs.s3a.maxRetries: 64

Also tried setting fs.s3a.connection.maximum to 8096 but the issue persisted.

EDIT:

For a table with a smaller volume, this is how disabling the metadata table affected the app duration: (screenshot omitted)

envomp avatar Oct 09 '23 11:10 envomp

Same problem with version 0.14.1, on HDFS.

chestnutqiang avatar Feb 06 '24 06:02 chestnutqiang