
Periodically error on committing offset

Open V4kodin opened this issue 1 year ago • 0 comments

Description

I have a problem that occurs randomly while committing offsets. Here is the consumer code:

import time
from confluent_kafka import Consumer, KafkaError
from typing import Callable, Any, List
from dags.pkg.proceed_data import proceed_messages
from dags.entities.config_types import KafkaConsumerConfig, BdConfig
import logging
import threading

consumer_running = True

def stop_loop():
    global consumer_running
    consumer_running = False
    print("End consumer on timer") 

def kafka_consumer(kafka_config: dict,
                   kafka_consumer_config: KafkaConsumerConfig,
                   bd_config: BdConfig,
                   logger: logging.Logger,
                   reformat_record: Callable[[dict, Any], dict | None],
                   table_name: str | None = None,
                   topic: str | None = None
                   ):
    global consumer_running

    if table_name is not None:
        bd_config['table_name'] = table_name
    if topic is not None:
        kafka_consumer_config['topic'] = topic
    if reformat_record is None:
        raise Exception('reformat_record is None')
    if logger is None:
        raise Exception('logger is None')
    logger.info(f'reformat_record: {reformat_record.__name__}')

    consumer = Consumer(kafka_config)
    consumer.subscribe([kafka_consumer_config['topic']])
    messages_batch = []
    consumer_running = True

    # Stop the consume loop after consume_time seconds
    timer = threading.Timer(kafka_consumer_config['consume_time'], stop_loop)
    timer.start()
    try:
        while consumer_running:
            message = consumer.poll(kafka_consumer_config['poll_timeout'])
            if message is None:
                continue
            if message.error():
                if message.error().code() == KafkaError._PARTITION_EOF:
                    logger.info('%% %s [%d] reached end at offset %d\n' %
                                     (message.topic(), message.partition(), message.offset()))
                else:
                    logger.error(f'Error while consuming: {message.error()}')
                    raise message.error()
                continue

            messages_batch.append(message.value())
            # Once a full batch has accumulated, process it and commit synchronously
            if len(messages_batch) >= kafka_consumer_config['max_messages']:
                logger.debug(f'messages batch: {messages_batch}')
                try:
                    proceed_messages(messages=messages_batch, logger=logger, host=bd_config['host'],
                                     port=bd_config['port'], username=bd_config['username'],
                                     password=bd_config['password'], table_name=bd_config['table_name'],
                                     reformat_record=reformat_record)
                    consumer.commit(asynchronous=False)
                    messages_batch.clear()
                except Exception as e:
                    logger.error('Error with proceed_messages func')
                    raise e
        # After the loop stops (timer fired), process and commit any leftover messages
        if len(messages_batch) > 0:
            time.sleep(5)
            try:
                proceed_messages(messages=messages_batch, logger=logger, host=bd_config['host'],
                                 port=bd_config['port'], username=bd_config['username'],
                                 password=bd_config['password'], table_name=bd_config['table_name'],
                                 reformat_record=reformat_record)
                consumer.commit(asynchronous=False)
                messages_batch.clear()
            except Exception as e:
                logger.error('Error with proceed_messages func')
                raise e
    except Exception as e:
        logger.error(f'Error in Kafka consumer: {str(e)}')
        raise e
    finally:
        timer.cancel()
        consumer.close()

I recently added a timed exit from the loop (the stop_loop() function) and wanted to process the messages that were left over because they never reached the len(messages_batch) >= kafka_consumer_config['max_messages'] threshold. So I copied the block with "proceed" and "commit" and placed it after the loop, guarded by len(messages_batch) > 0. However, this block sometimes fails with an unexpected error: KafkaError{code=NO_OFFSET, val=-168, str="Commit failed: Local: No offset stored"}. I don't understand what the problem is, because if that block runs at all, messages_batch is not empty, so there should be stored offsets to commit. Yet sometimes I get this error and sometimes I don't.
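
If this error only means there is nothing new to commit, I could probably guard the final commit like this (a minimal sketch, not part of my actual code; commit_remaining is a hypothetical helper):

import logging
from confluent_kafka import Consumer, KafkaError, KafkaException

def commit_remaining(consumer: Consumer, logger: logging.Logger) -> None:
    # Synchronously commit whatever offsets librdkafka has stored so far.
    # If nothing has been stored since the last commit, treat it as a no-op
    # instead of letting the NO_OFFSET error propagate.
    try:
        consumer.commit(asynchronous=False)
    except KafkaException as e:
        if e.args[0].code() == KafkaError._NO_OFFSET:
            logger.info('No stored offsets to commit, skipping')
        else:
            raise

But I would still like to understand why there are no stored offsets at that point, given that messages_batch is not empty.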

Maybe I am missing something?

I can provide any additional information or logs if necessary.

P.S.

confluent_kafka.version() - 2.5.0
confluent_kafka.libversion() - 2.5.0

client config except secrets:

kafka_config['auto.offset.reset'] = 'earliest'
kafka_config['enable.auto.offset.store'] = True
kafka_config['enable.auto.commit'] = False
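
My understanding of these settings (based on the documented librdkafka behaviour, so please correct me if this is wrong): with enable.auto.offset.store=True the offset of every message returned by poll() is stored automatically, and with enable.auto.commit=False nothing is committed until commit() is called explicitly. A sketch of the full dict, with the values I cannot share replaced by placeholders:

kafka_config = {
    'bootstrap.servers': 'BROKER:9092',    # placeholder, not the real address
    'group.id': 'CONSUMER_GROUP',          # placeholder, not the real group id
    'auto.offset.reset': 'earliest',
    'enable.auto.offset.store': True,      # offsets stored automatically on poll()
    'enable.auto.commit': False,           # committed only by explicit commit()
}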

Operating system - docker container, image apache/airflow:2.9.2

How to reproduce

Run my code, I suppose; I have not found a reliable way to reproduce it.

Checklist

Please provide the following information:

  • [x] confluent-kafka-python and librdkafka version (confluent_kafka.version() and confluent_kafka.libversion()):
  • [ ] Apache Kafka broker version:
  • [x] Client configuration: {...}
  • [x] Operating system:
  • [x] Provide client logs (with 'debug': '..' as necessary)
  • [ ] Provide broker log excerpts
  • [ ] Critical issue

V4kodin · Oct 25 '24 18:10