
Hoodie DeltaStreamer job unable to fetch data from Kafka topic from the earliest available offset

Open ksrihari93 opened this issue 2 years ago • 5 comments

Describe the problem you faced

I'm using the Hudi DeltaStreamer in continuous mode with a Kafka source.

Whenever the committed Kafka offset expires, the job fails with an offset-out-of-range exception.

To fix this, we re-ran the job with the configuration below:

auto.offset.reset=earliest

The job still failed with the same offset-out-of-range exception.

It does not pick up from the earliest offset available in the Kafka topic.

After passing the checkpoint parameter

  • "--checkpoint"
  • "1654478958926"

the job started running fine, but it picked up messages from the latest offset.

As a result we encountered data loss: we were unable to recover the job from the earliest offset available in the Kafka topic partitions.
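For reference, our reruns looked roughly like the sketch below. The table type, source class, jar name, and paths are illustrative placeholders rather than our exact command; auto.offset.reset was supplied via the properties file or --hoodie-conf, and --checkpoint was added only on the final attempt:

    spark-submit \
      --class org.apache.hudi.utilities.deltastreamer.HoodieDeltaStreamer \
      hudi-utilities-bundle.jar \
      --table-type COPY_ON_WRITE \
      --source-class org.apache.hudi.utilities.sources.JsonKafkaSource \
      --target-base-path <base-path> \
      --target-table <table> \
      --props kafka-source.properties \
      --hoodie-conf auto.offset.reset=earliest \
      --checkpoint 1654478958926 \
      --continuous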

To Reproduce

Steps to reproduce the behavior:

1. Run a DeltaStreamer job with a Kafka source; after it has consumed a few records from the topic, stop the job.
2. Add a few more messages to the topic, then reduce the topic's retention period so that the offset committed in the checkpoint gets deleted (a CLI sketch for this step follows the list).
3. Re-run the DeltaStreamer job; it will fail with an offset-out-of-range exception.
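Step 2 can be simulated with the standard Kafka CLI. This is only a sketch: the broker address, topic name, and retention value are placeholders, and older brokers may need --zookeeper instead of --bootstrap-server:

    # Shrink retention so the already-committed offset ages out of the topic.
    kafka-configs.sh --bootstrap-server <broker:9092> --alter \
      --entity-type topics --entity-name <topic> \
      --add-config retention.ms=60000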

Expected behavior

Even if the committed offset has expired, the DeltaStreamer should pick up messages from the earliest offset available in the Kafka topic partitions.


Environment Description

Hudi version: 0.9

Spark version: 2.4.4

Storage (HDFS/S3/GCS..): BLOB

Running on Docker? (yes/no): Kubernetes


Stacktrace

	at org.apache.hudi.client.utils.LazyIterableIterator.next(LazyIterableIterator.java:121)
	at scala.collection.convert.Wrappers$JIteratorWrapper.next(Wrappers.scala:43)
	at scala.collection.Iterator$$anon$12.nextCur(Iterator.scala:435)
	at scala.collection.Iterator$$anon$12.hasNext(Iterator.scala:441)
	at org.apache.spark.storage.memory.MemoryStore.putIterator(MemoryStore.scala:221)
	at org.apache.spark.storage.memory.MemoryStore.putIteratorAsBytes(MemoryStore.scala:349)
	at org.apache.spark.storage.BlockManager$$anonfun$doPutIterator$1.apply(BlockManager.scala:1182)
	at org.apache.spark.storage.BlockManager$$anonfun$doPutIterator$1.apply(BlockManager.scala:1156)
	at org.apache.spark.storage.BlockManager.doPut(BlockManager.scala:1091)
	at org.apache.spark.storage.BlockManager.doPutIterator(BlockManager.scala:1156)
	at org.apache.spark.storage.BlockManager.getOrElseUpdate(BlockManager.scala:882)
	at org.apache.spark.rdd.RDD.getOrCompute(RDD.scala:335)
	at org.apache.spark.rdd.RDD.iterator(RDD.scala:286)
	at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:324)
	at org.apache.spark.rdd.RDD.iterator(RDD.scala:288)
	at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:90)
	at org.apache.spark.scheduler.Task.run(Task.scala:123)
	at org.apache.spark.executor.Executor$TaskRunner$$anonfun$10.apply(Executor.scala:408)
	at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1360)
	at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:414)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
	at java.lang.Thread.run(Thread.java:748)
Caused by: org.apache.hudi.exception.HoodieException: org.apache.hudi.exception.HoodieException: java.util.concurrent.ExecutionException: org.apache.hudi.exception.HoodieException: operation has failed
	at org.apache.hudi.execution.SparkLazyInsertIterable.computeNext(SparkLazyInsertIterable.java:80)
	at org.apache.hudi.execution.SparkLazyInsertIterable.computeNext(SparkLazyInsertIterable.java:36)
	at org.apache.hudi.client.utils.LazyIterableIterator.next(LazyIterableIterator.java:119)
	... 23 more
Caused by: org.apache.hudi.exception.HoodieException: java.util.concurrent.ExecutionException: org.apache.hudi.exception.HoodieException: operation has failed
	at org.apache.hudi.common.util.queue.BoundedInMemoryExecutor.execute(BoundedInMemoryExecutor.java:143)
	at org.apache.hudi.execution.SparkLazyInsertIterable.computeNext(SparkLazyInsertIterable.java:76)
	... 25 more
Caused by: java.util.concurrent.ExecutionException: org.apache.hudi.exception.HoodieException: operation has failed
	at java.util.concurrent.FutureTask.report(FutureTask.java:122)
	at java.util.concurrent.FutureTask.get(FutureTask.java:192)
	at org.apache.hudi.common.util.queue.BoundedInMemoryExecutor.execute(BoundedInMemoryExecutor.java:141)
	... 26 more
Caused by: org.apache.hudi.exception.HoodieException: operation has failed
	at org.apache.hudi.common.util.queue.BoundedInMemoryQueue.throwExceptionIfFailed(BoundedInMemoryQueue.java:247)
	at org.apache.hudi.common.util.queue.BoundedInMemoryQueue.readNextRecord(BoundedInMemoryQueue.java:226)
	at org.apache.hudi.common.util.queue.BoundedInMemoryQueue.access$100(BoundedInMemoryQueue.java:52)
	at org.apache.hudi.common.util.queue.BoundedInMemoryQueue$QueueIterator.hasNext(BoundedInMemoryQueue.java:277)
	at org.apache.hudi.common.util.queue.BoundedInMemoryQueueConsumer.consume(BoundedInMemoryQueueConsumer.java:36)
	at org.apache.hudi.common.util.queue.BoundedInMemoryExecutor.lambda$null$2(BoundedInMemoryExecutor.java:121)
	at java.util.concurrent.FutureTask.run(FutureTask.java:266)
	... 3 more
Caused by: org.apache.kafka.clients.consumer.OffsetOutOfRangeException: Offsets out of range with no configured reset policy for partitions: {topi:partition=22900845678}
	at org.apache.kafka.clients.consumer.internals.Fetcher.parseCompletedFetch(Fetcher.java:970)
	at org.apache.kafka.clients.consumer.internals.Fetcher.fetchedRecords(Fetcher.java:490)
	at org.apache.kafka.clients.consumer.KafkaConsumer.pollForFetches(KafkaConsumer.java:1259)
	at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1187)
	at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1115)
	at org.apache.spark.streaming.kafka010.InternalKafkaConsumer.poll(KafkaDataConsumer.scala:200)
	at org.apache.spark.streaming.kafka010.InternalKafkaConsumer.get(KafkaDataConsumer.scala:129)
	at org.apache.spark.streaming.kafka010.KafkaDataConsumer$class.get(KafkaDataConsumer.scala:36)
	at org.apache.spark.streaming.kafka010.KafkaDataConsumer$NonCachedKafkaDataConsumer.get(KafkaDataConsumer.scala:218)
	at org.apache.spark.streaming.kafka010.KafkaRDDIterator.next(KafkaRDD.scala:261)
	at org.apache.spark.streaming.kafka010.KafkaRDDIterator.next(KafkaRDD.scala:229)
	at scala.collection.Iterator$$anon$11.next(Iterator.scala:410)
	at scala.collection.Iterator$$anon$11.next(Iterator.scala:410)
	at scala.collection.Iterator$$anon$11.next(Iterator.scala:410)
	at scala.collection.Iterator$$anon$12.next(Iterator.scala:445)
	at scala.collection.convert.Wrappers$IteratorWrapper.next(Wrappers.scala:31)
	at org.apache.hudi.common.util.queue.IteratorBasedQueueProducer.produce(IteratorBasedQueueProducer.java:46)
	at org.apache.hudi.common.util.queue.BoundedInMemoryExecutor.lambda$null$0(BoundedInMemoryExecutor.java:92)
	at java.util.concurrent.FutureTask.run(FutureTask.java:266)
	at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
	... 4 more

Driver stacktrace:
22/06/09 01:05:56 INFO DAGScheduler: Job 6 failed: sum at DeltaSync.java:455, took 4.353569 s
22/06/09 01:05:56 ERROR HoodieDeltaStreamer: Shutting down delta-sync due to exception
org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 7.0 failed 4 times, most recent failure: Lost task 0.3 in stage 7.0 (TID 749, 10.80.69.117, executor 106): java.lang.RuntimeException: org.apache.hudi.exception.HoodieException: org.apache.hudi.exception.HoodieException: java.util.concurrent.ExecutionException: org.apache.hudi.exception.HoodieException: operation has failed
	[... same stack frames and cause chain as the first trace above ...]

Driver stacktrace:
	at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1889)
	at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1877)
	at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1876)
	at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
	at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:48)
	at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1876)
	at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:926)
	at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:926)
	at scala.Option.foreach(Option.scala:257)
	at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:926)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:2110)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2059)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2048)
	at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:49)
	at org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:737)
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:2061)
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:2158)
	at org.apache.spark.rdd.RDD$$anonfun$fold$1.apply(RDD.scala:1098)
	at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
	at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112)
	at org.apache.spark.rdd.RDD.withScope(RDD.scala:363)
	at org.apache.spark.rdd.RDD.fold(RDD.scala:1092)
	at org.apache.spark.rdd.DoubleRDDFunctions$$anonfun$sum$1.apply$mcD$sp(DoubleRDDFunctions.scala:35)
	at org.apache.spark.rdd.DoubleRDDFunctions$$anonfun$sum$1.apply(DoubleRDDFunctions.scala:35)
	at org.apache.spark.rdd.DoubleRDDFunctions$$anonfun$sum$1.apply(DoubleRDDFunctions.scala:35)
	at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
	at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112)
	at org.apache.spark.rdd.RDD.withScope(RDD.scala:363)
	at org.apache.spark.rdd.DoubleRDDFunctions.sum(DoubleRDDFunctions.scala:34)
	at org.apache.spark.api.java.JavaDoubleRDD.sum(JavaDoubleRDD.scala:165)
	at org.apache.hudi.utilities.deltastreamer.DeltaSync.writeToSink(DeltaSync.java:455)
	at org.apache.hudi.utilities.deltastreamer.DeltaSync.syncOnce(DeltaSync.java:287)
	at org.apache.hudi.utilities.deltastreamer.HoodieDeltaStreamer$DeltaSyncService.lambda$startService$0(HoodieDeltaStreamer.java:598)
	at java.util.concurrent.CompletableFuture$AsyncSupply.run(CompletableFuture.java:1590)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
	at java.lang.Thread.run(Thread.java:748)
Caused by: java.lang.RuntimeException: org.apache.hudi.exception.HoodieException: org.apache.hudi.exception.HoodieException: java.util.concurrent.ExecutionException: org.apache.hudi.exception.HoodieException: operation has failed
	[... same stack frames and cause chain as the first trace above ...]
22/06/09 01:05:56 INFO HoodieDeltaStreamer: Delta Sync shutdown. Error ?true
22/06/09 01:05:56 ERROR HoodieAsyncService: Service shutdown with error
java.util.concurrent.ExecutionException: org.apache.hudi.exception.HoodieException: Job aborted due to stage failure: Task 0 in stage 7.0 failed 4 times, most recent failure: Lost task 0.3 in stage 7.0 (TID 749, 10.80.69.117, executor 106): java.lang.RuntimeException: org.apache.hudi.exception.HoodieException: org.apache.hudi.exception.HoodieException: java.util.concurrent.ExecutionException: org.apache.hudi.exception.HoodieException: operation has failed
	[... same stack frames and cause chain as the first trace above ...]

Driver stacktrace:
	at java.util.concurrent.CompletableFuture.reportGet(CompletableFuture.java:357)
	at java.util.concurrent.CompletableFuture.get(CompletableFuture.java:1895)
	at org.apache.hudi.async.HoodieAsyncService.waitForShutdown(HoodieAsyncService.java:79)
	at org.apache.hudi.utilities.deltastreamer.HoodieDeltaStreamer.lambda$sync$1(HoodieDeltaStreamer.java:159)
	at org.apache.hudi.common.util.Option.ifPresent(Option.java:96)
	at org.apache.hudi.utilities.deltastreamer.HoodieDeltaStreamer.sync(HoodieDeltaStreamer.java:156)
	at org.apache.hudi.utilities.deltastreamer.HoodieDeltaStreamer.main(HoodieDeltaStreamer.java:480)
	at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
	at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
	at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
	at java.lang.reflect.Method.invoke(Method.java:498)
	at org.apache.spark.deploy.JavaMainApplication.start(SparkApplication.scala:52)
	at org.apache.spark.deploy.SparkSubmit.org$apache$spark$deploy$SparkSubmit$$runMain(SparkSubmit.scala:845)
	at org.apache.spark.deploy.SparkSubmit.doRunMain$1(SparkSubmit.scala:161)
	at org.apache.spark.deploy.SparkSubmit.submit(SparkSubmit.scala:184)
	at org.apache.spark.deploy.SparkSubmit.doSubmit(SparkSubmit.scala:86)
	at org.apache.spark.deploy.SparkSubmit$$anon$2.doSubmit(SparkSubmit.scala:920)
	at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:929)
	at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala)
Caused by: org.apache.hudi.exception.HoodieException: Job aborted due to stage failure: Task 0 in stage 7.0 failed 4 times, most recent failure: Lost task 0.3 in stage 7.0 (TID 749, 10.80.69.117, executor 106): java.lang.RuntimeException: org.apache.hudi.exception.HoodieException: org.apache.hudi.exception.HoodieException: java.util.concurrent.ExecutionException: org.apache.hudi.exception.HoodieException: operation has failed
	[... same stack frames and cause chain as the first trace above ...]



ksrihari93 commented Jun 09 '22 17:06

Can you post your spark-submit command and the contents of the properties file used with DeltaStreamer?

nsivabalan commented Jun 13 '22 15:06

I am assuming you are setting auto.offset.reset=earliest in your property file.

nsivabalan commented Jun 13 '22 15:06

Hi @nsivabalan,

#base properties

hoodie.insert.shuffle.parallelism=50
hoodie.bulkinsert.shuffle.parallelism=200
hoodie.embed.timeline.server=true
hoodie.filesystem.view.type=EMBEDDED_KV_STORE
hoodie.compact.inline=false
hoodie.bulkinsert.sort.mode=none

#cleaner properties
hoodie.cleaner.policy=KEEP_LATEST_FILE_VERSIONS
hoodie.cleaner.fileversions.retained=60
hoodie.clean.async=true

#archival
hoodie.keep.min.commits=12
hoodie.keep.max.commits=15

#datasource properties
hoodie.deltastreamer.schemaprovider.registry.url=
hoodie.datasource.write.recordkey.field=
hoodie.deltastreamer.source.kafka.topic=
hoodie.datasource.write.keygenerator.class=org.apache.hudi.keygen.CustomKeyGenerator
hoodie.datasource.write.partitionpath.field=timestamp:TIMESTAMP
hoodie.deltastreamer.kafka.source.maxEvents=600000000
hoodie.deltastreamer.keygen.timebased.timestamp.type=EPOCHMILLISECONDS
hoodie.deltastreamer.keygen.timebased.input.timezone=UTC
hoodie.deltastreamer.keygen.timebased.output.timezone=UTC
hoodie.deltastreamer.keygen.timebased.output.dateformat='dt='yyyy-MM-dd
hoodie.clustering.async.enabled=true
hoodie.clustering.plan.strategy.target.file.max.bytes=3000000000
hoodie.clustering.plan.strategy.small.file.limit=200000001
hoodie.clustering.async.max.commits=1
hoodie.clustering.plan.strategy.max.num.groups=10
hoodie.clustering.plan.strategy.max.bytes.per.group=9000000000

#kafka props
bootstrap.servers=
group.id=hudi-lpe
auto.offset.reset=latest
hoodie.deltastreamer.source.kafka.checkpoint.type=timestamp

(As said above, when I passed earliest the job failed, so to recover I had no choice but to pass latest.)

ksrihari93 commented Jun 14 '22 16:06

So you are using timestamp-based Kafka offset checkpoints. It looks like either the last committed checkpoint or the earliest available checkpoint will be used; auto-resetting to latest may not work in this code path. Can you clarify whether that is what you are looking for?

Also, can you post the stacktrace you see when you set "auto.offset.reset=earliest"? You mentioned it failed, but you haven't posted the full stacktrace for that case.

nsivabalan commented Aug 10 '22 02:08

Auto offset reset would work as expected for regular checkpoint types (string); for timestamp-based offsets, it may need some fixes (see the sketch below).
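To illustrate the distinction, a minimal sketch of the two configurations (both property names already appear earlier in this thread):

    # Timestamp-based checkpointing, as used above: checkpoints are epoch-millis
    # values, and auto.offset.reset may not be honored on this code path.
    hoodie.deltastreamer.source.kafka.checkpoint.type=timestamp

    # Regular (string) checkpointing: leave the checkpoint type unset so the
    # checkpoint is a plain offset string and auto.offset.reset takes effect.
    auto.offset.reset=earliest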

nsivabalan commented Aug 10 '22 02:08

@ksrihari93: I hope you got a chance to check out my previous responses. When the Kafka checkpoint type is timestamp, we don't support resetting to LATEST. Does that help?

I see you have mentioned that you run into issues when you set it to EARLIEST. Do you have driver logs for that, with the exact stacktrace?

nsivabalan commented Aug 16 '22 07:08

@ksrihari93: gentle ping.

nsivabalan commented Sep 10 '22 17:09

We have recovered the job by skipping a few offsets. This issue can be closed now.

ksrihari93 commented Sep 12 '22 11:09

Thanks!

nsivabalan commented Sep 12 '22 22:09