confluent-kafka-python
confluent-kafka-python copied to clipboard
manual checkpoint
Description
I am using 2 process to read from kafka and write to destination (design of ELT tool) hence, I would like to maintain checkpoint/offset manually and read it from where it left on last run . however, it is reading old record i.e last checkpoint record.
How to reproduce
def reset_offset(self, consumer, partitions): """ set the topic starting point """ for p in partitions: name = str(p.partition) if name in self.state_update: p.offset = self.state_update[name] ## from checkpoint lookup else: p.offset = OFFSET_BEGINNING consumer.assign(partitions)
self.consumer.subscribe([self.topic], on_assign=self.reset_offset)
It always return last read record on next run. I would like to skip and read only next record from last checkpoint.