added a set_offset() method to partition_consumer.rb

Open bkao opened this issue 11 years ago • 4 comments

When Kafka has a corrupt message, poseidon gets stuck attempting to read the same offset ad infinitum. We needed to be able to skip ahead (presumably past a corrupt message or disk corruption) so we could implement logic like this:

loop do
  docs = @kafka.fetch
  break if docs.size > 0
  break if @kafka.next_offset >= @kafka.highwater_mark

  # We can get stuck due to a corrupt message in Kafka.  Kafka's
  # response (possibly poseidon) doesn't distinguish between no data
  # and bad data.  Either way we simply get back an empty fetch.  To
  # get around this, we attempt to advance the offset and re-fetch.
  # If offset is past the latest offset, Kafka will throw an error
  # and we'll revert the offset.
  @kafka.set_offset(@kafka.next_offset + 1)
end
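
To make the skip-ahead behaviour concrete, here is a minimal sketch of the same loop run against a stand-in consumer. `FakeConsumer` and `fetch_skipping_corruption` are hypothetical names invented for this example, not part of poseidon. The stand-in only mimics `fetch`, `next_offset`, `highwater_mark`, and the proposed `set_offset`, and it assumes a corrupt offset shows up as an empty fetch, as described above.

```ruby
# Hypothetical stand-in for a partition consumer; not poseidon's real class.
class FakeConsumer
  attr_reader :next_offset, :highwater_mark

  def initialize(messages, corrupt_offsets)
    @messages = messages          # payloads, where array index == offset
    @corrupt = corrupt_offsets    # offsets that produce an empty fetch
    @next_offset = 0
    @highwater_mark = messages.size
  end

  # Returns [] when stuck on a corrupt offset or at the end of the log.
  def fetch
    batch = []
    while @next_offset < @highwater_mark && !@corrupt.include?(@next_offset)
      batch << @messages[@next_offset]
      @next_offset += 1
    end
    batch
  end

  # Assumed semantics of the method proposed in this pull request.
  def set_offset(offset)
    @next_offset = offset
  end
end

# Fetch the next non-empty batch, stepping one offset at a time past
# offsets that return nothing. Returns [docs, skipped_offsets].
def fetch_skipping_corruption(consumer)
  skipped = []
  loop do
    docs = consumer.fetch
    return [docs, skipped] unless docs.empty?
    return [[], skipped] if consumer.next_offset >= consumer.highwater_mark

    skipped << consumer.next_offset
    consumer.set_offset(consumer.next_offset + 1)
  end
end

consumer = FakeConsumer.new(%w[a b c d e], [2])
first, skipped_first = fetch_skipping_corruption(consumer)
second, skipped_second = fetch_skipping_corruption(consumer)
```

Recording the skipped offsets is a design choice of this sketch: it keeps the data loss visible instead of silently advancing past bad messages.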

bkao avatar Oct 09 '14 20:10 bkao

Coverage Status

Coverage decreased (-0.05%) when pulling 60583fbaa5e3829ff0fd2a4c401e9e49446b7802 on ListenFirstMedia:master into 1b04b18edb51bfa255876e57bfbb359729e44125 on bpot:master.

coveralls avatar Oct 09 '14 20:10 coveralls

@bkao it may be the case that the message you are trying to fetch is larger than the max_fetch_size. Do you know of any way to reproduce the issue?

I'm hesitant to add the #set_offset API because it may be used as a work-around for legitimate issues which need to be fixed, but I will think about it some.

bpot avatar Dec 06 '14 01:12 bpot

Nor am I certain that this is the right way to work around the specific issue that I had. But my specific use case involved corrupted sectors on disk. I needed a way to scan past the corruption. Without it, Kafka just got stuck.

bkao avatar Dec 06 '14 03:12 bkao

I also need to be able to set the offset.

Say I fetch messages with offsets 1, 2, and 3. I successfully process 1 and 2 (and I keep track of this fact), but then some error occurs. I need to pick back up at offset 3, but currently that means reestablishing the entire connection, which doesn't seem correct.

Or say I'm reading through the messages and want to replay the previous 10 messages. Again, I have to reestablish the connection just to change the offset.
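
A rough sketch of both cases, assuming a `set_offset` that simply moves the read position. `ReplayableConsumer` and `Message` are hypothetical stand-ins written for this example, not poseidon classes, and the failure at offset 3 is simulated.

```ruby
# Hypothetical stand-ins for illustration; not poseidon's real classes.
Message = Struct.new(:offset, :value)

class ReplayableConsumer
  attr_reader :next_offset

  def initialize(values, start_offset = 0)
    @log = values.each_with_index.map { |v, i| Message.new(i, v) }
    @next_offset = start_offset
  end

  # Returns every message from next_offset to the end of the log.
  def fetch
    batch = @log.drop(@next_offset)
    @next_offset = @log.size unless batch.empty?
    batch
  end

  # Assumed semantics of the proposed method: move the read position.
  def set_offset(offset)
    @next_offset = offset
  end
end

consumer = ReplayableConsumer.new(%w[m0 m1 m2 m3], 1)

processed = []
last_ok = nil
begin
  consumer.fetch.each do |message|
    raise "simulated processing failure" if message.offset == 3

    processed << message.offset
    last_ok = message.offset
  end
rescue RuntimeError
  # Resume right after the last message that was handled successfully,
  # without tearing down and rebuilding the consumer.
  consumer.set_offset(last_ok + 1)
end

retried = consumer.fetch.map(&:offset)

# Replay the previous two messages by moving the offset backwards.
consumer.set_offset([consumer.next_offset - 2, 0].max)
replayed = consumer.fetch.map(&:offset)
```

Here `processed` ends up holding offsets 1 and 2, the retry fetch starts again at offset 3, and the replay re-reads offsets 2 and 3.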

MichaelBaker avatar Aug 13 '15 21:08 MichaelBaker