confluent-kafka-python
Consumer.consume commits offsets when 'enable.auto.commit': False
Description
When I disable auto commit with 'enable.auto.commit': False and call poll(), the consumer doesn't commit, which works as the docs describe. But with consume(), offsets are committed after every call. This is quite unexpected. Consumer.close() is not called during execution.
How to reproduce
from confluent_kafka import Consumer, KafkaError, KafkaException, TopicPartition

consumer = Consumer({'bootstrap.servers': 'server:9092',
                     'group.id': 'group_id',
                     'enable.auto.commit': False,
                     'auto.offset.reset': 'earliest'})
print(str(consumer.get_watermark_offsets(TopicPartition('topic_test', 0))))
try:
    consumer.subscribe([config.KAFKA_INPUT_TOPIC])
    while True:
        messages = consumer.consume(num_messages=config.KAFKA_MAX_BATCH_SIZE, timeout=10.0)
        if messages:
            for msg in messages:
                if msg.error():
                    if msg.error().code() == KafkaError._PARTITION_EOF:
                        # End of partition event
                        logger.error('%% %s [%d] reached end at offset %d\n' %
                                     (msg.topic(), msg.partition(), msg.offset()))
                    else:
                        raise KafkaException(msg.error())
                else:
                    print('message ready')
        print(str(consumer.get_watermark_offsets(TopicPartition('topic_test', 0))))
        #consumer.commit(asynchronous=False)
finally:
    # Close down consumer to commit final offsets.
    consumer.close()
Execution result: the first call to get_watermark_offsets returns a different value from all subsequent calls; I expected them to be the same. I need to fetch a batch of messages to process them concurrently, but it seems this batch consumption commits the offsets before processing, which may result in data loss.
Checklist
Please provide the following information:
- [ ] confluent-kafka-python and librdkafka version (confluent_kafka.version() and confluent_kafka.libversion()): 1.8.2
- [ ] Apache Kafka broker version: 2.11
- [ ] Operating system: CentOS Linux release 7.9.2009
@whobscr, thanks for asking.
> first call to get_watermark_offsets is different from all subsequent calls. I expect it to be the same.
I think you're trying to verify whether the offset is committed. If my understanding is correct, the right method for that is committed(): https://github.com/confluentinc/confluent-kafka-python/blob/master/src/confluent_kafka/src/Consumer.c#L650.
I verified it locally: if we don't commit manually with consumer.commit(asynchronous=False), consumer.committed() returns the same result on every iteration:
committed: [TopicPartition{topic=testbatch1,partition=0,offset=109,error=None}]
committed: [TopicPartition{topic=testbatch1,partition=0,offset=109,error=None}]
committed: [TopicPartition{topic=testbatch1,partition=0,offset=109,error=None}]
committed: [TopicPartition{topic=testbatch1,partition=0,offset=109,error=None}]
If we do run consumer.commit(asynchronous=False), the result changes on every iteration. (The batch size in my local setup is 10.)
committed: [TopicPartition{topic=testbatch1,partition=0,offset=179,error=None}]
committed: [TopicPartition{topic=testbatch1,partition=0,offset=189,error=None}]
committed: [TopicPartition{topic=testbatch1,partition=0,offset=199,error=None}]
committed: [TopicPartition{topic=testbatch1,partition=0,offset=209,error=None}]
According to the configuration doc (https://github.com/edenhill/librdkafka/blob/master/CONFIGURATION.md), for enable.auto.commit: "Note: setting this to false does not prevent the consumer from fetching previously committed start offsets. To circumvent this behaviour set specific start offsets per partition in the call to assign()."
You also mentioned that Consumer.close() is not called during execution. This method is only needed when shutting down the consumer. In your code there is an infinite while loop, so execution never reaches close(). You could add a condition that ends the loop, so that close() is then called.
Yes, I used the committed() method, but it seems to return only client-local offsets. I then used the Kafka bash scripts to look at the consumer group offsets, and they increase with each call to consume().
The only way I managed to get the correct offsets on the client side was with get_watermark_offsets(). The committed() method even returns None when it is called before any other operation. I think that is because it doesn't query Kafka for the offset.
By saying that close() is not called during execution, I meant that I never call it, so it cannot be what commits the offsets. That is why I concluded that consume() commits the offsets. I also checked the offsets with the Kafka bash script, so the result does not depend on my code.
> Yes, I used the committed() method, but it seems like it returned only client-local offsets. But then I used the Kafka bash scripts to look at the consumer offsets and they are increasing with calling the consume()
Can you please provide some information on how you verified this with the bash script, and the result? That will make it easier for me to take a look.
> I managed to get the correct offsets in the client side only with the get_watermark_offsets(). The committed() method even returns None when it's called before any operations. I think it because it doesn't query the Kafka about the offset.
get_watermark_offsets() reads the cached HWM (from the last fetch response).
Here is one example for rd_kafka_committed: https://github.com/edenhill/librdkafka/blob/master/tests/0081-admin.c#L2487. It calls rd_kafka_commit first ("Commit offsets on broker for the provided list of partitions", https://github.com/edenhill/librdkafka/blob/master/src/rdkafka.h#L4118), then calls rd_kafka_committed to verify the committed offsets.
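To compare the two values discussed here side by side, a small helper along these lines may be useful. It is only a sketch: offset_report is a hypothetical name, `consumer` is assumed to be a confluent_kafka.Consumer, and `partition` a confluent_kafka.TopicPartition. The watermarks describe the partition log itself (lowest and highest offsets), while committed() reports the offset stored for the consumer group, so the two can change independently.

```python
# librdkafka's marker for "no committed offset" (confluent_kafka.OFFSET_INVALID).
OFFSET_INVALID = -1001


def offset_report(consumer, partition, timeout=10.0):
    """Return the group's committed offset and the log watermarks for one partition.

    Hypothetical helper: `consumer` is expected to behave like a
    confluent_kafka.Consumer, `partition` like a confluent_kafka.TopicPartition.
    """
    # committed() returns a list of TopicPartition objects; .offset holds the
    # committed offset, or OFFSET_INVALID if the group has none stored.
    committed = consumer.committed([partition], timeout=timeout)[0].offset
    # cached=False asks the broker instead of using the locally cached values.
    low, high = consumer.get_watermark_offsets(partition, timeout=timeout, cached=False)
    return {
        'committed': None if committed == OFFSET_INVALID else committed,
        'low': low,
        'high': high,
    }
```

Calling it before and after each consume() call would show whether it is the committed offset or only the high watermark that moves.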
> how did your verify with the bash script and the result?
kafka-consumer-groups --bootstrap-server localhost:9092 --group groupname --describe
> call the rd_kafka_commit first
But I want to keep records uncommitted; I need to commit them only after processing. How can I disable commits after the consume() call? When I call poll(), it works fine and doesn't commit anything, but when I switch to consume() it starts committing.
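For reference, the intended "process first, commit afterwards" flow could look roughly like the sketch below. It assumes 'enable.auto.commit': False on the consumer; consume_and_commit, handle_batch and max_batches are hypothetical names introduced for illustration, and the sketch does not by itself explain the behaviour reported in this issue.

```python
def consume_and_commit(consumer, handle_batch, batch_size=10, timeout=10.0,
                       max_batches=None):
    """Fetch batches with consume(), process them, then commit synchronously.

    Hypothetical helper: `consumer` is expected to behave like a
    confluent_kafka.Consumer created with 'enable.auto.commit': False.
    `handle_batch` is the caller's processing function.
    """
    fetched = 0
    # max_batches bounds the loop so the caller can reach consumer.close().
    while max_batches is None or fetched < max_batches:
        messages = consumer.consume(num_messages=batch_size, timeout=timeout)
        fetched += 1
        if not messages:
            continue  # timeout with nothing to process
        for msg in messages:
            if msg.error():
                # Simplification: treat every error event as fatal.
                raise RuntimeError(str(msg.error()))
        handle_batch(messages)
        # Commit only after the whole batch was processed successfully.
        consumer.commit(asynchronous=False)
```

If handle_batch raises, commit() is never reached for that batch, so the messages would be delivered again after a restart.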
Facing the same issue. Right now I am implementing my own wrapper around poll() to provide the consumer.consume functionality.
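Such a wrapper could be sketched as follows. This is not the commenter's code, just one possible shape: poll_batch is a hypothetical name, and it collects up to num_messages messages from repeated poll() calls within a total time budget.

```python
import time


def poll_batch(consumer, num_messages, timeout):
    """Collect up to `num_messages` messages using repeated poll() calls.

    Hypothetical helper: `consumer` is expected to behave like a
    confluent_kafka.Consumer. `timeout` is the total budget in seconds.
    Error events are returned as well; callers should check msg.error().
    """
    batch = []
    deadline = time.monotonic() + timeout
    while len(batch) < num_messages:
        remaining = deadline - time.monotonic()
        if remaining <= 0:
            break  # total time budget used up
        msg = consumer.poll(remaining)
        if msg is None:
            break  # poll() timed out without a message
        batch.append(msg)
    return batch
```

A call such as poll_batch(consumer, config.KAFKA_MAX_BATCH_SIZE, 10.0) would then stand in for the consume() call in the reproduction code above.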