Memory keeps increasing and is never freed, even when the agents do nothing
Steps to reproduce
Basically, I commented out bits and pieces of my entire app until I ended up with two agents doing... nothing. I haven't posted the entire app because right now it is literally deployed and doing nothing: it just receives events, prints them, and that's it.
app = faust.App(
    KafkaSettings.JOB_WORKER_NAME,
    broker=KafkaSettings.KAFKA_BROKER,
    topic_partitions=1,
    autodiscover=True,
    origin="worker",
    datadir="/tmp/",
    web_in_thread=False,
    stream_wait_empty=False,
    broker_credentials=faust.SASLCredentials(
        mechanism="SCRAM-SHA-512",
        username=kafka_broker_creds['username'],
        password=kafka_broker_creds['password'],
    ),
)
switchlogs_topic_cfgevents = app.topic(KafkaSettings.Topics.SWITCHLOGS_CFGEVENTS)
switchlogs_topic_jobevents = app.topic(KafkaSettings.Topics.SWITCHLOGS_JOBEVENTS)
@app.agent(switchlogs_topic_cfgevents, concurrency=1)
async def cfgevents_agent(stream):
    async for event in stream:
        print(event)
        continue
        # await cfgevents_processor(event)

@app.agent(switchlogs_topic_jobevents, concurrency=1)
async def jobevents_agent(stream):
    async for event in stream:
        print(event)
        continue
        # await jobevents_processor(event)
Expected behavior
With absolutely no processing being done, I expected the memory not to grow indefinitely. My initial thoughts were:
- It's because of an old version of faust-streaming - updated to 0.7.7, no change
- It's because I use run_in_executor at some point to execute some sync code asynchronously - it wasn't this; I commented out that code, nothing changed
- It's because concurrency is set too high - I had 8, I forced it to 1, no help
- It's because I have async for event in stream instead of async for event in stream.events() - no change (see the sketch below)
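For reference, the stream.events() variant I tried looked roughly like this (same topic and agent as the snippet above; event.value holds the decoded payload):

@app.agent(switchlogs_topic_cfgevents, concurrency=1)
async def cfgevents_agent(stream):
    # Iterate over raw Event objects instead of the decoded values.
    async for event in stream.events():
        print(event.value)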
I saw people having this issue with the old robinhood faust, but nobody actually ever found a solution. Example: https://github.com/robinhood/faust/issues/433
Actual behavior
The memory keeps increasing in small amounts and is never freed. The more events the topics receive, the faster the memory grows. If I wait long enough, within a few days some of the replicas get killed due to OOM. Currently I have 6 Kubernetes replicas.
Versions
- Python version: 3.7
- Faust version: 0.7.7
- Operating system: RH7
- Kafka version: I'm using this with Redpanda, which is Kafka-compatible
Hi, is it possible that your agents need to yield something at some point, so that the stream can release the for-loop cycle and its memory? Please try yield instead of continue.
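Something roughly like this, a minimal sketch based on the cfgevents agent above:

@app.agent(switchlogs_topic_cfgevents, concurrency=1)
async def cfgevents_agent(stream):
    async for event in stream:
        print(event)
        # Yield instead of continue, so the agent behaves as an async
        # generator and hands control (and the value) back to the stream.
        yield event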
Do I need to yield at the end of the agent all the time? If I have a dummy function that just returns and I call f(event) in the agent, the behaviour is the same: the memory increases. I didn't see anything about yield-ing the event in the agent in the official documentation.
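In other words, a variant roughly like this (f here is a hypothetical stand-in for my real processor) shows the same growth:

def f(event):
    # Dummy processor: does nothing and returns immediately.
    return

@app.agent(switchlogs_topic_jobevents, concurrency=1)
async def jobevents_agent(stream):
    async for event in stream:
        f(event)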
I've been suffering from the same issue; however, adding yield is not helping at all...
bump