confluent-kafka-python

Consumer.consume commits offsets when 'enable.auto.commit': False

Open: whobscr opened this issue 3 years ago • 9 comments

Description

When I disable auto commit with 'enable.auto.commit': False and call poll(), the consumer doesn't commit, which works as described in the docs. But consume() commits offsets after every call, which is quite unexpected. Consumer.close() is not called during execution.

How to reproduce

    import logging

    import config  # the reporter's own settings module (KAFKA_INPUT_TOPIC, KAFKA_MAX_BATCH_SIZE)
    from confluent_kafka import Consumer, KafkaError, KafkaException, TopicPartition

    logger = logging.getLogger(__name__)

    consumer = Consumer({'bootstrap.servers': 'server:9092',
                         'group.id': 'group_id',
                         'enable.auto.commit': False,
                         'auto.offset.reset': 'earliest'})
    print(str(consumer.get_watermark_offsets(TopicPartition('topic_test', 0))))

    try:
        consumer.subscribe([config.KAFKA_INPUT_TOPIC])

        while True:
            messages = consumer.consume(num_messages=config.KAFKA_MAX_BATCH_SIZE, timeout=10.0)
            if messages:
                for msg in messages:
                    if msg.error():
                        if msg.error().code() == KafkaError._PARTITION_EOF:
                            # End of partition event
                            logger.error('%% %s [%d] reached end at offset %d\n' %
                                         (msg.topic(), msg.partition(), msg.offset()))
                        else:
                            raise KafkaException(msg.error())
                    else:
                        print('message ready')

            print(str(consumer.get_watermark_offsets(TopicPartition('topic_test', 0))))
            # consumer.commit(asynchronous=False)
    finally:
        # Leave the consumer group cleanly; with 'enable.auto.commit': False,
        # close() does not commit any offsets.
        consumer.close()

Execution result: the first call to get_watermark_offsets() returns a different result from all subsequent calls; I expect them to be the same. I need to get a batch of messages to process them concurrently, but it seems that batch consumption commits the offsets before processing, which may result in data loss.
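The intended pattern (take a batch, process it, and only then commit) can be sketched as below. This is a minimal sketch, assuming 'enable.auto.commit' is False and that `handle` is a hypothetical per-message processing function supplied by the caller. The consumer is passed in, so any object with confluent-kafka-python's `consume()` and `commit()` methods works.

```python
from typing import Callable


def process_one_batch(consumer, handle: Callable, batch_size: int = 100,
                      timeout: float = 10.0) -> int:
    """Consume up to batch_size messages, process them, then commit.

    Offsets are committed only after every message in the batch has been
    handled, so a crash mid-batch leaves the batch uncommitted and it is
    redelivered after a restart or rebalance (at-least-once delivery).
    """
    messages = consumer.consume(num_messages=batch_size, timeout=timeout)
    processed = 0
    for msg in messages:
        if msg.error():
            # Skip error events (e.g. partition EOF); a real application
            # would inspect msg.error().code() here.
            continue
        handle(msg)  # hypothetical processing step; an exception skips the commit
        processed += 1
    if processed:
        # Synchronous commit of the consumed positions of the current
        # assignment; raises KafkaException if the commit fails.
        consumer.commit(asynchronous=False)
    return processed
```

With a real client, pass a `confluent_kafka.Consumer` created with 'enable.auto.commit': False. Note that if `handle` raises, the commit is skipped, but the consumer's in-memory position has already moved past the batch; the messages only come back after a restart or rebalance, when consumption resumes from the committed offset.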

Checklist

Please provide the following information:

  • [ ] confluent-kafka-python and librdkafka version (confluent_kafka.version() and confluent_kafka.libversion()): 1.8.2
  • [ ] Apache Kafka broker version: 2.11
  • [ ] Operating system: CentOS Linux release 7.9.2009

whobscr, Mar 13 '22

@whobscr, thanks for asking.

You wrote: "first call to get_watermark_offsets is different from all subsequent calls. I expect it to be the same." I think you're trying to verify whether the offset is committed. If so, the right method for that is committed(): https://github.com/confluentinc/confluent-kafka-python/blob/master/src/confluent_kafka/src/Consumer.c#L650.

I verified this locally: if we don't commit manually with consumer.commit(asynchronous=False), consumer.committed() returns the same result every time:

committed:  [TopicPartition{topic=testbatch1,partition=0,offset=109,error=None}]

committed:  [TopicPartition{topic=testbatch1,partition=0,offset=109,error=None}]

committed:  [TopicPartition{topic=testbatch1,partition=0,offset=109,error=None}]

committed:  [TopicPartition{topic=testbatch1,partition=0,offset=109,error=None}]

If we do call consumer.commit(asynchronous=False), the committed offset advances by one batch each time (the batch size in my local setup is 10):

committed:  [TopicPartition{topic=testbatch1,partition=0,offset=179,error=None}]

committed:  [TopicPartition{topic=testbatch1,partition=0,offset=189,error=None}]

committed:  [TopicPartition{topic=testbatch1,partition=0,offset=199,error=None}]

committed:  [TopicPartition{topic=testbatch1,partition=0,offset=209,error=None}]
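The check above can be reproduced with a small helper that reads the group's committed offsets for the partitions the consumer currently owns. This is a sketch, assuming confluent-kafka-python's `Consumer.assignment()` and `Consumer.committed(partitions, timeout=...)`; the helper name is hypothetical. It returns plain tuples, so it can be printed after each batch in the loop from the issue.

```python
from typing import List, Tuple


def committed_offsets(consumer, timeout: float = 10.0) -> List[Tuple[str, int, int]]:
    """Return (topic, partition, committed_offset) for the current assignment.

    committed() fetches the group's stored offsets from the broker; a
    partition with no committed offset reports OFFSET_INVALID (-1001).
    """
    partitions = consumer.assignment()
    if not partitions:
        # Nothing assigned yet (e.g. before the first rebalance completes).
        return []
    return [(tp.topic, tp.partition, tp.offset)
            for tp in consumer.committed(partitions, timeout=timeout)]
```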

jliunyu, Mar 16 '22

According to the librdkafka configuration docs (https://github.com/edenhill/librdkafka/blob/master/CONFIGURATION.md), enable.auto.commit carries this note: "setting this to false does not prevent the consumer from fetching previously committed start offsets. To circumvent this behaviour set specific start offsets per partition in the call to assign()."
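In confluent-kafka-python, the workaround from that note can be applied from an `on_assign` callback passed to `subscribe()`, which is called with the consumer and the list of assigned `TopicPartition`s. A sketch, assuming the default (eager) assignment strategy and that the application keeps its own map of start offsets; the `start_offsets` dict and `make_on_assign` name are hypothetical. Partitions missing from the map keep the default behaviour of resuming from the committed offset.

```python
from typing import Dict, Tuple


def make_on_assign(start_offsets: Dict[Tuple[str, int], int]):
    """Build an on_assign callback that overrides start offsets per partition."""

    def on_assign(consumer, partitions):
        for tp in partitions:
            key = (tp.topic, tp.partition)
            if key in start_offsets:
                # Start here instead of at the group's committed offset.
                tp.offset = start_offsets[key]
        # Assigning the modified list makes the new start offsets take effect.
        consumer.assign(partitions)

    return on_assign
```

Usage would be along the lines of `consumer.subscribe(['topic_test'], on_assign=make_on_assign({('topic_test', 0): 42}))`.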

jliunyu, Mar 16 '22

You also mentioned that Consumer.close() is not called during execution. That method is only needed when shutting down the consumer. Your code has an infinite while loop, so it never reaches close(). I think you can add a condition that ends the loop; then close() will be called.
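One way to give the loop an exit, so that close() runs, is to stop after a number of consecutive empty batches; a signal handler that flips a flag would work just as well. The sketch below assumes the empty-batch policy (the `max_idle_batches` parameter and `handle_batch` callback are hypothetical) and keeps close() in a `finally` block so it also runs when processing raises.

```python
def run_until_idle(consumer, handle_batch, max_idle_batches: int = 3,
                   batch_size: int = 100, timeout: float = 10.0) -> int:
    """Consume batches until max_idle_batches empty batches arrive in a row.

    Returns the number of non-empty batches handled. close() is always
    called; it leaves the consumer group cleanly, and with
    enable.auto.commit=False it does not commit anything by itself.
    """
    idle = 0
    handled = 0
    try:
        while idle < max_idle_batches:
            messages = consumer.consume(num_messages=batch_size, timeout=timeout)
            if not messages:
                idle += 1
                continue
            idle = 0  # reset the idle counter whenever data arrives
            handle_batch(messages)
            handled += 1
    finally:
        consumer.close()
    return handled
```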

jliunyu, Mar 16 '22

Yes, I used the committed() method, but it seems to return only client-local offsets. Then I used the Kafka bash scripts to look at the consumer group offsets, and they increase with every consume() call.

whobscr, Mar 16 '22

I managed to get correct offsets on the client side only with get_watermark_offsets(). The committed() method even returns None when it's called before any other operation. I think that's because it doesn't query Kafka for the offset.

whobscr, Mar 16 '22

By saying that close() is not called during execution, I meant that I don't call it, so it can't be what commits the offsets. That's why I concluded that consume() commits them. I checked the offsets with the Kafka bash script, so the check doesn't depend on my code.

whobscr, Mar 16 '22

You wrote: "Yes, I used the committed() method, but it seems like it returned only client-local offsets. But then I used the Kafka bash scripts to look at the consumer offsets and they are increasing with calling the consume()"

Can you please share how you verified this with the bash script, and what the result was? That will make it easier for me to take a look.

You wrote: "I managed to get the correct offsets in the client side only with the get_watermark_offsets(). The committed() method even returns None when it's called before any operations. I think it because it doesn't query the Kafka about the offset."

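get_watermark_offsets() returns the partition's low and high watermarks (with cached=True, the HWM cached from the last fetch response), not the consumer group's committed offset.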

Here is an example of rd_kafka_committed: https://github.com/edenhill/librdkafka/blob/master/tests/0081-admin.c#L2487. It first calls rd_kafka_commit, which commits offsets on the broker for the provided list of partitions (https://github.com/edenhill/librdkafka/blob/master/src/rdkafka.h#L4118), and then calls rd_kafka_committed to verify the committed offsets.
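The same commit-then-verify sequence in confluent-kafka-python might look like the sketch below. It assumes the caller tracks the last processed offset per partition; following Kafka's convention, the committed offset is the next one to read (last processed + 1), which is also what `commit(message=msg)` does. The helper takes a `make_tp` factory so it can be used with `confluent_kafka.TopicPartition`; that parameter and the function name are hypothetical.

```python
from typing import Callable, Dict, Tuple


def commit_and_verify(consumer, last_processed: Dict[Tuple[str, int], int],
                      make_tp: Callable, timeout: float = 10.0) -> bool:
    """Synchronously commit last_processed + 1 per partition, then read it back."""
    offsets = [make_tp(topic, partition, offset + 1)
               for (topic, partition), offset in last_processed.items()]
    # Blocks until the commit completes; raises KafkaException on failure.
    consumer.commit(offsets=offsets, asynchronous=False)
    committed = consumer.committed(offsets, timeout=timeout)
    expected = {(tp.topic, tp.partition): tp.offset for tp in offsets}
    actual = {(tp.topic, tp.partition): tp.offset for tp in committed}
    return actual == expected
```

With a real client this would be called as `commit_and_verify(consumer, {('testbatch1', 0): 108}, TopicPartition)`.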

jliunyu, Mar 16 '22

You wrote: "how did you verify with the bash script and the result?"

kafka-consumer-groups --bootstrap-server localhost:9092 --group groupname --describe
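`kafka-consumer-groups --describe` reports, per partition, the group's committed offset (CURRENT-OFFSET), the partition's log end offset (LOG-END-OFFSET) and their difference (LAG). A rough Python counterpart, sketched below, makes the distinction explicit: `get_watermark_offsets()` only describes the partition log, while the group's progress comes from `committed()`. The function name is hypothetical, and the -1001 constant mirrors `confluent_kafka.OFFSET_INVALID` (no committed offset).

```python
from typing import Optional

OFFSET_INVALID = -1001  # same value as confluent_kafka.OFFSET_INVALID


def partition_lag(consumer, tp, timeout: float = 10.0) -> Optional[int]:
    """Return high watermark minus committed offset, or None if nothing is committed."""
    committed = consumer.committed([tp], timeout=timeout)[0].offset
    # low: earliest offset still in the log; high: offset the next message will get.
    low, high = consumer.get_watermark_offsets(tp, timeout=timeout)
    if committed == OFFSET_INVALID:
        return None
    return high - committed
```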

whobscr, Mar 18 '22

You wrote: "call the rd_kafka_commit first"

But I want the records to stay uncommitted; I need to commit them only after processing. How can I disable commits after a consume() call? With poll() it works fine and nothing is committed, but as soon as I switch to consume(), it starts committing.

whobscr, Mar 18 '22

Facing the same issue. Right now I'm implementing my own wrapper around poll() to replicate the Consumer.consume() functionality.
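A poll()-based stand-in for consume(), like the wrapper mentioned above, could look like this: it collects up to `num_messages` results or stops when the overall `timeout` expires, whichever comes first. It is a hypothetical helper, not part of confluent-kafka-python, and like consume() it returns error events alongside normal messages, leaving the caller to check `msg.error()`.

```python
import time
from typing import List


def consume_via_poll(consumer, num_messages: int = 100, timeout: float = 10.0) -> List:
    """Batch up to num_messages results of poll() within an overall timeout."""
    deadline = time.monotonic() + timeout
    batch = []
    while len(batch) < num_messages:
        remaining = deadline - time.monotonic()
        if remaining <= 0:
            break
        msg = consumer.poll(remaining)
        if msg is None:
            # Nothing arrived before the remaining time ran out.
            break
        batch.append(msg)
    return batch
```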

hansgaurav, Oct 19 '22