consumer_timeout_ms
I have a topic named my_topic. When consuming my_topic, I want the process to exit if no data is put into the topic within ten seconds.
Using the kafka-python package, I can set consumer_timeout_ms=10000 and it works. The code is shown below:
```python
from kafka import KafkaConsumer
from kafka.errors import KafkaError

consumer = KafkaConsumer(
    'my_topic',
    bootstrap_servers='localhost:9092',
    group_id="my-group",
    auto_offset_reset='earliest',
    # exit the process if no data is put into the topic within ten seconds
    consumer_timeout_ms=10000,  # 10s
)


def test():
    for msg in consumer:
        print(msg.value)
    print('consumer timeout: no data was put into the topic within ten seconds, exiting')


if __name__ == '__main__':
    test()
```
But with the aiokafka package, setting consumer_timeout_ms=10000 as well doesn't work. The code is shown below:
```python
import asyncio

from aiokafka import AIOKafkaConsumer
from aiokafka.errors import KafkaError


async def async_test():
    c = AIOKafkaConsumer(
        'my_topic',
        bootstrap_servers='localhost:9092',
        group_id="my-group",
        auto_offset_reset='earliest',
        # exit the process if no data is put into the topic within ten seconds
        consumer_timeout_ms=10000,  # 10s
    )
    await c.start()  # the consumer must be started before iterating over it
    try:
        # the process gets stuck here: after ten seconds it is still waiting,
        # so consumer_timeout_ms does not end the loop
        async for msg in c:
            print("consumed: ", msg.topic, msg.partition, msg.offset, msg.key, msg.value, msg.timestamp)
    except KafkaError as e:
        print(e)
        print('kafka error')
    finally:
        print('my_topic is empty, time out: 10s, exit process')
        await c.stop()


if __name__ == '__main__':
    asyncio.run(async_test())
```
Which parameter should I set to achieve my goal of exiting the process when no data is put into the topic within ten seconds?
I would use the "timeout_ms" parameter of the getmany method https://aiokafka.readthedocs.io/en/stable/api.html#aiokafka.AIOKafkaConsumer.getmany
It differs from the iterator-style consumption in your example: instead of iterating over the consumer, you would loop around getmany until it returns no messages.
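A minimal sketch of that approach, reworking the loop from the question around `getmany`. It assumes aiokafka's documented `AIOKafkaConsumer.getmany(timeout_ms=...)`, which returns a dict mapping each partition to a list of records, and an empty dict when nothing arrives within `timeout_ms`. The names `drain_until_idle`, `handle` and `main` are hypothetical; the topic, server and group values are copied from the question.

```python
import asyncio


async def drain_until_idle(consumer, idle_timeout_ms=10_000, handle=print):
    """Consume from an already-started consumer until one getmany() call
    returns no records within idle_timeout_ms. Returns the record count."""
    count = 0
    while True:
        # getmany() returns {TopicPartition: [record, ...]}; it returns early
        # when records are available, or {} once timeout_ms passes with none.
        batches = await consumer.getmany(timeout_ms=idle_timeout_ms)
        if not batches:
            return count  # nothing arrived within the idle window: stop
        for records in batches.values():
            for msg in records:
                handle(msg)
                count += 1


async def main():
    # Imported here so the helper above can be reused without a broker.
    from aiokafka import AIOKafkaConsumer

    consumer = AIOKafkaConsumer(
        "my_topic",
        bootstrap_servers="localhost:9092",
        group_id="my-group",
        auto_offset_reset="earliest",
    )
    await consumer.start()
    try:
        n = await drain_until_idle(consumer, idle_timeout_ms=10_000)
        print(f"no data for 10s after {n} messages, exiting")
    finally:
        await consumer.stop()

# Hypothetical entry point: asyncio.run(main())
```

The helper accepts any started consumer, which keeps the timeout logic separate from connection setup. One caveat, not verified here: the first `getmany` call may also cover the time spent joining the consumer group, so a slow rebalance could end the loop before any data is read.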
~Looks like a bug to me.~
The parameter is the same (consumer_timeout_ms), so why is the effect not the same?
I believe Vincent meant to use the timeout_ms parameter of getmany, not consumer_timeout_ms.
Using the timeout_ms parameter of getmany achieves my purpose and it works, but I think it would be simpler and easier to just assign a value to the consumer_timeout_ms parameter of AIOKafkaConsumer.
Despite both parameters having the same name in aiokafka and kafka-python, they don't seem to have the same behavior:
kafka-python
number of milliseconds to block during message iteration before raising StopIteration (i.e., ending the iterator). Default block forever [float('inf')].
aiokafka
maximum wait timeout for background fetching routine. Mostly defines how fast the system will see rebalance and request new data for new partitions. Default: 200
From a quick look, aiokafka uses it in the fetcher in a way that matches what the documentation says. So the closest thing you can have in aiokafka is timeout_ms on getmany. Maybe the consumer parameter could be renamed to avoid confusion with kafka-python (and eventually there could be a parameter used as the default timeout_ms, without the need to call getmany).
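Until such a parameter exists, the kafka-python behavior can be approximated in user code. The sketch below is a hypothetical async generator, `iter_until_idle`, built only on `getmany(timeout_ms=...)`; it is not part of aiokafka. It yields records one at a time and simply returns, ending the `async for` much like `StopIteration` ends kafka-python's iterator, once a whole `consumer_timeout_ms` window passes with no records.

```python
import asyncio


async def iter_until_idle(consumer, consumer_timeout_ms=10_000):
    """Async-iterate records from a started consumer, stopping once no
    records arrive for consumer_timeout_ms (user-level imitation of
    kafka-python's consumer_timeout_ms, not an aiokafka feature)."""
    while True:
        batches = await consumer.getmany(timeout_ms=consumer_timeout_ms)
        if not batches:
            return  # ends the async for loop
        for records in batches.values():
            for msg in records:
                yield msg


async def print_all(consumer):
    # Hypothetical usage: mirrors the original `async for msg in c:` loop.
    async for msg in iter_until_idle(consumer, consumer_timeout_ms=10_000):
        print("consumed:", msg.topic, msg.partition, msg.offset, msg.value)
    print("no data for 10s, exiting")
```

This keeps the `async for` shape of the original code while moving the timeout into the loop driver rather than the consumer constructor.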