
Question: Delayed retry queue implementation in Faust?

Open somnam opened this issue 1 year ago • 2 comments

Hi,

I'm trying to implement a delayed retry queue using Kafka and Faust. The idea is that after a message fails processing, it gets forwarded to a retry-queue (RQ) topic. Messages consumed from this topic aren't reprocessed immediately; a delay is applied first.

A more detailed description of this approach can be found here and here. I've managed to come up with a working aiokafka implementation but am struggling to implement this pattern in Faust (the library used in the app I need to add the queue to).

In aiokafka I use the consumer pause/resume API. After consuming a message I pause the partitions assigned to the consumer. To avoid exceeding max_poll_interval_ms, a separate asyncio task is created to handle the delay and reprocessing. This task sleeps for the given delay, during which control can switch back to the consumer loop. The loop doesn't return any new messages because the partitions are paused, but it keeps polling Kafka so max_poll_interval_ms isn't exceeded. After reprocessing, the message is committed and the consumer partitions are resumed:

import asyncio

from aiokafka import AIOKafkaConsumer
from aiokafka.structs import ConsumerRecord


async def consume(topic: str, bootstrap_servers: str, group_id: str) -> None:
    consumer = AIOKafkaConsumer(
        topic,
        bootstrap_servers=bootstrap_servers,
        group_id=group_id,
        enable_auto_commit=False,  # will manually commit each message
        max_poll_interval_ms=1000,  # only 1s delay between polls required
    )
    await consumer.start()

    try:
        async for message in consumer:
            # Pause all assigned partitions so the consumer loop keeps
            # polling without returning new messages.
            consumer.pause(*consumer.assignment())
            asyncio.create_task(
                delayed_processing(consumer, message, 60)  # delay by 60s from message.timestamp
            )
    finally:
        await consumer.stop()


async def delayed_processing(consumer: AIOKafkaConsumer, message: ConsumerRecord, delay: int) -> None:
    # get_message_processing_delay and run_processing are application helpers.
    try:
        processing_delay = get_message_processing_delay(message, delay)
        await asyncio.sleep(processing_delay)
        run_processing(message)
        await consumer.commit()
    finally:
        consumer.resume(*consumer.assignment())
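(get_message_processing_delay is an application helper, not part of aiokafka; a minimal sketch, assuming message.timestamp is Kafka's epoch-milliseconds value, could look like this:)

```python
import time


def get_message_processing_delay(message, delay: int) -> float:
    """Return the seconds still to wait so that `delay` seconds have
    passed since the message's Kafka timestamp (epoch milliseconds)."""
    message_age = time.time() - message.timestamp / 1000
    return max(0.0, delay - message_age)
```

This way a consumer restart doesn't reset the delay: the wait is computed from the message's original timestamp, so already-aged messages are reprocessed sooner.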

I think the Faust implementation should follow the same pattern (Faust uses aiokafka under the hood), but I can't get the stream to pause and resume after consuming a message. When I use the provided pause_partitions and resume_partitions API methods, max_poll_interval_ms is still exceeded (they don't appear to touch the aiokafka consumer in any way). When I instead pause the consumer through the underlying AIOKafkaConsumer thread, in the same manner as above, the event stream keeps delivering new messages anyway.

Do you know of a way to implement this pattern using Faust, or does Faust require a different approach?

Checklist

  • [x] I have included information about relevant versions
  • [ ] I have verified that the issue persists when using the master branch of Faust.

Versions

  • Python version: 3.10
  • Faust version: 0.10.4
  • Operating system
  • Kafka version
  • RocksDB version (if applicable)

somnam avatar Mar 14 '23 09:03 somnam