pykafka

After old messages are removed from the server, consuming from offset 0 fails

Open colindickson opened this issue 12 years ago • 0 comments

To reproduce:

  1. Set log.retention.hours=1 in config/server.properties on a test Kafka server.
  2. Add a batch of messages to a new topic using the Producer.
  3. Consume the batch of messages from step 2, and print out the value of consumer.offset.
  4. Wait an hour for those messages to be deleted. Make some tea, and/or browse reddit while you wait.
  5. Add another batch of messages to the same topic and try to consume starting at offset 0. The consumer.consume() loop returns nothing. You can only retrieve the new messages if you instantiate the consumer with the offset you recorded at the end of step 3.

But what if you don't know that offset? It is probably not easy to keep track of it in a production environment where older messages are deleted every hour.

Failing that, there's this ugly O(N) hack for finding the offset where the messages begin when we don't know where on disk the retained messages start:

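import kafka

# Brute force: try every offset from 0 upward until a consumer returns a message.
# `topic` is assumed to be defined already (the topic name from the steps above).
offset = -1
found = False
while not found:
    offset += 1
    consumer = kafka.consumer.Consumer(topic, offset=offset)
    for message in consumer.consume():
        if len(str(message)) > 0:
            print(offset)  # first offset that yields a message
            found = True
        break  # only the first message at this offset matters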

(Again, this code is awful and I don't recommend using it; it's just something I wrote to get an answer for myself. It would be a disaster to run on an empty topic, where it never terminates, or on a topic where the starting offset is a huge number.)

Can you think of a prettier solution to put into consume.py which could allow us to consume all the messages in a topic, even after some have been deleted by the server?
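
One direction that might work, as a rough sketch only: Kafka brokers can answer an offsets request, where the special time value -2 means "earliest offset still retained" and -1 means "latest". If the consumer could ask for those, it could clamp a stale starting offset instead of scanning. I don't know whether this library exposes that request, so `fetch_offset_for_time` below is a hypothetical callable standing in for it, and `resolve_start_offset` is a name I made up for illustration.

```python
# Sentinel time values used by Kafka's offsets request.
EARLIEST_TIME = -2  # oldest offset still retained on the broker
LATEST_TIME = -1    # offset just past the newest message


def resolve_start_offset(requested, fetch_offset_for_time):
    """Clamp `requested` into the range the broker still retains.

    `fetch_offset_for_time` is a HYPOTHETICAL callable (not a known
    pykafka function): given EARLIEST_TIME or LATEST_TIME, it returns
    the matching offset for one topic/partition.
    """
    earliest = fetch_offset_for_time(EARLIEST_TIME)
    latest = fetch_offset_for_time(LATEST_TIME)
    if requested < earliest:
        return earliest  # old messages were deleted; start at the oldest left
    if requested > latest:
        return latest    # nothing there yet; start at the end of the log
    return requested


# Stub standing in for a broker whose log now spans offsets 4096..8192.
retained = {EARLIEST_TIME: 4096, LATEST_TIME: 8192}
print(resolve_start_offset(0, retained.get))  # 4096
```

With something like this, Consumer(topic, offset=0) after a retention cleanup could quietly start at the first retained message, at the cost of one extra round trip to the broker when the consumer is created.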

colindickson, Jan 23 '13 12:01