faust
Consumers slowly die over time
Running a basic application for any extended period of time - sometimes just a few minutes - leads to consumers slowly dying one by one until they have all stopped responding and the entire application hangs. Data is still flowing into a topic at the head of the pipeline from an external source, so there are still messages to be read as the consumers begin to hang.
The following messages are displayed in increasing numbers as things begin to die:
```
[2021-07-21 01:04:39,262] [31912] [ERROR] [^---AIOKafkaConsumerThread]: Stream has not started processing TP(topic='archive-log-events', partition=36) (started 1.10 hour ago).
There are multiple possible explanations for this:
1) The processing of a single event in the stream is taking too long.
   The timeout for this is defined by the stream_processing_timeout setting,
   currently set to 300.0. If you expect the time required to process an event
   to be greater than this then please increase the timeout.
2) The stream has stopped processing events for some reason.
3) The agent processing the stream is hanging (waiting for network, I/O or infinite loop).
```
I've tested this with agents performing simple functions and the behavior still appears. Is there any obvious reason why this would be occurring, or a reasonable way to debug it?
Checklist
- [X] I have included information about relevant versions
- [X] I have verified that the issue persists when using the master branch of Faust.
Steps to reproduce
Run an application with multiple agents.
Expected behavior
Things run indefinitely.
Actual behavior
Things run, then slowly begin to stop.
Full traceback
N/A
Versions
- Python version: 3.8.6
- Faust version: 0.6.9
- Operating system: Ubuntu 18.04
- Kafka version: 2.8.0
- RocksDB version (if applicable):
We have the same issue right now; there is no clear error detail on what's going on.
Have you tried to increase the broker_commit_livelock_soft_timeout or the broker_max_poll_interval?
thanks, I'll try it in production
@mavalderrama Did that resolve your problem?
Yes, I noticed this behaviour too. Does anybody have a solution to this?
> @mavalderrama Did that resolve your problem?

I'm sorry, we changed those params to large values without success.
Many thanks @mavalderrama for testing it. I'll try running faust in debug mode to see whether the logs show anything.
Same issue:

```
[2022-02-17 10:08:01,225] [1] [ERROR] [^---AIOKafkaConsumerThread]: Stream stopped processing, or is slow for TP(topic='my_topic', partition=0) (last inbound 1.92 day ago).
There are multiple possible explanations for this:
1) The processing of a single event in the stream is taking too long.
   The timeout for this is defined by the stream_processing_timeout setting,
   currently set to 300.0. If you expect the time required to process an event
   to be greater than this then please increase the timeout.
2) The stream has stopped processing events for some reason.
3) The agent processing the stream is hanging (waiting for network, I/O or infinite loop).
```
- Python version: 3.8
- Faust version: 0.8.2
- Operating system: Ubuntu 20.04
I have exactly the same thing happening. Faust: 0.8.1, Python: 3.7.11.
I'm joining the club of having the same issue. In fact, this error, and the fact that agents die and do not heal automatically, is the only thing that blocks me from using this project in a production system. A pity, since while it runs it works very well and is very easy to set up.
In my case I'm using faust inside a Docker image.
Base image: python:3.8-slim-buster, Faust version: 0.8.4, Python: 3.8
I noticed that if you delete the topic in Kafka and recreate it, the error stops, but after a while (some days) it starts again.
Note: I'm using a Kafka Broker from Confluent Cloud, their managed service.
It is super frustrating; we're moving a considerably large part of our code base to be Faust-based, but it doesn't look reliable during our development.
Can you guys provide a sample code so we can reproduce the same behavior? We have been using Faust extensively in production for almost two years and have never had the same issue.
I'm on @rafaelcapucho's team, and we were able to find the cause of the issue in our case. We were not going back far enough in the logs, but after doing so we saw agents crashing because of exceptions:
```
[2022-07-05 18:24:38,244] [1] [ERROR] [^----FaustAgent*: [.]our_agent isolated={3}]: Crashed reason=TypeError("__init__() got an unexpected keyword argument 'size'")
Traceback (most recent call last):
  File "/usr/local/lib/python3.7/site-packages/faust/agents/agent.py", line 674, in _execute_actor
    await coro
  File "/usr/local/lib/python3.7/site-packages/faust/agents/agent.py", line 696, in _slurp
    async for value in it:
  File "/app/.../agents.py", line 415, in our_agent
    value = await do_something(...)
  ...
TypeError: __init__() got an unexpected keyword argument 'size'
```
After the above, but before the `Stream stopped processing, or is slow` messages, we would see:
```
[2022-07-05 18:28:08,128] [1] [ERROR] [^---AIOKafkaConsumerThread]: Has not committed TP(topic='topic-name', partition=3) at all since worker start (started 5.02 minutes ago).
There are multiple possible explanations for this:
1) The processing of a single event in the stream is taking too long.
   The timeout for this is defined by the broker_commit_livelock_soft_timeout setting,
   currently set to 300.0. If you expect the time required to process an event
   to be greater than this then please increase the timeout.
2) The commit handler background thread has stopped working (report as bug).
```
As a note, we are iterating over streams using `faust.Stream.noack` and manually acking events with `faust.Stream.ack`; I'm not entirely sure whether that makes a difference in this case, as opposed to iterating over the stream directly.
Hopefully this helps somebody to get unstuck, cheers!
I've also been encountering similar errors. I think the first step out of this mess is to give agents better logging capabilities. @elacuesta would you be allowed to show us more of your code, for example? I'm having trouble connecting the dots for this segment:

```
File "/app/.../agents.py", line 415, in our_agent
    value = await do_something(...)
...
TypeError: __init__() got an unexpected keyword argument 'size'
```
Sorry, I don't have those logs anymore. What I posted was an anonymized version of an actual agent we have, basically to show that there was an exception somewhere down the call chain. Specifically, the agent `our_agent` was invoking the `do_something` coroutine, which was in turn invoking other coroutines, and so on, until some code tried to create an object passing an invalid keyword argument (`TypeError: __init__() got an unexpected keyword argument 'size'`). The missing part of the traceback is not relevant; it is just the call chain that led to the exception from the initial invocation in the agent.
I decided to give faust another try today, a year after submitting this issue and giving up shortly afterwards, but pretty much immediately hit this same problem again. There were exceptions being thrown due to a bug around topics.py:465 (`FutureMessage` has no `channel` attribute), but after fixing that, and ensuring that there were absolutely no other exceptions being thrown, the consumers still continued to slowly die until the application ground to a halt. This application has essentially no overlap in (our) code, configuration, event patterns, or purpose with the one from last year that also showed this issue.
I'm curious to learn about cases where faust is being successfully used in production for a non-trivial workload, and how the configurations and pipeline architecture being used differ from ours. I have to imagine this issue is affecting more than just a small number of users, given the steady flow of people popping into this thread and blog posts like this, so I hope this issue (and similar) make it to the top of the priority list for those working on/comfortable with the codebase.
If anyone can provide a non-trivial example project that doesn't show this behavior, I can run it in our environment and see how it fares, which may hopefully help start narrowing things down a bit.
> (FutureMessage has no channel attribute)
I just took a look in topics.py and I see immediately what you're talking about. I'll make a patch for that ASAP in https://github.com/faust-streaming/faust/pull/342.
> I'm curious to learn about cases where faust is being successfully used in production for a non-trivial workload
I've had success using Faust in production for synchronizing information across multiple applications; I'm not sure how much detail I'm allowed to share about our use case, though.
Taken from the link you provided: https://kapernikov.com/a-comparison-of-stream-processing-frameworks

> Faust has support for creating offline unit tests (so you could test and debug a stream processor without having a Kafka broker at your disposal).
We really need to make our unit tests more readable. So many existing issues in this project come from the lack of tests when a new feature is added (which I'm guilty of).
Excuse me! Can someone please let me know whether this issue has been fixed yet? My consumer processes about 10% of the messages on the Kafka topic and then stops, and I don't know what to do. Please help me.
```
[2022-12-07 23:38:44,782] [74736] [WARNING] [^--Consumer]: wait_empty: Waiting for tasks [(1, <ConsumerMessage: TopicPartition(topic='complete_collect', partition=2) offset=0>), (1, <ConsumerMessage: TopicPartition(topic='complete_collect', partition=3) offset=0>)]
[2022-12-07 23:38:44,790] [74736] [INFO] [^--Consumer]: Agent tracebacks:
=======================================
TRACEBACK OF ALL RUNNING AGENT ACTORS
=======================================
```
@batman616 in the end, this problem with the consumers ended up being a showstopper for me, so I moved on from faust to try something else. In my case, I've been using aiokafka in a production system and I'm pleased so far. No consumers dying, and a simpler solution.