added a set_offset() method to partition_consumer.rb
When Kafka has a corrupt message, poseidon gets stuck attempting to read the same offset ad infinitum. We needed to be able to skip ahead (presumably past a corrupt message or disk corruption) so that we could implement logic like this:
```ruby
loop do
  docs = @kafka.fetch
  break if docs.size > 0
  break if @kafka.next_offset >= @kafka.highwater_mark

  # We can get stuck due to a corrupt message in Kafka. Kafka's
  # response (possibly poseidon) doesn't distinguish between no data
  # and bad data. Either way we simply get back an empty fetch. To
  # get around this, we attempt to advance the offset and re-fetch.
  # If the offset is past the latest offset, Kafka will throw an error
  # and we'll revert the offset.
  @kafka.set_offset(@kafka.next_offset + 1)
end
```
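For readers skimming the thread: the loop above relies on the consumer simply repositioning its internal offset. The snippet below is only an illustrative sketch of that idea, not the actual diff; see partition_consumer.rb in this PR for the real change, and note that the `@offset` instance variable here is an assumption.

```ruby
require 'poseidon'

# Illustrative only -- not the actual patch. The idea is that the
# consumer exposes a way to reposition its internal offset so the
# next fetch starts from a caller-chosen position.
class Poseidon::PartitionConsumer
  def set_offset(new_offset)
    @offset = new_offset
  end
end
```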
@bkao it may be the case that the message you are trying to fetch is larger than the max_fetch_size. Do you know of any way to reproduce the issue?
I'm hesitant to add the #set_offset API because it may be used as a work-around for legitimate issues which need to be fixed, but I will think about it some.
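One quick way to rule out the fetch-size theory is to retry the same partition with a much larger per-fetch limit. A minimal sketch, assuming PartitionConsumer accepts a `:max_bytes` option (the client id, broker address, topic, and size here are placeholders):

```ruby
require 'poseidon'

# Retry the stuck partition with a larger fetch limit. If the "empty fetch"
# was really an oversized message, raising :max_bytes (assumed option name)
# should let the fetch succeed without skipping any offsets.
consumer = Poseidon::PartitionConsumer.new("oversize_check", "localhost", 9092,
                                           "topic1", 0, :earliest_offset,
                                           :max_bytes => 10 * 1024 * 1024)

messages = consumer.fetch
puts "fetched #{messages.size} messages; next offset is #{consumer.next_offset}"
```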
Nor am I certain that this is the right way to work around the specific issue I had. My use case involved corrupted sectors on disk: I needed a way to scan past the corruption, and without it Kafka just got stuck.
I also need to be able to set the offset.
Say I fetch messages with offsets 1, 2, and 3. I successfully process 1 and 2 (and I keep track of this fact), but then some error occurs. I need to pick back up at offset 3, but currently that means reestablishing the entire connection, which doesn't seem correct.
Or say I'm reading through the messages and I want to replay the previous 10 messages; again, that requires me to reestablish the connection just to change the offset.
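To make that workflow concrete, here is a minimal sketch assuming the `#set_offset` method from this PR plus poseidon's existing `#fetch` and `#next_offset`; `ProcessingError`, `handle`, and the topic/offset values are purely illustrative.

```ruby
require 'poseidon'

# Illustrative stand-ins for application-level processing that may fail
# partway through a batch.
class ProcessingError < StandardError; end

def handle(message)
  # ... application logic ...
end

consumer = Poseidon::PartitionConsumer.new("replay_example", "localhost", 9092,
                                           "topic1", 0, :earliest_offset)

last_processed = nil
begin
  consumer.fetch.each do |m|
    handle(m)
    last_processed = m.offset
  end
rescue ProcessingError
  # Pick back up right after the last successfully processed message,
  # without tearing down and re-establishing the connection.
  consumer.set_offset(last_processed + 1) if last_processed
  retry
end

# Or rewind to replay the previous 10 messages.
consumer.set_offset(consumer.next_offset - 10)
```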