Periodic error when committing offsets
Description
I have a random problem while committing offsets. Here is the consumer code:
```python
import time
from confluent_kafka import Consumer, KafkaError
from typing import Callable, Any, List
from dags.pkg.proceed_data import proceed_messages
from dags.entities.config_types import KafkaConsumerConfig, BdConfig
import logging
import threading

consumer_running = True


def stop_loop():
    global consumer_running
    consumer_running = False
    print("End consumer on timer")


def kafka_consumer(kafka_config: dict,
                   kafka_consumer_config: KafkaConsumerConfig,
                   bd_config: BdConfig,
                   logger: logging.Logger,
                   reformat_record: Callable[[dict, Any], dict | None],
                   table_name: str = None,
                   topic: str = None
                   ):
    global consumer_running
    if table_name is not None:
        bd_config['table_name'] = table_name
    if topic is not None:
        kafka_consumer_config['topic'] = topic
    if reformat_record is None:
        raise Exception('reformat_record is None')
    if logger is None:
        raise Exception('logger is None')
    logger.info(f'reformat_record: {reformat_record.__name__}')

    consumer = Consumer(kafka_config)
    consumer.subscribe([kafka_consumer_config['topic']])
    messages_batch = []
    consumer_running = True
    timer = threading.Timer(kafka_consumer_config['consume_time'], stop_loop)
    timer.start()
    try:
        while consumer_running:
            message = consumer.poll(kafka_consumer_config['poll_timeout'])
            if message is None:
                continue
            if message.error():
                if message.error().code() == KafkaError._PARTITION_EOF:
                    logger.info('%% %s [%d] reached end at offset %d\n' %
                                (message.topic(), message.partition(), message.offset()))
                else:
                    logger.error(f'Error while consuming: {message.error()}')
                    raise message.error()
                continue
            messages_batch.append(message.value())
            if len(messages_batch) >= kafka_consumer_config['max_messages']:
                logger.debug(f'messages batch: {messages_batch}')
                try:
                    proceed_messages(messages=messages_batch, logger=logger, host=bd_config['host'],
                                     port=bd_config['port'], username=bd_config['username'],
                                     password=bd_config['password'], table_name=bd_config['table_name'],
                                     reformat_record=reformat_record)
                    consumer.commit(asynchronous=False)
                    messages_batch.clear()
                except Exception as e:
                    logger.error('Error with proceed_messages func')
                    raise e

        if len(messages_batch) > 0:
            time.sleep(5)
            try:
                proceed_messages(messages=messages_batch, logger=logger, host=bd_config['host'],
                                 port=bd_config['port'], username=bd_config['username'],
                                 password=bd_config['password'], table_name=bd_config['table_name'],
                                 reformat_record=reformat_record)
                consumer.commit(asynchronous=False)
                messages_batch.clear()
            except Exception as e:
                logger.error('Error with proceed_messages func')
                raise e
    except Exception as e:
        logger.error(f'Error in Kafka consumer: {str(e)}')
        raise e
    finally:
        timer.cancel()
        consumer.close()
```
I recently added an exit from the loop on a timeout (the `stop_loop()` function), and I wanted to process any messages that were left over because the batch never reached the `len(messages_batch) >= kafka_consumer_config['max_messages']` condition.
So I copied the block with "proceed" and "commit" and placed it after the loop, guarded by `len(messages_batch) > 0`. However, this block sometimes fails with an unexpected error: `KafkaError{code=NO_OFFSET, val=-168, str="Commit failed: Local: No offset stored"}`.
I have no idea what the problem is: in theory, if this block runs at all, there are unprocessed messages in the batch, so an offset should have been stored for them. Yet sometimes I get this error and sometimes I don't.
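For reference, the commit behaviour described above can be made explicit. The following is a minimal sketch, not part of the consumer above (`flush_commit` and `last_message` are illustrative names), that commits from the last `Message` returned by `poll()` instead of relying on the internally stored offsets, and treats `_NO_OFFSET` (the error reported here) as benign:

```python
from confluent_kafka import KafkaError, KafkaException


def flush_commit(consumer, last_message, logger):
    """Illustrative helper (not from the consumer above): commit the final
    batch explicitly from the last consumed Message, and do not let
    'Local: No offset stored' propagate."""
    try:
        if last_message is not None:
            # Commits last_message.offset() + 1 for that topic/partition,
            # independent of the client's internally stored offsets.
            consumer.commit(message=last_message, asynchronous=False)
        else:
            # Fall back to committing whatever offsets are currently stored.
            consumer.commit(asynchronous=False)
    except KafkaException as e:
        if e.args[0].code() == KafkaError._NO_OFFSET:
            # No offsets are stored for the currently assigned partitions,
            # so there is simply nothing to commit.
            logger.info('Nothing to commit: %s', e.args[0])
            return
        raise
```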
Maybe I am missing something?
I can provide any additional information or logs if necessary.
P.S.
`confluent_kafka.version()` - 2.5.0
`confluent_kafka.libversion()` - 2.5.0
Client configuration (secrets omitted):

```python
kafka_config['auto.offset.reset'] = 'earliest'
kafka_config['enable.auto.offset.store'] = True
kafka_config['enable.auto.commit'] = False
```
Operating system: Docker container, image apache/airflow:2.9.2
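If client debug logs are needed (see the checklist below), here is a minimal sketch of enabling librdkafka's `debug` contexts and routing them to the standard `logging` module; the broker address and group id are placeholders, since the real values are omitted above:

```python
import logging
from confluent_kafka import Consumer

logging.basicConfig(level=logging.DEBUG)
logger = logging.getLogger('kafka.debug')

kafka_config = {
    'bootstrap.servers': 'localhost:9092',   # placeholder
    'group.id': 'example-group',             # placeholder
    'auto.offset.reset': 'earliest',
    'enable.auto.offset.store': True,
    'enable.auto.commit': False,
    # librdkafka debug contexts relevant to consuming and offset commits
    'debug': 'consumer,cgrp,topic,fetch',
}

# librdkafka log lines are routed to the Python logger passed here
consumer = Consumer(kafka_config, logger=logger)
```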
How to reproduce
Run my code, I guess; I do not have a more reliable way to reproduce it.
Checklist
Please provide the following information:
- [x] confluent-kafka-python and librdkafka version (`confluent_kafka.version()` and `confluent_kafka.libversion()`)
- [ ] Apache Kafka broker version:
- [x] Client configuration: `{...}`
- [x] Operating system:
- [x] Provide client logs (with `'debug': '..'` as necessary)
- [ ] Provide broker log excerpts
- [ ] Critical issue