aiokafka

consumer_timeout_ms

Open bunengxiu opened this issue 2 years ago • 6 comments

I have a topic named my_topic. While consuming it, I want the process to exit if no data is put into the topic within ten seconds.

With the kafka-python package I can set consumer_timeout_ms=10000 and it works. The code is shown below:

from kafka import KafkaConsumer
from kafka.errors import KafkaError

consumer = KafkaConsumer(
    'my_topic',
    bootstrap_servers='localhost:9092',
    group_id="my-group",
    auto_offset_reset='earliest',
    # stop iterating if no data is put into the topic within ten seconds
    consumer_timeout_ms=10000,  # 10s
)


def test():
    for msg in consumer:
        print(msg.value)
    print('consumer timeout: no data was put into the topic within ten seconds, exiting')


if __name__ == '__main__':
    test()

But with the aiokafka package, setting consumer_timeout_ms=10000 in the same way does not work. The code is shown below:

from aiokafka import AIOKafkaConsumer
from aiokafka.errors import KafkaError
import asyncio


async def async_test():
    c = AIOKafkaConsumer(
        'my_topic',
        bootstrap_servers='localhost:9092',
        group_id="my-group",
        auto_offset_reset='earliest',
        # intended: stop iterating if no data is put into the topic within ten seconds
        consumer_timeout_ms=10000,  # 10 s
    )
    await c.start()  # the consumer must be started before iterating
    try:
        # The process is stuck here: after ten seconds the loop still has not
        # exited, so consumer_timeout_ms does not have the expected effect.
        async for msg in c:
            print("consumed: ", msg.topic, msg.partition, msg.offset, msg.key, msg.value, msg.timestamp)
    except KafkaError as e:
        print(e)
        print('kafka error')
    finally:
        print('my_topic is empty, timed out after 10 s, exiting process')
        await c.stop()


if __name__ == '__main__':
    asyncio.run(async_test())

Which parameter should I set so that the process exits when no data is put into the topic within ten seconds?

bunengxiu avatar Jun 01 '23 11:06 bunengxiu

I would use the "timeout_ms" parameter of the getmany method https://aiokafka.readthedocs.io/en/stable/api.html#aiokafka.AIOKafkaConsumer.getmany

This differs from the iterator-based consumption in your example: instead of iterating over the consumer, you loop over getmany until it returns no messages.
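A rough sketch of that loop, reusing the topic, broker address and group id from the question. The helper name consume_until_idle is made up for illustration and is not part of aiokafka; the sketch relies on getmany returning an empty dict when nothing arrives within timeout_ms. Right after start() the group join and partition assignment can take some time, so a very short timeout might end the loop before the first fetch.

```python
import asyncio


async def consume_until_idle(consumer, idle_timeout_ms=10_000):
    """Read from a started consumer until one getmany() call returns nothing.

    Hypothetical helper (not part of aiokafka); returns the number of
    messages seen.
    """
    seen = 0
    while True:
        # getmany() waits up to timeout_ms for data and returns a dict
        # mapping TopicPartition -> list of records; empty means timeout.
        batches = await consumer.getmany(timeout_ms=idle_timeout_ms)
        if not batches:
            break
        for tp, messages in batches.items():
            for msg in messages:
                seen += 1
                print("consumed:", tp.topic, tp.partition, msg.offset, msg.value)
    return seen


async def main():
    # Imported here so the helper above does not depend on aiokafka itself.
    from aiokafka import AIOKafkaConsumer

    consumer = AIOKafkaConsumer(
        "my_topic",
        bootstrap_servers="localhost:9092",
        group_id="my-group",
        auto_offset_reset="earliest",
    )
    await consumer.start()
    try:
        total = await consume_until_idle(consumer, idle_timeout_ms=10_000)
        print(f"no data for 10 s, exiting after {total} messages")
    finally:
        await consumer.stop()
```

Run it with asyncio.run(main()) against a reachable broker.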

vmaurin avatar Jun 01 '23 11:06 vmaurin

~Looks like a bug to me.~

ods avatar Jun 01 '23 13:06 ods

> I would use the "timeout_ms" parameter of the getmany method https://aiokafka.readthedocs.io/en/stable/api.html#aiokafka.AIOKafkaConsumer.getmany
>
> This differs from the iterator-based consumption in your example: instead of iterating over the consumer, you loop over getmany until it returns no messages.

The parameter is the same (consumer_timeout_ms), so why is the effect different?

bunengxiu avatar Jun 02 '23 01:06 bunengxiu

> The parameter is the same (consumer_timeout_ms), so why is the effect different?

I believe Vincent meant the timeout_ms parameter of getmany, not consumer_timeout_ms.

ods avatar Jun 02 '23 07:06 ods

> > The parameter is the same (consumer_timeout_ms), so why is the effect different?
>
> I believe Vincent meant the timeout_ms parameter of getmany, not consumer_timeout_ms.

Using the timeout_ms parameter of getmany achieves my purpose, and it works. But I think it would be simpler and easier to just set the consumer_timeout_ms parameter of AIOKafkaConsumer.

bunengxiu avatar Jun 02 '23 08:06 bunengxiu

Although the parameter has the same name in aiokafka and kafka-python, it does not seem to behave the same way in both. Their documentation describes it as follows.

kafka-python:

> number of milliseconds to block during message iteration before raising StopIteration (i.e., ending the iterator). Default block forever [float('inf')].

aiokafka:

> maximum wait timeout for background fetching routine. Mostly defines how fast the system will see rebalance and request new data for new partitions. Default: 200

From a quick look, aiokafka uses it in the fetcher in a way that matches what the documentation says. So the closest thing you have in aiokafka is timeout_ms on getmany, but maybe the consumer parameter could be renamed to avoid confusion with kafka-python (and possibly a new parameter could serve as a default timeout_ms, without the need to call getmany).
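Until something like that exists, the iterator style can be approximated in user code. The sketch below is only an illustration: iter_with_idle_timeout is a hypothetical name, not an aiokafka API, and it is built on getmany alone. It yields records one at a time and ends the iteration once a getmany call comes back empty, which is roughly what consumer_timeout_ms does for the kafka-python iterator.

```python
import asyncio


async def iter_with_idle_timeout(consumer, idle_timeout_ms):
    """Yield records from a started consumer, one at a time.

    Hypothetical wrapper (not part of aiokafka): iteration ends once
    getmany() returns nothing within idle_timeout_ms.
    """
    while True:
        batches = await consumer.getmany(timeout_ms=idle_timeout_ms)
        if not batches:
            return  # idle for idle_timeout_ms, so stop the async iterator
        for messages in batches.values():
            for msg in messages:
                yield msg


async def main():
    # Imported here so the wrapper above does not depend on aiokafka itself.
    from aiokafka import AIOKafkaConsumer

    consumer = AIOKafkaConsumer(
        "my_topic",
        bootstrap_servers="localhost:9092",
        group_id="my-group",
        auto_offset_reset="earliest",
    )
    await consumer.start()
    try:
        async for msg in iter_with_idle_timeout(consumer, 10_000):
            print("consumed:", msg.topic, msg.partition, msg.offset, msg.value)
        print("no data for 10 s, exiting")
    finally:
        await consumer.stop()
```

The calling code keeps the async for shape of the original example, and asyncio.run(main()) starts it.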

vmaurin avatar Jun 02 '23 08:06 vmaurin