
consumer_timeout_ms

Open bunengxiu opened this issue 1 year ago • 6 comments

I have a topic named my_topic. While consuming my_topic, I want the process to exit if no data is put into the topic within ten seconds.

With the kafka-python package I can set consumer_timeout_ms = 10000 and it works. The code is shown below:

from kafka import KafkaConsumer
from kafka.errors import KafkaError

consumer = KafkaConsumer(
    'my_topic',
    bootstrap_servers='localhost:9092',
    group_id="my-group",
    auto_offset_reset='earliest',
    # stop iterating if no message arrives within ten seconds
    consumer_timeout_ms=10000,  # 10 s
)


def test():
    for msg in consumer:
        print(msg.value)
    print('consumer timeout: no data was put into the topic within ten seconds, exiting process')


if __name__ == '__main__':
    test()

But with the aiokafka package, setting consumer_timeout_ms = 10000 in the same way does not work. The code is shown below:

import asyncio

from aiokafka import AIOKafkaConsumer
from aiokafka.errors import KafkaError  # needed by the except clause below


async def async_test():
    c = AIOKafkaConsumer(
        'my_topic',
        bootstrap_servers='localhost:9092',
        group_id="my-group",
        auto_offset_reset='earliest',
        # intended: stop consuming if no message arrives within ten seconds
        consumer_timeout_ms=10000,  # 10 s
    )
    # the consumer must be started before it can be iterated
    await c.start()
    try:
        # the process blocks here; after ten seconds it is still blocked,
        # so consumer_timeout_ms does not appear to take effect
        async for msg in c:
            print("consumed: ", msg.topic, msg.partition, msg.offset, msg.key, msg.value, msg.timestamp)
    except KafkaError as e:
        print(e)
        print('kafka error')
    finally:
        print('my_topic is empty, timed out after 10 s, exiting process')
        await c.stop()


if __name__ == '__main__':
    asyncio.run(async_test())

Which parameter should I set so that the process exits when no data is put into the topic within ten seconds?

bunengxiu • Jun 01 '23 11:06
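
One possible workaround, offered as a sketch rather than a confirmed answer from this thread: as far as the aiokafka documentation describes it, consumer_timeout_ms in AIOKafkaConsumer controls the wait of the background fetch routine, not how long iteration may stay idle, so the idle timeout would have to be applied by the caller. The helper below wraps each fetch in asyncio.wait_for and stops once nothing arrives within the limit. The names consume_until_idle, get_next and idle_timeout_s are hypothetical, and passing AIOKafkaConsumer.getone as get_next is an assumption that has not been tested against a broker here.

```python
import asyncio


async def consume_until_idle(get_next, idle_timeout_s=10.0):
    """Collect messages until none arrives within idle_timeout_s seconds.

    get_next is any coroutine function that returns the next message.
    With aiokafka this could be the getone method of a started
    AIOKafkaConsumer (an assumption, not verified in this sketch).
    """
    received = []
    while True:
        try:
            # Wait for the next message, but give up after the idle timeout.
            msg = await asyncio.wait_for(get_next(), timeout=idle_timeout_s)
        except asyncio.TimeoutError:
            # Nothing arrived in time: treat the source as idle and stop.
            break
        received.append(msg)
    return received
```

With a started consumer c, the call would presumably look like `await consume_until_idle(c.getone, idle_timeout_s=10)`, followed by `await c.stop()`. Polling with `await c.getmany(timeout_ms=10000)` and exiting when it returns an empty result might be an alternative.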