Problems with offset commits with filter/group_by pattern
Checklist
- [x] I have included information about relevant versions
- [ ] I have verified that the issue persists when using the master branch of Faust.
Steps to reproduce
I have a typical filter/group_by setup. Simplified as much as possible, it goes along these lines:
```python
import faust


class Value(faust.Record):
    ...


app = faust.App(
    "app",
    broker=...,
    broker_credentials=...,
    consumer_auto_offset_reset="latest",
    topic_partitions=3,
    topic_replication_factor=3,
)

source_topic = app.topic("source-topic", value_type=Value)


@app.agent(source_topic)
async def my_agent(stream):
    async for value in stream.filter(
        lambda v: v > 1000
    ).group_by(
        lambda v: v.some_other_attr,
        name="custom-key",
    ):
        pass
```
This automatically creates the topic app.my_agent-source-topic-custom-key-repartition (I might have the exact name slightly wrong; it doesn't matter).
Expected behavior
- The repartitioned topic receives regular offset commits, so that the lag stays at 1 or close to 1 on all partitions.
- Upon every application restart, the most recent committed offset is picked up and processing resumes where it left off before shutdown.
Actual behavior
- The repartitioned topic's offsets are committed only during a brief period right after application start (or restart); after that no offset commits take place at all and the lag grows.
- Upon every application restart, the repartitioned topic is read all over again.
Observations
- No tracebacks, no explicit failures.
- The application keeps processing incoming data just fine; it doesn't look like there is a problem there. Tested on a low-traffic staging environment and in isolation on one of the mid-traffic production environments. The point is that performance is not an issue.
- The body of the stream operation is literally pass. I have stripped all functionality one by one and got down to "bare bones" Faust.
- The source topic receives proper commits, no problem there.
- Tried both pre-created and auto-created repartition topics, no difference.
- Tried both the original Faust and the faust-streaming fork, no difference.
- We are using Kafka clusters on Confluent Cloud and observing offsets in their UI. I don't expect major differences if I checked via the CLI.
- And the most important part: if I remove filter and leave only group_by, the problem is gone and offset commits are tight, back where they should be. The thing is that we really need this filter, as only about 30% of the traffic is eligible for repartitioning (one possible workaround is sketched right after this list).
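Below is a minimal sketch of one possible workaround, not taken from the report: split the pipeline into two agents so that .filter() never sits in front of group_by. The intermediate topic name, the filter_agent name and the some_attr field are assumptions for illustration only.

```python
# Hedged workaround sketch (names such as `filtered_topic`, `filter_agent`
# and `some_attr` are hypothetical, not from the report): the first agent
# filters and forwards eligible events to an intermediate topic, the second
# agent only repartitions, so .filter() and group_by() never share a pipeline.
filtered_topic = app.topic("source-topic-filtered", value_type=Value)


@app.agent(source_topic)
async def filter_agent(stream):
    async for value in stream:
        if value.some_attr > 1000:  # stand-in for the original filter predicate
            await filtered_topic.send(value=value)


@app.agent(filtered_topic)
async def repartition_agent(stream):
    async for value in stream.group_by(
        lambda v: v.some_other_attr,
        name="custom-key",
    ):
        pass
```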
Versions
- Python version 3.8.6
- Faust version 0.3.1
- Operating system Debian Buster
- Kafka version: can't find it right now, probably the most recent. Tried on Basic and Standard Confluent Cloud clusters.
Hi,
I work with @remy-sl on the same Faust service. Having followed the problem, I have a few more observations.
Events which should be filtered out are sent to the repartitioned topic
It looks like the stream.filter(...).group_by(...) pipeline attaches the group_by processor to the original stream and the filter processor to the repartitioned stream. The original stream's .info() looks like this:
```
{'app': <App(builds-aggregator1): [URL('...')] running agents(<AgentManager: running >) 0x7f37a7ec9220>, 'channel': <(*)Topic: ...@0x7f37a58953d0>, 'processors': [<function Stream.group_by.<locals>.repartition at 0x7f37a5820550>], 'on_start': None, 'loop': <_UnixSelectorEventLoop running=True closed=False debug=False>, 'combined': [], 'beacon': Node: Worker/App/AgentManager/Agent: .../Stream: Topic: ..., 'concurrency_index': None, 'prev': None, 'active_partitions': None}
```
The repartitioned stream's .info() looks like this:
```
{'app': <App(builds-aggregator1): [URL('...')] running agents(<AgentManager: running >) 0x7f37a7ec9220>, 'channel': <(*)Topic: ...@0x7f37a5828070>, 'processors': [<function Stream.filter.<locals>.on_value at 0x7f37a58200d0>], 'on_start': <bound method Service.maybe_start of <Stream: init <(*)Topic: vd...x7f37a58953d0>>>, 'loop': <_UnixSelectorEventLoop running=True closed=False debug=False>, 'combined': [], 'beacon': Node: Worker/App/AgentManager/Agent: .../Stream: Topic: .../Stream: Topic: ..., 'concurrency_index': None, 'prev': <Stream: init <(*)Topic: vd...x7f37a58953d0>>, 'active_partitions': None}
```
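For reference, a hedged sketch of how dumps like the ones above can be obtained from inside the agent; the variable names are assumptions, Stream.info() is the call whose output is shown:

```python
# Hedged sketch (not from the report): print the processor lists of the
# original and the repartitioned stream to see where the filter and group_by
# processors end up. Variable names are hypothetical.
@app.agent(source_topic)
async def my_agent(stream):
    repartitioned = stream.filter(
        lambda v: v > 1000
    ).group_by(
        lambda v: v.some_other_attr,
        name="custom-key",
    )
    print("original stream processors:", stream.info()["processors"])
    print("repartitioned stream processors:", repartitioned.info()["processors"])
    async for value in repartitioned:
        pass
```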
Commit offset not advancing
It looks like the committed offset stops at the first occurrence of the Skip() exception raised by filter(). .filter(lambda v: True) works without problems, while .filter(lambda v: False) makes the committed offset freeze. After briefly looking at the code, I think the problem might lie here: https://github.com/faust-streaming/faust/blob/master/faust/streams.py#L909
When skipped_value occurs, the finally block of the try/finally below it, where ack() is executed, is never reached.
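A minimal reproduction sketch of that observation, based on the example from the original report with only the predicate changed:

```python
# Hedged reproduction sketch based on the example from the report: with a
# predicate that rejects everything, committed offsets on the repartition
# topic freeze; flip it to `lambda v: True` and they advance normally.
@app.agent(source_topic)
async def my_agent(stream):
    async for value in stream.filter(
        lambda v: False  # every event is skipped, so per the report ack()/commit never runs
    ).group_by(
        lambda v: v.some_other_attr,
        name="custom-key",
    ):
        pass
```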
Hope that helps.
Hi,
We're facing a related problem when using stream.noack() in conjunction with the filter function: the offsets aren't committed properly and the filtered-out messages increase the lag on the consumer group. Has there been any progress on this issue?