
Bug: Event ack'd after calling `KafkaMessage.nack` with `auto_commit=False`

Open bradydean opened this issue 1 year ago • 9 comments

Describe the bug

Events are ack'd after calling nack().

How to reproduce

from faststream import FastStream, Logger
from faststream.kafka import KafkaBroker, KafkaMessage

broker = KafkaBroker()
app = FastStream(broker)

@broker.subscriber("test", group_id="async_subscriber", auto_commit=False)
async def async_subscriber(message: str, logger: Logger, msg: KafkaMessage):
    logger.info(message)
    await msg.nack()

Expected behavior

After publishing an event to the test topic, the message is logged and consumer does not ack the message.

Observed behavior

The message is logged and it appears the consumer acks the message. In kafka-ui I see the consumer group is 0 messages behind.

Environment

Running FastStream 0.2.15 with CPython 3.11.6 on Linux

bradydean avatar Nov 28 '23 17:11 bradydean

Hello @bradydean,

Apologies for the late reply, and thanks for letting us know about the issue. We were busy releasing Redis support, so we couldn't get to this issue yet. If you are interested in that feature, please take a look at https://github.com/airtai/faststream/releases/tag/0.3.0rc0. We will look into this issue and fix it soon.

kumaranvpl avatar Dec 01 '23 12:12 kumaranvpl

Interesting... FastStream doesn't call consumer.commit() in this case and correctly passes enable_auto_commit=False to the underlying AIOKafkaConsumer. But the message is still acked...

Lancetnik avatar Dec 04 '23 08:12 Lancetnik

It seems that with aiokafka >= 0.9 everything works fine. All tests pass on version 0.3.5, so I suppose we can close this.

Lancetnik avatar Dec 13 '23 17:12 Lancetnik

Hi, I'm still experiencing this issue with FastStream 0.47.0.

from typing import Any, Dict
from faststream import FastStream
from faststream.kafka import KafkaBroker
from faststream.kafka.annotations import (
    KafkaMessage,
    Logger
)

broker = KafkaBroker("localhost:9093")
app = FastStream(broker)

@broker.subscriber("source", group_id="foo", auto_commit=False)
async def async_subscriber(message: Dict[str,Any], logger: Logger, msg: KafkaMessage):
    logger.info(message)
    await msg.nack() # calling this or NOT calling this doesn't change the behavior

If I send some messages with

for i in $(seq 1 20); do echo "${i}:{\"index\":\"${i}\"}" | kafka-console-producer.sh --bootstrap-server localhost:9093  --topic source --property parse.key=true --property key.separator=:; done

I see

2024-04-12 18:08:56,994 INFO     - source | foo | 110-171293 - Received
2024-04-12 18:08:56,995 INFO     - source | foo | 110-171293 - {'index': '1'}
2024-04-12 18:08:56,996 INFO     - source | foo | 110-171293 - Processed
2024-04-12 18:08:58,556 INFO     - source | foo | 111-171293 - Received
2024-04-12 18:08:58,557 INFO     - source | foo | 111-171293 - {'index': '2'}
2024-04-12 18:08:58,557 INFO     - source | foo | 111-171293 - Processed
2024-04-12 18:09:00,131 INFO     - source | foo | 112-171293 - Received
2024-04-12 18:09:00,132 INFO     - source | foo | 112-171293 - {'index': '3'}
2024-04-12 18:09:00,132 INFO     - source | foo | 112-171293 - Processed
2024-04-12 18:09:01,679 INFO     - source | foo | 113-171293 - Received
2024-04-12 18:09:01,680 INFO     - source | foo | 113-171293 - {'index': '4'}
2024-04-12 18:09:01,680 INFO     - source | foo | 113-171293 - Processed
2024-04-12 18:09:03,355 INFO     - source | foo | 114-171293 - Received
2024-04-12 18:09:03,356 INFO     - source | foo | 114-171293 - {'index': '5'}
2024-04-12 18:09:03,356 INFO     - source | foo | 114-171293 - Processed

I was expecting to consume the same message again and again.

Thanks,

lorenzo

(edit: same behavior if I raise a NackMessage exception)
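(For readers unfamiliar with the exception form: raising NackMessage is generally equivalent to calling await msg.nack() directly, since the framework catches the exception and nacks the message on the handler's behalf, so it hits the same code path and the same bug. A toy sketch of that dispatch logic, using stand-in classes rather than FastStream's real internals:)

```python
import asyncio

class NackMessage(Exception):
    """Toy stand-in for faststream.exceptions.NackMessage."""

class ToyMessage:
    """Toy stand-in for KafkaMessage, tracking only the nack state."""
    def __init__(self) -> None:
        self.nacked = False

    async def nack(self) -> None:
        self.nacked = True

async def dispatch(handler, msg: ToyMessage) -> None:
    # Sketch of what a framework does with the exception: it is caught
    # and translated into a nack() call on the message.
    try:
        await handler(msg)
    except NackMessage:
        await msg.nack()

async def handler(msg: ToyMessage) -> None:
    raise NackMessage()

msg = ToyMessage()
asyncio.run(dispatch(handler, msg))
assert msg.nacked  # raising NackMessage had the same effect as msg.nack()
```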

lorenzobenvenuti avatar Apr 12 '24 16:04 lorenzobenvenuti

I am not sure what is happening: we create AIOKafkaConsumer(..., enable_auto_commit=False) in this case and do not call the consumer.commit() method at all.

I have no idea why it commits the offset anyway. @kumaranvpl, do you have any ideas?

Lancetnik avatar Apr 16 '24 20:04 Lancetnik

@Lancetnik @lorenzobenvenuti Right now I have no idea why this is happening. Let me try to replicate it locally, and then I will work on a fix.

kumaranvpl avatar Apr 17 '24 06:04 kumaranvpl

Thanks @Lancetnik @kumaranvpl . You can reproduce the issue with this application.

lorenzobenvenuti avatar Apr 17 '24 07:04 lorenzobenvenuti

I had a quick look at the code. As far as I can see, aiokafka doesn't commit the offset, but it keeps advancing the position in memory (see FetchResult._update_position) regardless of the commit policy. This means that even if the n-th message is not acked, the consumer will still consume messages n+1, n+2, and so on. I don't know if this is the expected behavior: coming from other frameworks (for example Apache Camel), I was expecting "nack" to seek the offset back to the last committed one (or just the previous one? I guess in a scenario where you're acking/nacking one message at a time, you want to proceed one message at a time).
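The behavior described above can be sketched with a toy model (an illustrative simulation, not aiokafka's actual classes): the in-memory fetch position advances on every poll, independently of the committed offset, so skipping commit() alone does not cause redelivery within a live consumer session; only an explicit seek does.

```python
# Toy model of a consumer's in-memory fetch position vs. its committed offset.
# Illustrative only -- a simplification, not aiokafka's implementation.
class ToyConsumer:
    def __init__(self, messages):
        self.messages = messages
        self.position = 0   # in-memory fetch position, advances on every poll
        self.committed = 0  # offset stored in the group coordinator

    def poll(self):
        msg = self.messages[self.position]
        self.position += 1  # advances regardless of the commit policy
        return msg

    def commit(self):
        self.committed = self.position

    def seek(self, offset):
        self.position = offset

consumer = ToyConsumer(["m0", "m1", "m2"])
assert consumer.poll() == "m0"
assert consumer.poll() == "m1"     # delivered even though nothing was committed
consumer.seek(consumer.committed)  # an explicit seek is what forces redelivery
assert consumer.poll() == "m0"
```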

As a workaround, the consumer can explicitly call seek() (note the extra TopicPartition import):

from typing import Any, Dict
from aiokafka import TopicPartition
from faststream.kafka.annotations import KafkaMessage, Logger

@broker.subscriber("source-topic", group_id="foo", auto_commit=False)
async def async_subscriber(body: Dict[str, Any], logger: Logger, msg: KafkaMessage):
    logger.info(body)
    await msg.nack()
    # rewind the partition to the nack'd message's offset so it is redelivered
    msg.consumer.seek(
        TopicPartition(msg.raw_message.topic, msg.raw_message.partition),
        msg.raw_message.offset,
    )

lorenzobenvenuti avatar Apr 20 '24 10:04 lorenzobenvenuti

@lorenzobenvenuti

I think we could just use something like this:

class KafkaMessage:
    async def nack(self) -> None:
        await self.consumer.seek_to_committed()

It looks correct, but I am not sure about side effects or potential problems
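One side effect worth noting: seek_to_committed() rewinds to the last committed offset, which may be earlier than the nack'd message itself if several messages were consumed since the last commit, so already-processed (but uncommitted) messages would be redelivered too. A toy sketch of the difference (illustrative Python, not aiokafka's API):

```python
# Toy illustration of the two rewind strategies:
# seek(offset_of_nacked_msg) vs. seek_to_committed().
class ToyConsumer:
    def __init__(self):
        self.position = 5   # offsets 0..4 were consumed in this session
        self.committed = 2  # but only offsets 0..1 were committed

    def seek(self, offset):
        # explicit seek to a chosen offset, as in the workaround above
        self.position = offset

    def seek_to_committed(self):
        # the proposed nack() implementation: back to the committed offset
        self.position = self.committed

c = ToyConsumer()
c.seek(4)              # rewind to the nack'd message only
assert c.position == 4

c = ToyConsumer()
c.seek_to_committed()  # rewinds further: offsets 2 and 3 are redelivered too
assert c.position == 2
```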

Lancetnik avatar May 06 '24 16:05 Lancetnik

Hello @bradydean,

This bug has been fixed as part of https://github.com/airtai/faststream/releases/tag/0.5.18 release.

Here is a sample code snippet:
from typing import Any, Dict
from faststream import FastStream
from faststream.kafka import KafkaBroker
from faststream.kafka.annotations import (
    KafkaMessage,
    Logger
)
import json
import asyncio

broker = KafkaBroker("localhost:9092")
topic = "kafka-nack-test"

app = FastStream(broker)


@broker.subscriber(topic, group_id="foo", auto_commit=False, auto_offset_reset="earliest")
async def async_subscriber(body: Dict[str,Any], logger: Logger, msg: KafkaMessage):
    logger.info(body)
    await msg.nack()


@app.after_startup
async def publish_something() -> None:
    async def _publish_something() -> None:
        i = 10
        print(f"Sleeping for {i} seconds")
        await asyncio.sleep(i)
        message = {"hi": "there"}
        await broker.publish(message, topic=topic)
        print("Published message: " + json.dumps(message))

    asyncio.create_task(_publish_something())

kumaranvpl avatar Aug 14 '24 04:08 kumaranvpl