
[QUESTION] How is your library asynchronous?

paulpaul1076 opened this issue 1 year ago · 3 comments

I just tried the AIOKafkaConsumer sample code from your GitHub page on a topic with one partition. At each iteration I print msg.offset, and the offsets appear in order. So how is this library even asynchronous? I don't understand what's asynchronous about it.

Here's the code that I ran, with the IP and topic names omitted:

from aiokafka import AIOKafkaConsumer
import asyncio
 
async def consume():
    consumer = AIOKafkaConsumer(
        'my_topic', 'my_other_topic',
        bootstrap_servers='localhost:9092',
        group_id="my-group")
    # Get cluster layout and join group `my-group`
    await consumer.start()
    try:
        # Consume messages
        async for msg in consumer:
            print(msg.offset)
    finally:
        # Will leave consumer group; perform autocommit if enabled.
        await consumer.stop()
 
asyncio.run(consume())

I am not an advanced Python user (I mostly use Scala), but from what I understand, each iteration of the for loop has to create an async task for reading a Kafka message.

Now imagine the message at offset 0 is something that needs an hour to download, while the message at offset 1 takes one second. The callback for message 1 should then finish before the callback for message 0, and the results should come out of order. So why do they come in order?

paulpaul1076 · Dec 22 '22 13:12

The library is asynchronous in the sense that it is compatible with asyncio: while it waits on the network, other tasks in the event loop can run. The behavior you show above is standard for a Kafka consumer; messages within a partition are delivered in order.

If you want to process a lot of data and don't care about order, one thing you can do is use multiple consumers, all with the same group ID. Then each consumer will see a subset of messages. If one message is slow to download, that will not hold up processing of other messages by other consumers (but it will hold up processing of additional messages by the consumer handling the slow message).
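For example, here is a rough sketch of that approach (the broker address, topic, and group ID are placeholders, and the topic would need at least as many partitions as consumers for each one to get work):

import asyncio
from aiokafka import AIOKafkaConsumer

async def consume(name):
    # Each consumer joins the same group, so the topic's partitions are
    # divided among them.
    consumer = AIOKafkaConsumer(
        'my_topic',
        bootstrap_servers='localhost:9092',
        group_id='my-group')
    await consumer.start()
    try:
        async for msg in consumer:
            # A slow message here only delays the partitions assigned to
            # this particular consumer.
            print(name, msg.partition, msg.offset)
    finally:
        await consumer.stop()

async def main():
    # Run three consumers concurrently in one event loop; they could just
    # as well be separate processes or hosts.
    await asyncio.gather(*(consume(f'consumer-{i}') for i in range(3)))

asyncio.run(main())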

I have no idea if it's possible for one consumer to start a download of multiple messages at the same time.

r-owen · Jan 30 '23 23:01

Can you develop a Faust app in such a way that you have multiple Kafka consumers -> a single Faust processor -> multiple Kafka producers?

0xdarkman · Mar 02 '23 14:03

I think the syntax is misleading here.

The async for waits for the next message, which will sometimes require a call to the brokers for the next batch of messages. Under the hood, I think every Kafka client fetches messages from the brokers in batches, both on request and as a periodic refresh. Most of the time you will be able to process the next message right away since it's already available locally. If there's a rebalance event, consumption is paused until the rebalance finishes.
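If you want to work with those batches explicitly instead of one message at a time, aiokafka also exposes them via getmany(); a minimal sketch, with the same placeholder broker and topic names as above:

import asyncio
from aiokafka import AIOKafkaConsumer

async def consume_batches():
    consumer = AIOKafkaConsumer(
        'my_topic',
        bootstrap_servers='localhost:9092',
        group_id='my-group')
    await consumer.start()
    try:
        while True:
            # Returns already-prefetched records (up to max_records),
            # waiting at most timeout_ms for new data to arrive.
            batches = await consumer.getmany(timeout_ms=1000, max_records=100)
            for tp, messages in batches.items():
                for msg in messages:
                    print(tp.partition, msg.offset)
    finally:
        await consumer.stop()

asyncio.run(consume_batches())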

If you want to process multiple messages in parallel, you would hand off the handling of each message instead of awaiting it inline: print(msg.offset) would become asyncio.create_task(async_process(msg)). Most people want to commit an offset only once the message has been successfully processed (head-of-line blocking), so you would need to build fancy state logic external to aiokafka. There's some prior work on this, e.g. https://www.uber.com/blog/kafka-async-queuing-with-consumer-proxy/
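A rough sketch of that pattern, assuming a placeholder async_process coroutine and the same placeholder connection settings as above (offsets are still autocommitted here, so "commit only after successful processing" would need the extra bookkeeping mentioned):

import asyncio
from aiokafka import AIOKafkaConsumer

async def async_process(msg):
    # Placeholder for per-message work, e.g. a slow download.
    await asyncio.sleep(1)
    print('done', msg.offset)

async def consume():
    consumer = AIOKafkaConsumer(
        'my_topic',
        bootstrap_servers='localhost:9092',
        group_id='my-group')
    await consumer.start()
    tasks = []
    try:
        async for msg in consumer:
            # Schedule processing and immediately move on to the next message,
            # so a slow message does not block the ones behind it.
            tasks.append(asyncio.create_task(async_process(msg)))
    finally:
        if tasks:
            await asyncio.gather(*tasks)
        await consumer.stop()

asyncio.run(consume())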

Details on fetching and prefetching are in the docs: https://github.com/aio-libs/aiokafka/blob/master/docs/kafka-python_difference.rst#consumer-has-no-poll-method

Aergonus · Apr 20 '23 17:04