confluent-kafka-python

manual checkpoint

Open sivankumar86 opened this issue 2 years ago • 0 comments

Description

I am using two processes to read from Kafka and write to a destination (the design of an ELT tool), so I would like to maintain the checkpoint/offset manually and resume from where the last run left off. However, it re-reads an old record, i.e. the record at the last checkpoint.

How to reproduce

def reset_offset(self, consumer, partitions):
    """ set the topic starting point """
    for p in partitions:
        name = str(p.partition)
        if name in self.state_update:
            p.offset = self.state_update[name]  ## from checkpoint lookup
        else:
            p.offset = OFFSET_BEGINNING
    consumer.assign(partitions)

self.consumer.subscribe([self.topic], on_assign=self.reset_offset)

It always returns the last-read record again on the next run. I would like to skip it and read only the records that come after the last checkpoint.
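One possible explanation, assuming the checkpoint stores the offset of the last record that was processed: the offset passed to assign() is the position of the next record to fetch, so assigning the stored offset as-is delivers that same record again. A minimal sketch of resuming one past the checkpoint follows. The helper name next_offsets and the checkpoint layout (a dict keyed by str(partition)) are assumptions for illustration and are not part of confluent-kafka-python; the SimpleNamespace objects only stand in for TopicPartition so the sketch runs without a broker.

```python
from types import SimpleNamespace


def next_offsets(partitions, checkpoint, beginning):
    """Point each partition at the record after its checkpointed offset.

    partitions: objects with .partition and .offset attributes
                (for example confluent_kafka.TopicPartition).
    checkpoint: dict mapping str(partition) to the offset of the last
                processed record (assumed layout, hypothetical).
    beginning:  fallback start position for partitions with no checkpoint,
                for example confluent_kafka.OFFSET_BEGINNING.
    """
    for p in partitions:
        last = checkpoint.get(str(p.partition))
        # Resume at last + 1 so the checkpointed record is not read again.
        p.offset = beginning if last is None else int(last) + 1
    return partitions


# Stand-ins for TopicPartition; -2 stands in for OFFSET_BEGINNING here.
parts = [
    SimpleNamespace(partition=0, offset=None),
    SimpleNamespace(partition=1, offset=None),
]
next_offsets(parts, {"0": 41}, beginning=-2)
```

Inside the on_assign callback this would be used as consumer.assign(next_offsets(partitions, self.state_update, OFFSET_BEGINNING)). If the checkpoint already stores the next offset to read rather than the last one processed, the + 1 should be dropped.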

sivankumar86, Jul 28 '23 09:07