Memory keeps increasing and it is never freed even when the agents do nothing

Open popadi opened this issue 3 years ago • 4 comments

Steps to reproduce

I commented out my app bit by bit until I ended up with two agents doing nothing. I'm not posting the entire app because what is currently deployed is literally the code below: it receives events, prints them, and that's it.

import faust

# KafkaSettings and kafka_broker_creds are defined elsewhere in the application.
app = faust.App(
    KafkaSettings.JOB_WORKER_NAME,
    broker=KafkaSettings.KAFKA_BROKER,
    topic_partitions=1,
    autodiscover=True,
    origin="worker",
    datadir="/tmp/",
    web_in_thread=False,
    stream_wait_empty=False,
    broker_credentials=faust.SASLCredentials(
        mechanism="SCRAM-SHA-512",
        username=kafka_broker_creds['username'],
        password=kafka_broker_creds['password'],
    ),
)

switchlogs_topic_cfgevents = app.topic(KafkaSettings.Topics.SWITCHLOGS_CFGEVENTS)
switchlogs_topic_jobevents = app.topic(KafkaSettings.Topics.SWITCHLOGS_JOBEVENTS)

@app.agent(switchlogs_topic_cfgevents, concurrency=1)
async def cfgevents_agent(stream):
    async for event in stream:
        print(event)
        continue
        # await cfgevents_processor(event)


@app.agent(switchlogs_topic_jobevents, concurrency=1)
async def jobevents_agent(stream):
    async for event in stream:
        print(event)
        continue
        # await jobevents_processor(event)

Expected behavior

With absolutely no processing being done, I expected memory not to grow indefinitely. My initial hypotheses were:

  • An old version of faust-streaming is to blame. I updated to 0.7.7; no change.
  • I use run_in_executor at some point to run sync code asynchronously. It wasn't this; I commented the code out and nothing changed.
  • Concurrency is set too high. I had 8 and forced it to 1; no help.
  • I use async for event in stream instead of async for event in stream.events(). Switching made no difference.
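Hypotheses like the ones above can also be narrowed down by measuring where Python-level allocations grow. The following is a minimal sketch using the standard-library tracemalloc module; it is not something that was tried in this thread. The helper name top_growth and the simulated workload are hypothetical, and tracemalloc only sees memory allocated through Python's allocator, so growth inside a C extension would not show up here.

```python
import tracemalloc


def top_growth(before, after, limit=5):
    """Return up to `limit` source lines whose allocated size grew the most.

    `before` and `after` are tracemalloc.Snapshot objects.
    """
    # compare_to() sorts by the absolute size difference, largest first.
    diffs = after.compare_to(before, "lineno")
    # Keep only lines that grew; shrinking lines are not leak candidates.
    growing = [d for d in diffs if d.size_diff > 0]
    return growing[:limit]


if __name__ == "__main__":
    tracemalloc.start()
    before = tracemalloc.take_snapshot()
    # Stand-in for "let the worker consume events for a while".
    retained = [bytes(1024) for _ in range(1000)]
    after = tracemalloc.take_snapshot()
    for stat in top_growth(before, after):
        print(stat)
```

In a long-running worker the two snapshots would be taken some minutes apart while events are flowing. If the lines that grow belong to a library rather than to the agent code, that points at where to look next.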

I saw people reporting this issue with the original robinhood faust, but nobody ever found a solution. Example: https://github.com/robinhood/faust/issues/433

Actual behavior

Memory keeps increasing in small amounts and is never freed. The more events the topics receive, the faster memory grows. Given enough time (a few days), some of the replicas are killed due to OOM. I currently run 6 Kubernetes replicas.
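To put a number on "small amounts", the growth can be estimated from periodic memory readings. This is a rough sketch, assuming the readings are collected separately (for example from container metrics) as (seconds, MiB) pairs. The function names and the idea of fitting a straight line are illustrative assumptions, not something reported in this issue.

```python
def growth_rate(samples):
    """Least-squares slope of memory over time, in MiB per second.

    `samples` is a sequence of (timestamp_seconds, memory_mib) pairs.
    """
    if len(samples) < 2:
        raise ValueError("need at least two samples")
    n = len(samples)
    mean_t = sum(t for t, _ in samples) / n
    mean_m = sum(m for _, m in samples) / n
    cov = sum((t - mean_t) * (m - mean_m) for t, m in samples)
    var = sum((t - mean_t) ** 2 for t, _ in samples)
    if var == 0:
        raise ValueError("samples must span more than one point in time")
    return cov / var


def seconds_until_limit(samples, limit_mib):
    """Estimate seconds until `limit_mib` is reached, or None if not growing."""
    rate = growth_rate(samples)
    if rate <= 0:
        return None
    last_mem = samples[-1][1]
    return max(0.0, (limit_mib - last_mem) / rate)
```

Comparing the slope under different event rates would show whether growth really scales with traffic, as described above, or also happens while the topics are idle.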

Versions

  • Python version: 3.7
  • Faust version: 0.7.7
  • Operating system: RH7
  • Kafka version: I'm using this with Redpanda, which is Kafka-compatible

popadi avatar Dec 22 '21 08:12 popadi

Hi, is it possible that your agents need to yield something at some point, so that the stream releases the loop iteration and its memory? Please try yield instead of continue.

dada-engineer avatar Jan 12 '22 15:01 dada-engineer

> Hi, is it possible that your agents need to yield something at some point, so that the stream releases the loop iteration and its memory? Please try yield instead of continue.

Do I need to yield at the end of the agent every time? If I have a dummy function that just returns, and I call f(event) in the agent, the behaviour is the same: memory still increases. I didn't see anything in the official documentation about yielding the event in the agent.

popadi avatar Jan 23 '22 21:01 popadi

I've been suffering from the same issue; however, adding yield is not helping at all...

ChansongJo avatar Jan 23 '22 22:01 ChansongJo

bump

zerafachris avatar Apr 03 '23 15:04 zerafachris