confluent-kafka-python icon indicating copy to clipboard operation
confluent-kafka-python copied to clipboard

Title: confluent_kafka.Consumer.consume Method Takes Excessive Time to Respond

Open abdelghanimeliani opened this issue 1 year ago • 0 comments

Description

I am writing a Flask API to retrieve data from Apache Kafka topics. The consumer subscribes to a different topic each time according to the API path and uses the confluent_kafka.Consumer.consume method to fetch a given number of events each time. Here is the consumer configuration I am using:

consumerConfig = {
    'bootstrap.servers': "localhost:9092,0.0.0.0:9092",
    'client.id': socket.gethostname(),
    'group.id': "group",
    'auto.offset.reset': "latest",
    'enable.auto.commit': False,
    'fetch.max.bytes': 52428800,  # 50MB
    'fetch.min.bytes': 1,
    'max.poll.interval.ms': 300000,
    'session.timeout.ms': 300000,
}

The code to consume events from the given topic is as follows:

def consumeEvent(topic,consumer, messages_number=1, time_out=60):
    consumer.subscribe(topic, on_assign=assignment_callback)
    try:
        messages= consumer.consume(num_messages=messages_number,timeout=time_out)
        if(len(messages)==0):
            return{
                "status":"fail",
                "messsage":"topic empty or timeout exeeded"
            }
        elif(len(messages)<messages_number):
            print(messages)
            return{
                "status":"fail",
                "message":"topic does not contain this number of messages or timeout exeeded"
            }
            
        else:
            return{
                "status":"success",
                "message":"request completed succesfully",
                "data":messages_parser(messages)
            }      
    except KeyboardInterrupt:
        print('Canceled by user.')

the assignment_callback and the messages_parser are other methods that i built . The problem is that it takes a long time (1-2 minutes) to return the answer. Is this normal behavior, or is there something wrong with the configuration or usage? additional infomration: I am using kafka as a docker container for an experimental environment

How to reproduce

Checklist

Please provide the following information:

  • [x] confluent_kafka.version()= ('2.3.0', 33751040) confluent_kafka.libversion() = ('2.3.0', 33751295)

  • [x] Apache Kafka broker version: 3.7.0

  • [x] Client configuration: given in the description

  • [x] Operating system: ubuntu 22.04 but im using kakfa in a docker image

  • [x] Provide client logs (with 'debug': '..' as necessary): nothing abnormal

  • [x] Provide broker log excerpts :same

abdelghanimeliani avatar May 23 '24 12:05 abdelghanimeliani