
Consumption halted on realtime table when accessing an offset that has already been deleted from Kafka

Open · lfernandez93 opened this issue on Feb 17 '22 · 13 comments

The realtime consumption of records on realtime tables stops when the table needs to access an offset that has already been wiped by Kafka. This can happen because the retention policy on the Kafka side is shorter than how often we commit segments in Pinot. When this happens, there is no easy way for us to recover and restart consumption on the realtime table. The log message on the pinot-servers shows the following:

Fetch position xxxx FetchPosition xxx is out of range for partition resetting offset xxxx

consumer plugin: org.apache.pinot.plugin.stream.kafka20.KafkaConsumerFactory

logs:

Consumed 0 events from (rate:0.0/s), currentOffset=5011308264, numRowsConsumedSoFar=0, numRowsIndexedSoFar=0
[Consumer clientId=consumer-null-12, groupId=null] Seeking to offset 5011308265 for partition xx-xx-xx-xx-3
Consumed 0 events from (rate:0.0/s), currentOffset=5110008164, numRowsConsumedSoFar=0, numRowsIndexedSoFar=0
[Consumer clientId=consumer-null-1, groupId=null] Seeking to offset 5110008165 for partition xx-xx-xx-xx-1
[Consumer clientId=consumer-null-12, groupId=null] Fetch position FetchPosition{offset=5011308265, offsetEpoch=Optional.empty, currentLeader=LeaderAndEpoch{leader=Optional[xxx (id: 0 rack: us], epoch=6}} is out of range for partition xx-xx-xx-xx-3, resetting offset
[Consumer clientId=consumer-null-31, groupId=null] Fetch position FetchPosition{offset=4849504882, offsetEpoch=Optional.empty, currentLeader=LeaderAndEpoch{leader=Optional[xxx (id: 2 rack: us)], epoch=5}} is out of range for partition xx-xx-xx-xx-5, resetting offset
[Consumer clientId=consumer-null-12, groupId=null] Resetting offset for partition xx-xx-xx-xx-3 to position FetchPosition{offset=546724258, offsetEpoch=Optional.empty, currentLeader=LeaderAndEpoch{leader=Optional[xxx (id: 0 rack: us)], epoch=6}}.
[Consumer clientId=consumer-null-31, groupId=null] Resetting offset for partition xx-xx-xx-xx-5 to position FetchPosition{offset=530075133, offsetEpoch=Optional.empty, currentLeader=LeaderAndEpoch{leader=Optional[xxx (id: 2 rack: us)], epoch=5}}.
Consumed 0 events from (rate:0.0/s), currentOffset=5063423275, numRowsConsumedSoFar=0, numRowsIndexedSoFar=0

There should be a way to recover from this scenario without having to recreate the table or manually update the ZK metadata for the consuming segment, which is hard to find on the ZK side.
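
For illustration, the kind of mismatch that can trigger this looks roughly like the following; the property names are the usual Kafka/Pinot knobs, but the values are hypothetical, not our actual settings:

Kafka topic config (hypothetical):  retention.ms=3600000  (1 hour of retention)
Pinot streamConfigs (hypothetical): "realtime.segment.flush.threshold.time": "6h"

If the topic retention (1h here) is shorter than the flush threshold (6h here), the start offset of the next consuming segment can already be deleted by the time a server seeks to it.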

lfernandez93 · Feb 17 '22

@npawar

lfernandez93 · Feb 17 '22

@lfernandez93 which version of Pinot does this affect? It’s possible that it was fixed by #7927.

richardstartin · Feb 17 '22

Pinot recovers automatically from this scenario. You can run the RealtimeSegmentValidationManager manually if an urgent fix is needed. Alternatively, you can configure it to run periodically (say, every 30m) to fix tables that have gone beyond the retention period of the stream.
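
For reference, a rough sketch of what that looks like operationally. The exact property name and REST path are assumptions that vary by Pinot version, so verify them against your release:

# controller config: run the validation manager periodically (hypothetical 30m cadence)
controller.realtime.segment.validation.frequencyPeriod=30m

# one-off manual trigger via the controller's periodic-task endpoint (hypothetical table name)
GET /periodictask/run?taskname=RealtimeSegmentValidationManager&tableName=myTable_REALTIME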

mcvsubbu · Feb 17 '22

@mcvsubbu validation manager doesn't fix this. I see the validation manager checking for this only in the scenario of if (isAllInstancesInState(instanceStateMap, SegmentStateModel.OFFLINE)). @richardstartin I think this is a separate issue. There's no filtering happening; Pinot just sees it as no data coming in, because the consumer is actually unable to get any data. But it might be worth trying out with the fix, I haven't verified for sure.
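
For context, the check referenced above is essentially of this shape (a simplified sketch, not the exact Pinot source):

import java.util.Map;

public final class StateCheckSketch {
  // Simplified sketch: per the comment above, the repair path only kicks in when
  // every replica of the consuming segment is marked OFFLINE.
  static boolean isAllInstancesInState(Map<String, String> instanceStateMap, String expectedState) {
    // instanceStateMap: server instance -> segment state (e.g. CONSUMING, ONLINE, OFFLINE)
    return instanceStateMap.values().stream().allMatch(expectedState::equals);
  }
}

In the scenario reported here the replicas appear to stay in CONSUMING while polling empty batches, so that condition is never met.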

npawar · Feb 17 '22

@richardstartin we have been running the latest version in our dev environment

lfernandez93 · Feb 17 '22

@npawar the consumer on the server side should experience a (permanent) exception, which will cause the segment to go OFFLINE, and then the validator will fix it automatically.

At least, that is what it is supposed to do. If not, then yes, we have a bug. I am eager to know what the fix is.

mcvsubbu · Feb 17 '22

More complete logs:

Consumed 0 events from (rate:0.0/s), currentOffset=5011308264, numRowsConsumedSoFar=0, numRowsIndexedSoFar=0
[Consumer clientId=consumer-null-12, groupId=null] Seeking to offset 5011308265 for partition xx-xx-xx-xx-3
Consumed 0 events from (rate:0.0/s), currentOffset=5110008164, numRowsConsumedSoFar=0, numRowsIndexedSoFar=0
[Consumer clientId=consumer-null-1, groupId=null] Seeking to offset 5110008165 for partition xx-xx-xx-xx-1
[Consumer clientId=consumer-null-12, groupId=null] Fetch position FetchPosition{offset=5011308265, offsetEpoch=Optional.empty, currentLeader=LeaderAndEpoch{leader=Optional[xxx (id: 0 rack: us], epoch=6}} is out of range for partition xx-xx-xx-xx-3, resetting offset
[Consumer clientId=consumer-null-31, groupId=null] Fetch position FetchPosition{offset=4849504882, offsetEpoch=Optional.empty, currentLeader=LeaderAndEpoch{leader=Optional[xxx (id: 2 rack: us)], epoch=5}} is out of range for partition xx-xx-xx-xx-5, resetting offset
[Consumer clientId=consumer-null-12, groupId=null] Resetting offset for partition xx-xx-xx-xx-3 to position FetchPosition{offset=546724258, offsetEpoch=Optional.empty, currentLeader=LeaderAndEpoch{leader=Optional[xxx (id: 0 rack: us)], epoch=6}}.
[Consumer clientId=consumer-null-31, groupId=null] Resetting offset for partition xx-xx-xx-xx-5 to position FetchPosition{offset=530075133, offsetEpoch=Optional.empty, currentLeader=LeaderAndEpoch{leader=Optional[xxx (id: 2 rack: us)], epoch=5}}.
Consumed 0 events from (rate:0.0/s), currentOffset=5063423275, numRowsConsumedSoFar=0, numRowsIndexedSoFar=0

and this just keeps happening in a loop

lfernandez93 · Feb 17 '22

Thanks @lfernandez93 for the additional logs. As discussed, it looks like the Kafka side claims to reset the offset to what's available, but after that either the message batch doesn't contain any new records or the Pinot side doesn't accept the new records. The former seems more likely. Will investigate. @mcvsubbu there are no exceptions coming from the Kafka consumer side for this, so what you're theorizing will not happen.

npawar · Feb 17 '22

@npawar you're saying the Kafka consumer tries to fetch messages and is unsuccessful, but the consumer doesn't throw an exception?

sajjad-moradi · Feb 18 '22

Yes, after resetting the offset to latest, the Kafka consumer should return messages from the new offset. So either it's not, or the Pinot consumer is rejecting those messages. Hence I responded on the Slack thread that I'll investigate. This should be easy to reproduce: start a consumer and manually change the start offset to something in the past.
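
A rough sketch of that reproduction with the plain Kafka consumer API; the broker, topic, partition, and offset below are placeholders, and whether an exception ever surfaces depends on auto.offset.reset:

import java.time.Duration;
import java.util.Collections;
import java.util.Properties;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.TopicPartition;

public class OutOfRangeRepro {
  public static void main(String[] args) {
    Properties props = new Properties();
    props.put("bootstrap.servers", "localhost:9092");  // placeholder broker
    props.put("key.deserializer", "org.apache.kafka.common.serialization.ByteArrayDeserializer");
    props.put("value.deserializer", "org.apache.kafka.common.serialization.ByteArrayDeserializer");
    // No group.id, matching groupId=null in the logs above. With the default
    // auto.offset.reset=latest, an out-of-range seek is reset silently (as in the
    // logs); setting it to "none" makes poll() throw OffsetOutOfRangeException instead.
    try (KafkaConsumer<byte[], byte[]> consumer = new KafkaConsumer<>(props)) {
      TopicPartition tp = new TopicPartition("myTopic", 3);  // placeholder topic/partition
      consumer.assign(Collections.singletonList(tp));
      consumer.seek(tp, 5011308265L);  // an offset the broker has already deleted
      ConsumerRecords<byte[], byte[]> records = consumer.poll(Duration.ofSeconds(5));
      System.out.println("records returned after reset: " + records.count());
    }
  }
}

With the silent reset, the caller only sees empty or unexpected batches, which matches the "Consumed 0 events" lines above.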

npawar · Feb 18 '22

Noting, per @mcvsubbu, that we are currently using org.apache.pinot.plugin.stream.kafka20.KafkaConsumerFactory

@npawar you would do that via ZK, yes?

lfernandez93 · Feb 22 '22

@lfernandez93 yes, I was able to reproduce it. I've submitted a fix.

npawar · Mar 08 '22

@lfernandez93 @npawar can this be closed?

icefury71 · Oct 05 '22