
consumer_timeout ignored?

Open ZmeiGorynych opened this issue 8 years ago • 7 comments

Hi,

When I run an aiokafka consumer with consumer_timeout_ms=100, I expect it to terminate 100 ms after reaching the end of the topic; instead (as demonstrated in the attached notebook) it hangs, presumably waiting for the next record to show up, until terminated from the keyboard.

Is it a bug or am I doing something wrong?

Thanks a lot! consumer_timeout ignored.zip

ZmeiGorynych avatar Mar 06 '17 10:03 ZmeiGorynych

Hmm, this seems like a misunderstanding from the initial port of kafka-python (I presume the described behaviour comes from there). This option is not part of Java's API, so we probably just ended up with a parameter of the same name. Our docs don't describe the behaviour you expected:

consumer_timeout_ms (int) – maximum wait timeout for background fetching routine. 
Mostly defines how fast the system will see rebalance and request new data for new 
partitions. Default: 200

As for your case, something like this should work:

import asyncio
from aiokafka import AIOKafkaConsumer
from aiokafka.errors import KafkaError

src = AIOKafkaConsumer('my_topic', loop=loop, bootstrap_servers=kafka_host,
                       group_id=None, consumer_timeout_ms=100,
                       auto_offset_reset='earliest')

loop.run_until_complete(src.start())

async def consume_task():
    try:
        async for msg in src:
            print(msg)
    except KafkaError as err:
        print("error while consuming message: ", err)

async def consumer_task_timeout():
    try:
        # Cancel the consuming coroutine if it runs longer than 1.5 s total.
        await asyncio.wait_for(consume_task(), timeout=1.5)
    except asyncio.TimeoutError:
        print("Task timed out")

loop.run_until_complete(consumer_task_timeout())

P.S. And please add explicit code blocks so other people will also understand what you're asking (without installing ipython).

tvoinarovskyi avatar Mar 07 '17 12:03 tvoinarovskyi

Thanks a lot, that clarifies it!

ZmeiGorynych avatar Mar 07 '17 12:03 ZmeiGorynych

What I really wanted was to read Kafka topics of unknown but finite length. As long as new records are being pulled, I'm happy to wait indefinitely, but when no more new records are forthcoming, I want to stop; so a fixed-length timeout on the whole coroutine won't help here.

This could have been achieved with a timeout on an individual record poll (as with the kafka-python parameter that started this thread), but since there is apparently no such thing in aiokafka, what I now do instead is compare the current record's offset to the high watermark on that partition, using the highwater() function.

ZmeiGorynych avatar Mar 13 '17 15:03 ZmeiGorynych

@ZmeiGorynych I think your approach is the best way to do it. It will be dictated by the real highwater mark of the broker, not anything that was defined in client code.

tvoinarovskyi avatar Mar 15 '17 09:03 tvoinarovskyi

And an even better way of doing it is using the getmany() function, with its argument timeout_ms which does exactly what I want!

A problem with my earlier approach was that highwater() is only available after the first fetch, so my earlier method couldn't be used for empty topics.

ZmeiGorynych avatar May 29 '17 09:05 ZmeiGorynych

Can you describe your use case in detail? It may be good to add an API that does exactly that.

tvoinarovskyi avatar May 29 '17 19:05 tvoinarovskyi

Consuming realtime data from several Kafka topics, in no particular order. Because of assumptions I have about the data, if there's no data for n seconds, I want to send a heartbeat instead and keep listening. getmany is perfect for this: if it comes back empty after timeout_ms, I know I need to send a heartbeat instead.

ZmeiGorynych avatar May 30 '17 13:05 ZmeiGorynych