Bug: Event ack'd after calling `KafkaMessage.nack` with `auto_commit=False`
Describe the bug
Events are ack'd after calling nack().
How to reproduce
from faststream import FastStream, Logger
from faststream.kafka import KafkaBroker, KafkaMessage

broker = KafkaBroker()
app = FastStream(broker)

@broker.subscriber("test", group_id="async_subscriber", auto_commit=False)
async def async_subscriber(message: str, logger: Logger, msg: KafkaMessage):
    logger.info(message)
    await msg.nack()
Expected behavior
After publishing an event to the test topic, the message is logged and the consumer does not ack it.
Observed behavior
The message is logged, but the consumer appears to ack it anyway: in kafka-ui I see the consumer group is 0 messages behind.
Environment
Running FastStream 0.2.15 with CPython 3.11.6 on Linux
Hello @bradydean,
Apologies for the late reply, and thanks for letting us know about this issue. We were busy releasing Redis support, so we couldn't work on it yet. If you are interested in that feature, please take a look at https://github.com/airtai/faststream/releases/tag/0.3.0rc0. We will look into this issue and fix it soon.
Interesting... FastStream doesn't call consumer.commit() in this case and passes enable_auto_commit=False to the underlying AIOKafkaConsumer correctly. But the message is still acked...
It seems that with aiokafka >= 0.9 everything works fine. All tests pass on version 0.3.5, so I suppose we can close this.
Hi, I'm still experiencing this issue with FastStream 0.47.0.
from typing import Any, Dict

from faststream import FastStream
from faststream.kafka import KafkaBroker
from faststream.kafka.annotations import (
    KafkaMessage,
    Logger,
)

broker = KafkaBroker("localhost:9093")
app = FastStream(broker)

@broker.subscriber("source", group_id="foo", auto_commit=False)
async def async_subscriber(message: Dict[str, Any], logger: Logger, msg: KafkaMessage):
    logger.info(message)
    await msg.nack()  # calling this or NOT calling this doesn't change the behavior
If I send some messages with
for i in $(seq 1 20); do echo "${i}:{\"index\":\"${i}\"}" | kafka-console-producer.sh --bootstrap-server localhost:9093 --topic source --property parse.key=true --property key.separator=:; done
I see
2024-04-12 18:08:56,994 INFO - source | foo | 110-171293 - Received
2024-04-12 18:08:56,995 INFO - source | foo | 110-171293 - {'index': '1'}
2024-04-12 18:08:56,996 INFO - source | foo | 110-171293 - Processed
2024-04-12 18:08:58,556 INFO - source | foo | 111-171293 - Received
2024-04-12 18:08:58,557 INFO - source | foo | 111-171293 - {'index': '2'}
2024-04-12 18:08:58,557 INFO - source | foo | 111-171293 - Processed
2024-04-12 18:09:00,131 INFO - source | foo | 112-171293 - Received
2024-04-12 18:09:00,132 INFO - source | foo | 112-171293 - {'index': '3'}
2024-04-12 18:09:00,132 INFO - source | foo | 112-171293 - Processed
2024-04-12 18:09:01,679 INFO - source | foo | 113-171293 - Received
2024-04-12 18:09:01,680 INFO - source | foo | 113-171293 - {'index': '4'}
2024-04-12 18:09:01,680 INFO - source | foo | 113-171293 - Processed
2024-04-12 18:09:03,355 INFO - source | foo | 114-171293 - Received
2024-04-12 18:09:03,356 INFO - source | foo | 114-171293 - {'index': '5'}
2024-04-12 18:09:03,356 INFO - source | foo | 114-171293 - Processed
I was expecting to consume the same message again and again.
Thanks,
lorenzo
(edit: same behavior if I raise a NackMessage exception)
I am not sure what is going on: we create AIOKafkaConsumer(..., enable_auto_commit=False) in this case and do not call the consumer.commit() method at all. I have no idea why it commits the offset anyway. @kumaranvpl do you have any ideas?
@Lancetnik @lorenzobenvenuti Right now I have no idea why this is happening either. Let me try to replicate it locally, and then I will work on a fix.
Thanks @Lancetnik @kumaranvpl. You can reproduce the issue with this application.
I had a quick look at the code. As far as I can see, aiokafka doesn't commit the offset, but it keeps increasing the fetch position in memory (see FetchResult._update_position) regardless of the commit policy. This means that even if the n-th message is not acked, the consumer will still consume messages n+1, n+2, and so on. I don't know if this is the expected behavior: coming from other frameworks (for example Apache Camel), I was expecting "nack" to seek the offset back to the last committed one (or just the previous one? I guess in a scenario where you ack/nack each message immediately, you want to proceed one message at a time).
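To make that in-memory position behavior concrete, here is a minimal aiokafka-only sketch (no FastStream involved); the localhost:9092 address, the source topic, and the foo group are placeholder assumptions:

import asyncio

from aiokafka import AIOKafkaConsumer, TopicPartition

async def main() -> None:
    # Same consumer settings FastStream passes through for auto_commit=False.
    consumer = AIOKafkaConsumer(
        "source",
        bootstrap_servers="localhost:9092",  # assumed local broker
        group_id="foo",
        enable_auto_commit=False,
        auto_offset_reset="earliest",
    )
    await consumer.start()
    try:
        for _ in range(3):
            record = await consumer.getone()
            tp = TopicPartition(record.topic, record.partition)
            # position() keeps growing while committed() stays put: the fetch
            # position advances in memory even though nothing is ever committed.
            print(await consumer.position(tp), await consumer.committed(tp))
    finally:
        await consumer.stop()

asyncio.run(main())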
As a workaround, the consumer can explicitly call seek:
from aiokafka import TopicPartition

@broker.subscriber("source-topic", group_id="foo", auto_commit=False)
async def async_subscriber(body: Dict[str, Any], logger: Logger, msg: KafkaMessage):
    logger.info(body)
    await msg.nack()
    # rewind this partition to the nacked message's offset so it is fetched again
    msg.consumer.seek(
        TopicPartition(msg.raw_message.topic, msg.raw_message.partition),
        msg.raw_message.offset,
    )
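After this seek, the next fetch should return the same record again, so the handler keeps receiving the nacked message instead of silently moving on.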
@lorenzobenvenuti
I think we can just use something like this:

class KafkaMessage:
    async def nack(self) -> None:
        await self.consumer.seek_to_committed()

It looks correct, but I am not sure about side effects or potential problems.
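One side effect worth checking, if I read the aiokafka API right: seek_to_committed() with no arguments rewinds every assigned partition to its committed offset, not only the partition the nacked message came from, whereas the per-partition seek shown above touches just that one partition.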
Hello @bradydean,
This bug has been fixed as part of the https://github.com/airtai/faststream/releases/tag/0.5.18 release.
The following is a code sample:
import asyncio
import json
from typing import Any, Dict

from faststream import FastStream
from faststream.kafka import KafkaBroker
from faststream.kafka.annotations import (
    KafkaMessage,
    Logger,
)

broker = KafkaBroker("localhost:9092")
topic = "kafka-nack-test"
app = FastStream(broker)

@broker.subscriber(topic, group_id="foo", auto_commit=False, auto_offset_reset="earliest")
async def async_subscriber(body: Dict[str, Any], logger: Logger, msg: KafkaMessage):
    logger.info(body)
    await msg.nack()

@app.after_startup
async def publish_something() -> None:
    async def _publish_something() -> None:
        i = 10
        print(f"Sleeping for {i} seconds")
        await asyncio.sleep(i)
        message = {"hi": "there"}
        await broker.publish(message, topic=topic)
        print("Published message " + json.dumps(message))

    asyncio.create_task(_publish_something())
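Run against a local broker, this should log the same {"hi": "there"} payload over and over, since with the fix each nack() rewinds the consumer to the uncommitted offset instead of letting it advance.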