Title: confluent_kafka.Consumer.consume Method Takes Excessive Time to Respond
Description
I am writing a Flask API to retrieve data from Apache Kafka topics. The consumer subscribes to a different topic each time according to the API path and uses the confluent_kafka.Consumer.consume method to fetch a given number of events each time. Here is the consumer configuration I am using:
consumerConfig = {
'bootstrap.servers': "localhost:9092,0.0.0.0:9092",
'client.id': socket.gethostname(),
'group.id': "group",
'auto.offset.reset': "latest",
'enable.auto.commit': False,
'fetch.max.bytes': 52428800, # 50MB
'fetch.min.bytes': 1,
'max.poll.interval.ms': 300000,
'session.timeout.ms': 300000,
}
The code to consume events from the given topic is as follows:
def consumeEvent(topic,consumer, messages_number=1, time_out=60):
consumer.subscribe(topic, on_assign=assignment_callback)
try:
messages= consumer.consume(num_messages=messages_number,timeout=time_out)
if(len(messages)==0):
return{
"status":"fail",
"messsage":"topic empty or timeout exeeded"
}
elif(len(messages)<messages_number):
print(messages)
return{
"status":"fail",
"message":"topic does not contain this number of messages or timeout exeeded"
}
else:
return{
"status":"success",
"message":"request completed succesfully",
"data":messages_parser(messages)
}
except KeyboardInterrupt:
print('Canceled by user.')
the assignment_callback and the messages_parser are other methods that i built .
The problem is that it takes a long time (1-2 minutes) to return the answer. Is this normal behavior, or is there something wrong with the configuration or usage?
additional infomration: I am using kafka as a docker container for an experimental environment
How to reproduce
Checklist
Please provide the following information:
-
[x]
confluent_kafka.version()= ('2.3.0', 33751040)confluent_kafka.libversion()= ('2.3.0', 33751295) -
[x] Apache Kafka broker version: 3.7.0
-
[x] Client configuration: given in the description
-
[x] Operating system: ubuntu 22.04 but im using kakfa in a docker image
-
[x] Provide client logs (with
'debug': '..'as necessary): nothing abnormal -
[x] Provide broker log excerpts :same