Kafka ingest task still succeeds despite "OffsetOutOfRangeException with message" warnings

Open kaisun2000 opened this issue 2 years ago • 9 comments

Kafka ingestion task has the following warning:

2023-05-25T20:47:11,184 WARN [task-runner-0-priority-0] org.apache.druid.indexing.kafka.IncrementalPublishingKafkaIndexTaskRunner - OffsetOutOfRangeException with message [Fetch position FetchPosition{offset=4791341818, offsetEpoch=Optional.empty, currentLeader=LeaderAndEpoch{leader=Optional[...

This seems to indicate that the Kafka client cannot find the requested offset on the Kafka broker. The data has been lost on the Kafka side, either because it fell out of retention or because of a server operation issue. From the Kafka ingestion task's perspective, the error is not recoverable.

Should we instead fail the task, or at least surface some signal in the metrics?

kaisun2000 avatar May 25 '23 22:05 kaisun2000

Facing a similar issue; is there any fix for this?

WARN [task-runner-0-priority-0] org.apache.druid.indexing.kafka.IncrementalPublishingKafkaIndexTaskRunner - OffsetOutOfRangeException with message [Fetch position FetchPosition{offset=3192, offsetEpoch=Optional.empty, currentLeader=LeaderAndEpoch{leader=Optional[REVDAU-LTP031:9092 (id: 0 rack: null)], epoch=0}} is out of range for partition snmp_interface-0]

Vivek-Kalola avatar Dec 19 '23 04:12 Vivek-Kalola

@Vivek-Kalola @kaisun2000 Any solution to the above queries? Facing the same issue.

sc-sityad avatar Apr 18 '24 09:04 sc-sityad

After a hard reset of the supervisor, we encountered a similar issue. I checked Kafka and noticed that the minimum offset in Kafka was larger than the offset indicated in the error. I manually set the offset in Druid, which resolved the problem. It seems Kafka may have removed log segments that Druid was still reading, due to our Kafka retention settings. I hope this helps someone else.
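
For anyone hitting this, a small standalone diagnostic along these lines can confirm the situation described above: it compares the stuck offset from the task's WARN message with the earliest offset Kafka still retains for the partition. This is a hypothetical sketch, not part of Druid; the broker address, topic, partition, and offset below are placeholders borrowed from this thread's examples.

import java.util.Collections;
import java.util.Map;
import java.util.Properties;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.serialization.ByteArrayDeserializer;

public class CheckEarliestOffset
{
  public static void main(String[] args)
  {
    // Offset taken from the task's WARN message and the affected topic/partition (placeholders).
    final long stuckOffset = 4791341818L;
    final TopicPartition tp = new TopicPartition("my_topic", 0);

    final Properties props = new Properties();
    props.put("bootstrap.servers", "localhost:9092"); // placeholder broker address
    props.put("key.deserializer", ByteArrayDeserializer.class.getName());
    props.put("value.deserializer", ByteArrayDeserializer.class.getName());

    try (KafkaConsumer<byte[], byte[]> consumer = new KafkaConsumer<>(props)) {
      // beginningOffsets returns the earliest offset the broker still retains for each partition.
      final Map<TopicPartition, Long> earliest = consumer.beginningOffsets(Collections.singletonList(tp));
      final long earliestOffset = earliest.get(tp);
      if (earliestOffset > stuckOffset) {
        System.out.printf(
            "Offset %d has been removed; earliest retained offset is %d. The supervisor offsets need a reset.%n",
            stuckOffset, earliestOffset
        );
      } else {
        System.out.printf(
            "Offset %d should still be readable; earliest retained offset is %d.%n",
            stuckOffset, earliestOffset
        );
      }
    }
  }
}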

iercan avatar Jul 16 '24 13:07 iercan

This issue has been marked as stale due to 280 days of inactivity. It will be closed in 4 weeks if no further activity occurs. If this issue is still relevant, please simply write any comment. Even if closed, you can still revive the issue at any time or discuss it on the [email protected] list. Thank you for your contributions.

github-actions[bot] avatar Apr 23 '25 00:04 github-actions[bot]

This issue has been closed due to lack of activity. If you think that is incorrect, or the issue requires additional review, you can revive the issue at any time.

github-actions[bot] avatar May 21 '25 00:05 github-actions[bot]

Not sure if this is still an issue; does anyone know?

gianm avatar May 30 '25 22:05 gianm

I did see this problem on one of our Druid 27 clusters, but didn't dive into it.

FrankChen021 avatar May 31 '25 03:05 FrankChen021

2025-06-13T06:28:52,089 WARN [task-runner-0-priority-0] org.apache.druid.indexing.kafka.IncrementalPublishingKafkaIndexTaskRunner - OffsetOutOfRangeException with message [Fetch position FetchPosition{offset=11018217440, offsetEpoch=Optional.empty, currentLeader=LeaderAndEpoch{leader=Optional[10.169.27.25:9093 (id: 1002 rack: ap-sg-1-general-b)], epoch=84}} is out of range for partition de._txn-1]
2025-06-13T06:28:52,089 WARN [task-runner-0-priority-0] org.apache.druid.indexing.kafka.IncrementalPublishingKafkaIndexTaskRunner - Retrying in 30000ms
2025-06-13T06:29:22,093 WARN [task-runner-0-priority-0] org.apache.druid.indexing.kafka.IncrementalPublishingKafkaIndexTaskRunner - OffsetOutOfRangeException with message [Fetch position FetchPosition{offset=11004815888, offsetEpoch=Optional.empty, currentLeader=LeaderAndEpoch{leader=Optional[10.169.27.24:9093 (id: 1001 rack: ap-sg-1-general-b)], epoch=89}} is out of range for partition de._txn-0]
2025-06-13T06:29:22,093 WARN [task-runner-0-priority-0] org.apache.druid.indexing.kafka.IncrementalPublishingKafkaIndexTaskRunner - Retrying in 30000ms
2025-06-13T06:29:52,094 WARN [task-runner-0-priority-0] org.apache.druid.indexing.kafka.IncrementalPublishingKafkaIndexTaskRunner - OffsetOutOfRangeException with message [Fetch position FetchPosition{offset=11018217440, offsetEpoch=Optional.empty, currentLeader=LeaderAndEpoch{leader=Optional[10.169.27.25:9093 (id: 1002 rack: ap-sg-1-general-b)], epoch=84}} is out of range for partition de._txn-1]
2025-06-13T06:29:52,094 WARN [task-runner-0-priority-0] org.apache.druid.indexing.kafka.IncrementalPublishingKafkaIndexTaskRunner - Retrying in 30000ms
2025-06-13T06:30:22,100 WARN [task-runner-0-priority-0] org.apache.druid.indexing.kafka.IncrementalPublishingKafkaIndexTaskRunner - OffsetOutOfRangeException with message [Fetch position FetchPosition{offset=11004815888, offsetEpoch=Optional.empty, currentLeader=LeaderAndEpoch{leader=Optional[10.169.27.24:9093 (id: 1001 rack: ap-sg-1-general-b)], epoch=89}} is out of range for partition de._txn-0]
2025-06-13T06:30:22,100 WARN [task-runner-0-priority-0] org.apache.druid.indexing.kafka.IncrementalPublishingKafkaIndexTaskRunner - Retrying in 30000ms
2025-06-13T06:30:52,101 WARN [task-runner-0-priority-0] org.apache.druid.indexing.kafka.IncrementalPublishingKafkaIndexTaskRunner - OffsetOutOfRangeException with message [Fetch position FetchPosition{offset=11018217440, offsetEpoch=Optional.empty, currentLeader=LeaderAndEpoch{leader=Optional[10.169.27.25:9093 (id: 1002 rack: ap-sg-1-general-b)], epoch=84}} is out of range for partition de._txn-1]
2025-06-13T06:30:52,101 WARN [task-runner-0-priority-0] org.apache.druid.indexing.kafka.IncrementalPublishingKafkaIndexTaskRunner - Retrying in 30000ms

Reproduced in one cluster; the messages above were printed periodically.

FrankChen021 avatar Jun 13 '25 07:06 FrankChen021

Looking at the exception handler for OffsetOutOfRangeException, I can't remember why it waits and retries when auto-reset is not enabled. In that case the retry seems useless; there is no way to recover from it automatically.

private void possiblyResetOffsetsOrWait(
      Map<TopicPartition, Long> outOfRangePartitions,
      RecordSupplier<KafkaTopicPartition, Long, KafkaRecordEntity> recordSupplier,
      TaskToolbox taskToolbox
  ) throws InterruptedException, IOException
  {
    final String stream = task.getIOConfig().getStartSequenceNumbers().getStream();
    final boolean isMultiTopic = task.getIOConfig().isMultiTopic();
    final Map<TopicPartition, Long> resetPartitions = new HashMap<>();
    boolean doReset = false;
    if (task.getTuningConfig().isResetOffsetAutomatically()) {
      for (Map.Entry<TopicPartition, Long> outOfRangePartition : outOfRangePartitions.entrySet()) {
        final TopicPartition topicPartition = outOfRangePartition.getKey();
        final long nextOffset = outOfRangePartition.getValue();
        // seek to the beginning to get the least available offset
        StreamPartition<KafkaTopicPartition> streamPartition = StreamPartition.of(
            stream,
            new KafkaTopicPartition(isMultiTopic, topicPartition.topic(), topicPartition.partition())
        );
        final Long leastAvailableOffset = recordSupplier.getEarliestSequenceNumber(streamPartition);
        if (leastAvailableOffset == null) {
          throw new ISE(
              "got null sequence number for partition[%s] when fetching from kafka!",
              topicPartition.partition()
          );
        }
        // reset the seek
        recordSupplier.seek(streamPartition, nextOffset);
        // Reset consumer offset if resetOffsetAutomatically is set to true
        // and the current message offset in the kafka partition is more than the
        // next message offset that we are trying to fetch
        if (leastAvailableOffset > nextOffset) {
          doReset = true;
          resetPartitions.put(topicPartition, nextOffset);
        }
      }
    }

    if (doReset) {
      sendResetRequestAndWait(CollectionUtils.mapKeys(resetPartitions, topicPartition -> StreamPartition.of(
          stream,
          new KafkaTopicPartition(isMultiTopic, topicPartition.topic(), topicPartition.partition())
      )), taskToolbox);
    } else {
      log.warn("Retrying in %dms", task.getPollRetryMs());
      pollRetryLock.lockInterruptibly();
      try {
        long nanos = TimeUnit.MILLISECONDS.toNanos(task.getPollRetryMs());
        while (nanos > 0L && !pauseRequested && !stopRequested.get()) {
          nanos = isAwaitingRetry.awaitNanos(nanos);
        }
      }
      finally {
        pollRetryLock.unlock();
      }
    }
  }

FrankChen021 avatar Jun 13 '25 08:06 FrankChen021

Like others, we encountered this issue on a cluster running Druid 32.0.1. After performing a manual hard reset of the supervisor, we eventually saw this issue surface for a few partitions. We have auto-reset disabled (resetOffsetAutomatically: false) and useEarliestOffset: true.

The requested offset is less than the earliest available offset, which makes this an irrecoverable state. However, the task simply logs a warning, retries, and cycles through without consuming any messages. We only caught the issue due to a noticeable lag build-up.

Looks like the code assumes that this is a legitimate situation in all cases and returns an empty set of records. That may hold when auto-reset is enabled or when a topic partition isn't available yet, but there are other scenarios, like the one above, where the requested offset is no longer available or valid: https://github.com/apache/druid/blob/master/extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/indexing/kafka/KafkaIndexTaskRunner.java#L99

org.apache.druid.indexing.kafka.IncrementalPublishingKafkaIndexTaskRunner - OffsetOutOfRangeException with message [Fetch position FetchPosition{offset=2380443635, offsetEpoch=Optional.empty, currentLeader=LeaderAndEpoch{leader=Optional[broker-config:9093 (id: 0 rack: us-west-2a)], epoch=44}} is out of range for partition xyz-14]

Looking at the exception handler for OffsetOutOfRangeException, I can't remember why it waits and retries when auto-reset is not enabled. In that case the retry seems useless; there is no way to recover from it automatically.

@FrankChen021, agreed. For the case where resetOffsetAutomatically: false, we should just fail the task with a useful message so operators can take corrective action and reset the offsets for the problematic partitions.
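
For illustration, here is a minimal sketch of what such a failure could look like, assuming the handler has both the out-of-range partitions and the earliest offsets still available on the broker at hand. The class and method below are hypothetical, not a patch against the actual runner; ISE is the exception type the existing code already uses.

import java.util.Map;
import java.util.stream.Collectors;
import org.apache.druid.java.util.common.ISE;
import org.apache.kafka.common.TopicPartition;

final class OffsetOutOfRangeFailure
{
  // outOfRangePartitions: partition -> offset the task tried to fetch
  // earliestAvailable:    partition -> earliest offset the broker still retains
  static ISE unrecoverableOffsets(
      Map<TopicPartition, Long> outOfRangePartitions,
      Map<TopicPartition, Long> earliestAvailable
  )
  {
    final String detail = outOfRangePartitions
        .entrySet()
        .stream()
        .map(e -> String.format(
            "%s: requested[%d], earliest available[%d]",
            e.getKey(),
            e.getValue(),
            earliestAvailable.get(e.getKey())
        ))
        .collect(Collectors.joining("; "));
    // The message tells operators exactly which partitions need their offsets reset.
    return new ISE(
        "Requested Kafka offsets are no longer available and resetOffsetAutomatically is false. "
        + "Reset the supervisor offsets for [%s]",
        detail
    );
  }
}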

abhishekrb19 avatar Jul 07 '25 16:07 abhishekrb19

The comment was already there when the code was first committed:

      //
      // Handles OffsetOutOfRangeException, which is thrown if the seeked-to
      // offset is not present in the topic-partition. This can happen if we're asking a task to read from data
      // that has not been written yet (which is totally legitimate). So let's wait for it to show up
      //

"if we're asking a task to read from data that has not been written yet" --> I don't think we have such scenario, because we don't provide an API to user to reset offset to a position where it has not been written.

FrankChen021 avatar Jul 08 '25 03:07 FrankChen021

Failing the task when OffsetOutOfRangeException is raised is easy; the tricky part is how we handle the exception.

There are several alternatives for handling it:

1. Just re-throw the exception. Task status: FAIL. Cons: under the current task exception handling, the task does not publish any of the data it has consumed, so all data consumed from the partitions where the exception was raised is lost.
2. Handle the OffsetOutOfRangeException and publish the data. Task status: FAIL. Cons: a little tricky to implement.
3. Handle the exception, publish the data, and use log.alert to send an alert. Task status: SUCCESS. Cons: a little easier than option 2, but hard to track which tasks hit this exception (a rough sketch follows below).
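
For option 3, a rough sketch of what the alert could look like, assuming the runner's log is Druid's EmittingLogger. The helper class, method name, and alert text are hypothetical; in a real process an emitter must already have been registered via EmittingLogger.registerEmitter.

import java.util.Map;
import org.apache.druid.java.util.emitter.EmittingLogger;
import org.apache.kafka.common.TopicPartition;

final class OffsetOutOfRangeAlert
{
  // Emits an operator-visible alert so the condition can be tracked even though
  // the task itself still finishes with SUCCESS.
  static void alertAndContinue(
      EmittingLogger log,
      String taskId,
      Map<TopicPartition, Long> outOfRangePartitions
  )
  {
    log.makeAlert("OffsetOutOfRangeException while resetOffsetAutomatically is disabled")
       .addData("taskId", taskId)
       .addData("outOfRangePartitions", outOfRangePartitions.toString())
       .emit();
  }
}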

@gianm @abhishekrb19 what do you think?

FrankChen021 avatar Jul 09 '25 07:07 FrankChen021