faust icon indicating copy to clipboard operation
faust copied to clipboard

Consumers slowly die over time

Open pb376 opened this issue 2 years ago • 27 comments

Running a basic application for any extended period of time - sometimes just a few minutes - leads to consumers slowly dying one-by-one until they've all stopped responding and the entire application hangs. The data is still flowing into a topic at the tip of the pipeline from an external source, so there are still messages to be read as they begin to hang.

The following messages are displayed in increasing numbers as things begin to die:

[2021-07-21 01:04:39,262] [31912] [ERROR] [^---AIOKafkaConsumerThread]: Stream has not started processing TP(topic='archive-log-events', partition=36) (started 1.10 hour ago). 
                                                                                                                                          
There are multiple possible explanations for this:                                                                                                
                                                                                                                                                                                              
1) The processing of a single event in the stream                                                                                    
   is taking too long.                                                                                                                    
                                                                                                                                          
    The timeout for this is defined by the stream_processing_timeout setting,                                                             
    currently set to 300.0.  If you expect the time                                                                                                                                               required to process an event, to be greater than this then please                                                                                            
    increase the timeout.                                                                                                                                                                     
                                                                                                                                          
 2) The stream has stopped processing events for some reason.                                                                       
                                                                                                                                          
3) The agent processing the stream is hanging (waiting for network, I/O or infinite loop).  

I've tested this with agents performing simple functions and the behavior still shows. Is there any obvious reason why this would be occurring or a reasonable way to debug it?

Checklist

  • [X] I have included information about relevant versions
  • [X] I have verified that the issue persists when using the master branch of Faust.

Steps to reproduce

Run an application with multiple agents.

Expected behavior

Things run indefinitely.

Actual behavior

Things run and slowly begin stop.

Full traceback

N/A

Versions

  • Python version: 3.8.6
  • Faust version: 0.6.9
  • Operating system: Ubuntu 18.04
  • Kafka version: 2.8.0
  • RocksDB version (if applicable)

pb376 avatar Jul 21 '21 01:07 pb376

we have the same issue right now, there is no clear error detail on whats going on

mavalderrama avatar Aug 03 '21 23:08 mavalderrama

Have you tried to increase the broker_commit_livelock_soft_timeout or the broker_max_poll_interval?

PJ-Schulz avatar Aug 09 '21 06:08 PJ-Schulz

Have you tried to increase the broker_commit_livelock_soft_timeout or the broker_max_poll_interval?

thanks, I'll try it in production

mavalderrama avatar Aug 10 '21 18:08 mavalderrama

@mavalderrama Did that resolve your problem?

taybin avatar Oct 21 '21 14:10 taybin

yes I noticed this behaviour too. Anybody has a solution to this?

oneryalcin avatar Nov 11 '21 15:11 oneryalcin

@mavalderrama Did that resolve your problem?

I'm sorry, we changed those params to big numbers without success

mavalderrama avatar Nov 11 '21 15:11 mavalderrama

@mavalderrama Did that resolve your problem?

I'm sorry, we changed those params to big numbers without success

Many thanks @mavalderrama for testing it. I'll try running faust in debug mode to check if I can see in logs

oneryalcin avatar Nov 11 '21 16:11 oneryalcin

same issue

[2022-02-17 10:08:01,225] [1] [ERROR] [^---AIOKafkaConsumerThread]: Stream stopped processing, or is slow for TP(topic='my_topic', partition=0) (last inbound 1.92 day ago). 

There are multiple possible explanations for this:

1) The processing of a single event in the stream
   is taking too long.

    The timeout for this is defined by the stream_processing_timeout setting,
    currently set to 300.0.  If you expect the time
    required to process an event, to be greater than this then please
    increase the timeout.

 2) The stream has stopped processing events for some reason.

3) The agent processing the stream is hanging (waiting for network, I/O or infinite loop). 
  • Python version: 3.8
  • Faust version: 0.8.2
  • Operating system: Ubuntu 20.04

pikhovkin avatar Feb 18 '22 08:02 pikhovkin

I have the exactly same thing happening.

Faust: 0.8.1 Python: 3.7.11

rafaelcapucho avatar Feb 25 '22 04:02 rafaelcapucho

I'm joining in the club of having the same issue. In fact this error and the fact that agents die and do not heal up automatically is the only thing that blocks me from using this project in a production system. A pity since for a while, while running it works very well and it's very easy to set up.

In my case I'm using faust inside a docker image:

Operating system: python 3.8-slim-buster Faust version: 0.8.4 Python: 3.8

Rydra avatar Mar 03 '22 15:03 Rydra

I noticed that if you delete the topic in Kafka, recreating it, it stops producing the error. but after a while (some days) it starts again.

Note: I'm using a Kafka Broker from Confluent Cloud, their managed service.

It is super frustrating, we're moving a considerably big part of our code base to be Faust based but it doesn't look reliable during our development.

rafaelcapucho avatar Jun 30 '22 15:06 rafaelcapucho

Can you guys provide a sample code so we can reproduce the same behavior? We have been using Faust extensively in production for almost two years and have never had the same issue.

payamesfandiari avatar Jul 03 '22 20:07 payamesfandiari

I'm on @rafaelcapucho's team, we were able to find the cause of the issue in our case. We were not going back enough in the logs, but after doing so we saw agents crashing because of exceptions:

[2022-07-05 18:24:38,244] [1] [ERROR] [^----FaustAgent*: [.]our_agent isolated={3}]: Crashed reason=TypeError("__init__() got an unexpected keyword argument 'size'") 
Traceback (most recent call last):
  File "/usr/local/lib/python3.7/site-packages/faust/agents/agent.py", line 674, in _execute_actor
    await coro
  File "/usr/local/lib/python3.7/site-packages/faust/agents/agent.py", line 696, in _slurp
    async for value in it:
  File "/app/.../agents.py", line 415, in our_agent
    value = await do_something(...)
    ...
TypeError: __init__() got an unexpected keyword argument 'size'

After the above but before the Stream stopped processing, or is slow messages, we would see

[2022-07-05 18:28:08,128] [1] [ERROR] [^---AIOKafkaConsumerThread]: Has not committed TP(topic='topic-name', partition=3) at all since worker start (started 5.02 minutes ago).

There are multiple possible explanations for this:

1) The processing of a single event in the stream
   is taking too long.

    The timeout for this is defined by the broker_commit_livelock_soft_timeout setting,
    currently set to 300.0.  If you expect the time
    required to process an event, to be greater than this then please
    increase the timeout.

 2) The commit handler background thread has stopped working (report as bug). 

As a note, we are iterating over streams using faust.Stream.noack and manually acking events with faust.Stream.noack; I'm not entirely sure that makes a difference in this case as opposed to iterating over the stream directly.

Hopefully this helps somebody to get unstuck, cheers!

elacuesta avatar Jul 07 '22 14:07 elacuesta

I've also been encountering similar errors. I think the first step out of this mess is to give agents better logging capabilities. @elacuesta would you be allowed to show us more of your code for example? I'm having trouble connecting the dots for this segment:

  File "/app/.../agents.py", line 415, in our_agent
    value = await do_something(...)
    ...
TypeError: __init__() got an unexpected keyword argument 'size'

wbarnha avatar Aug 17 '22 16:08 wbarnha

Sorry, I don't have those logs anymore. Was I posted was an anonymized version of an actual agent we have, basically to show that there was an exception somewhere down the call chain. Specifically, the agent our_agent was invoking the do_something coroutine, which was in turn invoking other coroutines, so on and so forth, until some code tried to create an object passing an invalid keyword argument (TypeError: __init__() got an unexpected keyword argument 'size'). The missing part of the traceback is not relevant, it's just the call chain that led to the exception from the initial invocation in the agent.

elacuesta avatar Aug 17 '22 16:08 elacuesta

I decided to give faust another try today, a year after submitting this issue and giving up shortly afterwards, but pretty much immediately hit this same problem again. There were exceptions being thrown due to a bug around topics.py:465 (FutureMessage has no channel attribute) but after fixing that, and ensuring that there were absolutely no other exceptions being thrown, the consumers still continued to slowly die until the application ground to a halt. This application has essentially no overlap in (our) code, configuration, event patterns, purpose, etc, with the one from last year that also showed this issue.

I'm curious to learn about cases where faust is being successfully used in production for a non-trivial workload, and how the configurations and pipeline architecture being used differ from ours. I have to imagine this issue is affecting more than just a small number of users, given the steady flow of people popping into this thread and blog posts like this, so I hope this issue (and similar) make it to the top of the priority list for those working on/comfortable with the codebase.

If anyone can provide a non-trivial example project that doesn't show this behavior, I can run it in our environment and see how it fares, which may hopefully help start narrowing things down a bit.

pb376 avatar Aug 18 '22 05:08 pb376

(FutureMessage has no channel attribute)

I just took a look in topics.py and I see immediately what you're talking about. I'll make a patch for that ASAP in https://github.com/faust-streaming/faust/pull/342.

I'm curious to learn about cases where faust is being successfully used in production for a non-trivial workload

I've had success using Faust in production for synchronizing information across multiple applications, not sure how detailed I'm allowed to describe our use-case though.

Taken from your link provided: https://kapernikov.com/a-comparison-of-stream-processing-frameworks

Faust has support for creating offline unit tests (so you could test and debug a stream processor without having a Kafka broker at your disposal).

We really need to make our unit tests more readable. So many existing issues in this project come from the lack of tests when a new feature is added (which I'm guilty of).

wbarnha avatar Aug 18 '22 14:08 wbarnha

excuse me! some one please let me know this issue been fixed yet? my consumer working with 10% my message on kafka topic and stop, ikd what to do. please help me

[2022-12-07 23:38:44,782] [74736] [WARNING] [^--Consumer]: wait_empty: Waiting for tasks [(1, <ConsumerMessage: TopicPartition(topic='complete_collect', partition=2) offset=0>), (1, <ConsumerMessage: TopicPartition(topic='complete_collect', partition=3) offset=0>)] 
[2022-12-07 23:38:44,790] [74736] [INFO] [^--Consumer]: Agent tracebacks:
=======================================
 TRACEBACK OF ALL RUNNING AGENT ACTORS
=======================================

batman616 avatar Dec 07 '22 16:12 batman616

@batman616 in the end, this problem of the consumers ended up being a showstopper for me, so I moved on from faust to try something else. In my case, I've been using aiokafka in a production system and I'm pleased so far. No consumers dying, and a simpler solution.

Rydra avatar Dec 07 '22 17:12 Rydra