pinot
Consumption halted on realtime table when accessing an offset that has already been deleted from Kafka
Realtime consumption of records stops when a realtime table needs to access an offset that has already been wiped by Kafka. This can happen when the retention policy on the Kafka side is shorter than the interval at which we commit segments in Pinot. When it happens, there's no easy way for us to recover and restart consumption on the realtime table. The log message on the pinot-servers shows the following:
Fetch position FetchPosition{xxx} is out of range for partition xxx, resetting offset
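The condition behind this log line can be sketched in a few lines (a minimal, self-contained Python simulation, not Pinot or Kafka client code; the offsets are illustrative): the checkpointed offset falls outside the range the broker still retains, so the consumer cannot resume from it.

```python
def is_offset_in_range(checkpoint: int, earliest: int, latest: int) -> bool:
    """True if the checkpointed offset is still retained by the broker."""
    return earliest <= checkpoint <= latest

# Pinot committed a segment ending at this offset, but Kafka retention has
# since deleted everything below `earliest` (illustrative numbers).
earliest, latest = 5_063_000_000, 5_110_000_000
checkpoint = 5_011_308_265

print(is_offset_in_range(checkpoint, earliest, latest))  # False: fetch position is out of range
```

When this check fails inside the real Kafka consumer, the `auto.offset.reset` policy decides where the position moves, which is the "resetting offset" part of the log line.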
consumer plugin: org.apache.pinot.plugin.stream.kafka20.KafkaConsumerFactory
logs:
Consumed 0 events from (rate:0.0/s), currentOffset=5011308264, numRowsConsumedSoFar=0, numRowsIndexedSoFar=0
[Consumer clientId=consumer-null-12, groupId=null] Seeking to offset 5011308265 for partition xx-xx-xx-xx-3
Consumed 0 events from (rate:0.0/s), currentOffset=5110008164, numRowsConsumedSoFar=0, numRowsIndexedSoFar=0
[Consumer clientId=consumer-null-1, groupId=null] Seeking to offset 5110008165 for partition xx-xx-xx-xx-1
[Consumer clientId=consumer-null-12, groupId=null] Fetch position FetchPosition{offset=5011308265, offsetEpoch=Optional.empty, currentLeader=LeaderAndEpoch{leader=Optional[xxx (id: 0 rack: us], epoch=6}} is out of range for partition xx-xx-xx-xx-3, resetting offset
[Consumer clientId=consumer-null-31, groupId=null] Fetch position FetchPosition{offset=4849504882, offsetEpoch=Optional.empty, currentLeader=LeaderAndEpoch{leader=Optional[xxx (id: 2 rack: us)], epoch=5}} is out of range for partition xx-xx-xx-xx-5, resetting offset
[Consumer clientId=consumer-null-12, groupId=null] Resetting offset for partition xx-xx-xx-xx-3 to position FetchPosition{offset=546724258, offsetEpoch=Optional.empty, currentLeader=LeaderAndEpoch{leader=Optional[xxx (id: 0 rack: us)], epoch=6}}.
[Consumer clientId=consumer-null-31, groupId=null] Resetting offset for partition xx-xx-xx-xx-5 to position FetchPosition{offset=530075133, offsetEpoch=Optional.empty, currentLeader=LeaderAndEpoch{leader=Optional[xxx (id: 2 rack: us)], epoch=5}}.
Consumed 0 events from (rate:0.0/s), currentOffset=5063423275, numRowsConsumedSoFar=0, numRowsIndexedSoFar=0
There should be a way to recover from this scenario without having to recreate the table or manually update the ZK metadata for the consuming segment, which is hard to find on the ZK side.
@npawar
@lfernandez93 which version of Pinot does this affect? It’s possible that it was fixed by #7927.
Pinot recovers automatically from this scenario. You can run the RealtimeSegmentValidationManager manually if an urgent fix is needed. Alternatively, you can configure it to run periodically (say, every 30m) to fix tables that have gone beyond the retention period of the stream.
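For reference, configuring the periodic run would look roughly like this in the controller config. The property name below is an assumption and has changed across Pinot versions (older releases used a `frequencyInSeconds` variant), so verify it against the docs for your release:

```properties
# controller.conf -- assumed property name, verify against your Pinot version
controller.realtime.segment.validation.frequencyPeriod=30m
```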
@mcvsubbu the validation manager doesn't fix this. I see the validation manager checking for this only in the `if (isAllInstancesInState(instanceStateMap, SegmentStateModel.OFFLINE))` case.
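That guard can be paraphrased as follows (a Python paraphrase of the logic being quoted, not the actual Java): the repair path only fires when every replica of the consuming segment has gone OFFLINE, so a consumer that is stuck in the reset loop but still reports CONSUMING is never repaired.

```python
OFFLINE = "OFFLINE"

def is_all_instances_in_state(instance_state_map: dict, state: str) -> bool:
    """Paraphrase of isAllInstancesInState: every replica reports the given state."""
    return all(s == state for s in instance_state_map.values())

# A consumer stuck in the offset-reset loop stays CONSUMING, so the
# validation manager's repair branch is never entered.
stuck = {"Server_1": "CONSUMING", "Server_2": "CONSUMING"}
crashed = {"Server_1": "OFFLINE", "Server_2": "OFFLINE"}

print(is_all_instances_in_state(stuck, OFFLINE))    # False: repair branch skipped
print(is_all_instances_in_state(crashed, OFFLINE))  # True: repair branch would run
```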
@richardstartin I think this is a separate issue. There's no filtering happening; Pinot just sees it as no data coming in, because the consumer is actually unable to get any data. But it might be worth trying out with the fix; I haven't verified for sure.
@richardstartin we have been running the latest version in our dev environment.
@npawar the consumer on the server side should experience a (permanent) exception, which will cause the segment to go OFFLINE, and then the validator will fix it automatically.
At least, this is what it is supposed to do. If not, we have a bug, yes. I am eager to know what the fix is.
More complete logs:
Consumed 0 events from (rate:0.0/s), currentOffset=5011308264, numRowsConsumedSoFar=0, numRowsIndexedSoFar=0
[Consumer clientId=consumer-null-12, groupId=null] Seeking to offset 5011308265 for partition xx-xx-xx-xx-3
Consumed 0 events from (rate:0.0/s), currentOffset=5110008164, numRowsConsumedSoFar=0, numRowsIndexedSoFar=0
[Consumer clientId=consumer-null-1, groupId=null] Seeking to offset 5110008165 for partition xx-xx-xx-xx-1
[Consumer clientId=consumer-null-12, groupId=null] Fetch position FetchPosition{offset=5011308265, offsetEpoch=Optional.empty, currentLeader=LeaderAndEpoch{leader=Optional[xxx (id: 0 rack: us], epoch=6}} is out of range for partition xx-xx-xx-xx-3, resetting offset
[Consumer clientId=consumer-null-31, groupId=null] Fetch position FetchPosition{offset=4849504882, offsetEpoch=Optional.empty, currentLeader=LeaderAndEpoch{leader=Optional[xxx (id: 2 rack: us)], epoch=5}} is out of range for partition xx-xx-xx-xx-5, resetting offset
[Consumer clientId=consumer-null-12, groupId=null] Resetting offset for partition xx-xx-xx-xx-3 to position FetchPosition{offset=546724258, offsetEpoch=Optional.empty, currentLeader=LeaderAndEpoch{leader=Optional[xxx (id: 0 rack: us)], epoch=6}}.
[Consumer clientId=consumer-null-31, groupId=null] Resetting offset for partition xx-xx-xx-xx-5 to position FetchPosition{offset=530075133, offsetEpoch=Optional.empty, currentLeader=LeaderAndEpoch{leader=Optional[xxx (id: 2 rack: us)], epoch=5}}.
Consumed 0 events from (rate:0.0/s), currentOffset=5063423275, numRowsConsumedSoFar=0, numRowsIndexedSoFar=0
and this just keeps happening in a loop
Thanks @lfernandez93 for the additional logs. As discussed, it looks like the Kafka side is claiming to reset the offset to what's available, but after that either the message batch doesn't contain any new records, or the Pinot side doesn't accept the new records. The former seems more likely. Will investigate. @mcvsubbu there are no exceptions coming from the Kafka consumer side for this, so what you're theorizing will not happen.
@npawar you're saying the Kafka consumer tries to fetch messages and is unsuccessful, but doesn't throw an exception?
Yes, after resetting the offset to the latest, the Kafka consumer should be returning messages from the new offset. So either it's not, or the Pinot consumer is rejecting those messages. Hence I responded on the Slack thread that I'll investigate. This should be easy to reproduce: start a consumer and manually change the start offset to something in the past.
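One way the observed loop could arise can be simulated without a broker (purely illustrative Python, not Pinot code; the real repro changes the segment's start offset in ZK metadata): if, after the broker-side reset, the consumer keeps seeking back to its stale checkpoint instead of adopting the reset position, every poll returns zero records and the position never advances.

```python
def poll_once(position: int, earliest: int, latest: int) -> tuple:
    """Simulate one fetch: returns (records_consumed, next_position).

    In range: consume up to the log end. Out of range: the broker-side
    auto reset would move the position, but a consumer that seeks back
    to its stale checkpoint discards the reset and consumes nothing.
    """
    if earliest <= position <= latest:
        return latest - position, latest  # normal case: catch up
    return 0, position                    # stuck: re-seek to the same stale offset

earliest, latest = 530_000_000, 5_110_008_165
stale = 5_200_000_000  # checkpoint beyond what the broker retains

consumed, pos = poll_once(stale, earliest, latest)
print(consumed, pos == stale)  # 0 True: zero records, position unchanged, loop repeats
```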
noting per @mcvsubbu we are currently using org.apache.pinot.plugin.stream.kafka20.KafkaConsumerFactory
@npawar you would do that via ZK, yes?
@lfernandez93 yes, I was able to reproduce. I've submitted a fix.
@lfernandez93 @npawar can this be closed?