A Kafka ingestion task that hits "OffsetOutOfRangeException with message" still succeeds.
The Kafka ingestion task logs the following warning:
2023-05-25T20:47:11,184 WARN [task-runner-0-priority-0] org.apache.druid.indexing.kafka.IncrementalPublishingKafkaIndexTaskRunner - OffsetOutOfRangeException with message [Fetch position FetchPosition{offset=4791341818, offsetEpoch=Optional.empty, currentLeader=LeaderAndEpoch{leader=Optional[...
This seems to indicate that the Kafka client cannot find the requested data on the Kafka server side. The data was lost on the Kafka server, either because it fell out of retention or because of a server operation issue. From the Kafka ingestion task's perspective, this error is not recoverable.
Should we instead fail the task, or at least emit some signal in the metrics?
I'm facing a similar issue; is there any fix for this?
WARN [task-runner-0-priority-0] org.apache.druid.indexing.kafka.IncrementalPublishingKafkaIndexTaskRunner - OffsetOutOfRangeException with message [Fetch position FetchPosition{offset=3192, offsetEpoch=Optional.empty, currentLeader=LeaderAndEpoch{leader=Optional[REVDAU-LTP031:9092 (id: 0 rack: null)], epoch=0}} is out of range for partition snmp_interface-0]
@Vivek-Kalola @kaisun2000 Is there any solution to the queries above? I'm facing the same issue.
After a hard reset of the supervisor, we encountered a similar issue. I checked Kafka and noticed that the minimum offset in Kafka was larger than the offset indicated in the error. I manually set the offset in Druid, which resolved the problem. It seems Kafka may have removed logs while Druid was still reading them due to our retention settings in Kafka. I hope this helps someone else.
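In case it helps anyone, here is a minimal sketch of that check (this is not Druid code; the broker address, topic, partition, and offset are placeholders taken from the warnings in this thread, so adjust them for your setup):

import java.util.Collections;
import java.util.Map;
import java.util.Properties;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.serialization.ByteArrayDeserializer;

// Compares the offset from the Druid warning against Kafka's earliest available offset
// for the partition. If Kafka's earliest offset is larger, the requested data has already
// been deleted by retention, and the stored offset has to be reset on the Druid side
// (hard reset of the supervisor, or the per-partition resetOffsets API in newer versions).
public class CheckEarliestOffset
{
  public static void main(String[] args)
  {
    String bootstrapServers = "localhost:9092";                   // placeholder
    TopicPartition tp = new TopicPartition("snmp_interface", 0);  // partition from the warning above
    long offsetFromDruidWarning = 3192L;                          // "offset=..." value in the warning

    Properties props = new Properties();
    props.put("bootstrap.servers", bootstrapServers);
    props.put("key.deserializer", ByteArrayDeserializer.class.getName());
    props.put("value.deserializer", ByteArrayDeserializer.class.getName());

    try (KafkaConsumer<byte[], byte[]> consumer = new KafkaConsumer<>(props)) {
      Map<TopicPartition, Long> earliest = consumer.beginningOffsets(Collections.singleton(tp));
      long earliestOffset = earliest.get(tp);
      if (earliestOffset > offsetFromDruidWarning) {
        System.out.printf(
            "Offset %d is below Kafka's earliest offset %d for %s; the supervisor offsets need a reset.%n",
            offsetFromDruidWarning, earliestOffset, tp
        );
      } else {
        System.out.printf(
            "Offset %d is still within range (earliest=%d) for %s.%n",
            offsetFromDruidWarning, earliestOffset, tp
        );
      }
    }
  }
}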
Not sure if this is still an issue; does anyone know?
I did see this problem on one of our Druid 27 clusters, but didn't dive into it.
2025-06-13T06:28:52,089 WARN [task-runner-0-priority-0] org.apache.druid.indexing.kafka.IncrementalPublishingKafkaIndexTaskRunner - OffsetOutOfRangeException with message [Fetch position FetchPosition{offset=11018217440, offsetEpoch=Optional.empty, currentLeader=LeaderAndEpoch{leader=Optional[10.169.27.25:9093 (id: 1002 rack: ap-sg-1-general-b)], epoch=84}} is out of range for partition de._txn-1]
2025-06-13T06:28:52,089 WARN [task-runner-0-priority-0] org.apache.druid.indexing.kafka.IncrementalPublishingKafkaIndexTaskRunner - Retrying in 30000ms
2025-06-13T06:29:22,093 WARN [task-runner-0-priority-0] org.apache.druid.indexing.kafka.IncrementalPublishingKafkaIndexTaskRunner - OffsetOutOfRangeException with message [Fetch position FetchPosition{offset=11004815888, offsetEpoch=Optional.empty, currentLeader=LeaderAndEpoch{leader=Optional[10.169.27.24:9093 (id: 1001 rack: ap-sg-1-general-b)], epoch=89}} is out of range for partition de._txn-0]
2025-06-13T06:29:22,093 WARN [task-runner-0-priority-0] org.apache.druid.indexing.kafka.IncrementalPublishingKafkaIndexTaskRunner - Retrying in 30000ms
2025-06-13T06:29:52,094 WARN [task-runner-0-priority-0] org.apache.druid.indexing.kafka.IncrementalPublishingKafkaIndexTaskRunner - OffsetOutOfRangeException with message [Fetch position FetchPosition{offset=11018217440, offsetEpoch=Optional.empty, currentLeader=LeaderAndEpoch{leader=Optional[10.169.27.25:9093 (id: 1002 rack: ap-sg-1-general-b)], epoch=84}} is out of range for partition de._txn-1]
2025-06-13T06:29:52,094 WARN [task-runner-0-priority-0] org.apache.druid.indexing.kafka.IncrementalPublishingKafkaIndexTaskRunner - Retrying in 30000ms
2025-06-13T06:30:22,100 WARN [task-runner-0-priority-0] org.apache.druid.indexing.kafka.IncrementalPublishingKafkaIndexTaskRunner - OffsetOutOfRangeException with message [Fetch position FetchPosition{offset=11004815888, offsetEpoch=Optional.empty, currentLeader=LeaderAndEpoch{leader=Optional[10.169.27.24:9093 (id: 1001 rack: ap-sg-1-general-b)], epoch=89}} is out of range for partition de._txn-0]
2025-06-13T06:30:22,100 WARN [task-runner-0-priority-0] org.apache.druid.indexing.kafka.IncrementalPublishingKafkaIndexTaskRunner - Retrying in 30000ms
2025-06-13T06:30:52,101 WARN [task-runner-0-priority-0] org.apache.druid.indexing.kafka.IncrementalPublishingKafkaIndexTaskRunner - OffsetOutOfRangeException with message [Fetch position FetchPosition{offset=11018217440, offsetEpoch=Optional.empty, currentLeader=LeaderAndEpoch{leader=Optional[10.169.27.25:9093 (id: 1002 rack: ap-sg-1-general-b)], epoch=84}} is out of range for partition de._txn-1]
2025-06-13T06:30:52,101 WARN [task-runner-0-priority-0] org.apache.druid.indexing.kafka.IncrementalPublishingKafkaIndexTaskRunner - Retrying in 30000ms
Reproduced in one cluster; the messages above were printed periodically.
Looking at the exception handler for OffsetOutOfRangeException, I can't remember why it waits and retries when auto-reset is not enabled. In that case the retry seems useless; there is no way to recover from it automatically.
private void possiblyResetOffsetsOrWait(
    Map<TopicPartition, Long> outOfRangePartitions,
    RecordSupplier<KafkaTopicPartition, Long, KafkaRecordEntity> recordSupplier,
    TaskToolbox taskToolbox
) throws InterruptedException, IOException
{
  final String stream = task.getIOConfig().getStartSequenceNumbers().getStream();
  final boolean isMultiTopic = task.getIOConfig().isMultiTopic();
  final Map<TopicPartition, Long> resetPartitions = new HashMap<>();
  boolean doReset = false;
  if (task.getTuningConfig().isResetOffsetAutomatically()) {
    for (Map.Entry<TopicPartition, Long> outOfRangePartition : outOfRangePartitions.entrySet()) {
      final TopicPartition topicPartition = outOfRangePartition.getKey();
      final long nextOffset = outOfRangePartition.getValue();
      // seek to the beginning to get the least available offset
      StreamPartition<KafkaTopicPartition> streamPartition = StreamPartition.of(
          stream,
          new KafkaTopicPartition(isMultiTopic, topicPartition.topic(), topicPartition.partition())
      );
      final Long leastAvailableOffset = recordSupplier.getEarliestSequenceNumber(streamPartition);
      if (leastAvailableOffset == null) {
        throw new ISE(
            "got null sequence number for partition[%s] when fetching from kafka!",
            topicPartition.partition()
        );
      }
      // reset the seek
      recordSupplier.seek(streamPartition, nextOffset);
      // Reset consumer offset if resetOffsetAutomatically is set to true
      // and the current message offset in the kafka partition is more than the
      // next message offset that we are trying to fetch
      if (leastAvailableOffset > nextOffset) {
        doReset = true;
        resetPartitions.put(topicPartition, nextOffset);
      }
    }
  }
  if (doReset) {
    sendResetRequestAndWait(CollectionUtils.mapKeys(resetPartitions, topicPartition -> StreamPartition.of(
        stream,
        new KafkaTopicPartition(isMultiTopic, topicPartition.topic(), topicPartition.partition())
    )), taskToolbox);
  } else {
    log.warn("Retrying in %dms", task.getPollRetryMs());
    pollRetryLock.lockInterruptibly();
    try {
      long nanos = TimeUnit.MILLISECONDS.toNanos(task.getPollRetryMs());
      while (nanos > 0L && !pauseRequested && !stopRequested.get()) {
        nanos = isAwaitingRetry.awaitNanos(nanos);
      }
    }
    finally {
      pollRetryLock.unlock();
    }
  }
}
Like others, we encountered this issue on a cluster running Druid 32.0.1. After performing a manual hard reset of the supervisor, we eventually saw this issue surface for a few partitions. We have auto-reset disabled (resetOffsetAutomatically: false) and useEarliestOffset: true.
The requested offset is less than the earliest available offset, which makes this an irrecoverable state. However, the task simply logs a warning, retries, and cycles through without consuming any messages. We only caught the issue due to a noticeable lag build-up.
It looks like the code assumes that this is a legitimate situation in all cases and returns an empty set of records. That may be true when auto-reset is enabled or when a topic partition isn't available yet, but there are other scenarios, like the one above, where the requested offset is no longer available or valid - https://github.com/apache/druid/blob/master/extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/indexing/kafka/KafkaIndexTaskRunner.java#L99
org.apache.druid.indexing.kafka.IncrementalPublishingKafkaIndexTaskRunner - OffsetOutOfRangeException with message [Fetch position FetchPosition{offset=2380443635, offsetEpoch=Optional.empty, currentLeader=LeaderAndEpoch{leader=Optional[broker-config:9093 (id: 0 rack: us-west-2a)], epoch=44}} is out of range for partition xyz-14]
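For anyone else who only notices this through lag: polling the supervisor status endpoint on the Overlord is an easy way to catch it, since the payload includes per-partition lag. A minimal sketch (the Overlord URL and supervisor id are placeholders, and you should verify the endpoint path against the docs for your Druid version):

import java.net.URI;
import java.net.http.HttpClient;
import java.net.http.HttpRequest;
import java.net.http.HttpResponse;

// Polls the Overlord's supervisor status endpoint; tasks stuck on OffsetOutOfRangeException
// show up as lag that keeps growing while no rows are ingested.
public class SupervisorLagCheck
{
  public static void main(String[] args) throws Exception
  {
    String overlord = "http://localhost:8090";      // placeholder: Overlord address
    String supervisorId = "my-kafka-datasource";    // placeholder: supervisor/datasource name

    HttpClient client = HttpClient.newHttpClient();
    HttpRequest request = HttpRequest.newBuilder()
        .uri(URI.create(overlord + "/druid/indexer/v1/supervisor/" + supervisorId + "/status"))
        .GET()
        .build();

    HttpResponse<String> response = client.send(request, HttpResponse.BodyHandlers.ofString());
    // Inspect the lag fields in the returned JSON and alert when they grow without bound.
    System.out.println(response.body());
  }
}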
> Looking at the exception handler for OffsetOutOfRangeException, I can't remember why it waits and retries when auto-reset is not enabled. In that case the retry seems useless; there is no way to recover from it automatically.
@FrankChen021, agreed. For the case when resetOffsetAutomatically: false, we should just fail the task with a useful message so operators can take corrective action and reset the offsets for the problematic partitions.
The comment was there when the code was first committed:
//
// Handles OffsetOutOfRangeException, which is thrown if the seeked-to
// offset is not present in the topic-partition. This can happen if we're asking a task to read from data
// that has not been written yet (which is totally legitimate). So let's wait for it to show up
//
"if we're asking a task to read from data that has not been written yet" --> I don't think we have such scenario, because we don't provide an API to user to reset offset to a position where it has not been written.
Failing the task when OffsetOutOfRangeException is raised is easy; the tricky part is how we handle the exception.
There are several alternatives for handling it; a rough sketch of option 1 follows the table below.
| No. | Exception handling | Task status | Cons |
|---|---|---|---|
| 1 | Just re-throw the exception | FAIL | Under the current task exception handling, the task does not publish any consumed data, so all data already consumed from the partitions where the exception is raised will be lost. |
| 2 | Handle the OffsetOutOfRangeException and publish data | FAIL | A little tricky to implement. |
| 3 | Handle the exception and publish data, using log.alert to send an alert | SUCCESS | A little easier than 2, but hard to track which tasks encounter this exception. |
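For concreteness, a rough sketch of what option 1 could look like inside possiblyResetOffsetsOrWait (purely illustrative; the exception type and the message wording are my assumptions, not an agreed design):

// Sketch of option 1: when auto-reset is disabled, fail fast instead of sleeping and
// retrying forever. This would replace the "else" branch of possiblyResetOffsetsOrWait
// shown earlier; task, stream, and outOfRangePartitions are the variables from that method.
if (!task.getTuningConfig().isResetOffsetAutomatically()) {
  throw new ISE(
      "Offsets for partitions[%s] of stream[%s] are out of range and resetOffsetAutomatically is disabled. "
      + "Reset the supervisor offsets for these partitions to resume ingestion.",
      outOfRangePartitions.keySet(),
      stream
  );
}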
@gianm @abhishekrb19 what do you think?