
Potential Memory-Leak/Offender in Producer

Open qux42 opened this issue 4 years ago • 8 comments

Versions used

2.0.4, but it's happening with 2.0.5 too

Akka version: 2.6.8

Expected Behavior

Using a shared producer, we expect that only one buffer (akka.stream.impl.FixedSizeBuffer$ModuloFixedSizeBuffer) for outgoing messages is created.

Actual Behavior

We see one buffer per stream. Since we create one stream per user and the default for akka.kafka.producer.parallelism changed from 100 to 10000, we suddenly have a lot of huge, nearly empty buffers that use most of our memory. As a quick fix we reduced the parallelism back to 100, but I still don't understand why we have so many buffers. If we search the heap for akka.kafka.ProducerSettings or org.apache.kafka.clients.producer.KafkaProducer, we find exactly one of each, as expected.

Problem:

In akka.kafka.scaladsl.Producer#flexiFlow, a mapAsync(settings.parallelism)(identity) creates a buffer each time the flow is materialized. Is it wrong to materialize the producer flow once per "user" actor?
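For illustration, a minimal sketch of the pattern described above (not the reporter's exact code; the topic name, bootstrap address, and per-user source are placeholders). Each call to startUserStream materializes flexiFlow anew, so each user's stream gets its own internal mapAsync buffer of size settings.parallelism, even though only one KafkaProducer exists:

```scala
import akka.NotUsed
import akka.actor.ActorSystem
import akka.kafka.{ProducerMessage, ProducerSettings}
import akka.kafka.scaladsl.Producer
import akka.stream.scaladsl.{Sink, Source}
import org.apache.kafka.clients.producer.ProducerRecord
import org.apache.kafka.common.serialization.StringSerializer

implicit val system: ActorSystem = ActorSystem("example")

// One ProducerSettings instance; the underlying KafkaProducer
// can be shared by all streams created from it.
val settings = ProducerSettings(system, new StringSerializer, new StringSerializer)
  .withBootstrapServers("localhost:9092") // placeholder address

// Hypothetical per-user message source.
def userMessages(userId: String): Source[String, NotUsed] = ???

// Each call materializes flexiFlow again and allocates its own
// ModuloFixedSizeBuffer of capacity settings.parallelism
// (default 10000 in 2.0.4+), independent of the shared producer.
def startUserStream(userId: String): Unit =
  userMessages(userId)
    .map(msg => ProducerMessage.single(new ProducerRecord[String, String]("user-events", userId, msg)))
    .via(Producer.flexiFlow(settings))
    .runWith(Sink.ignore)
```

With ~20k users this means ~20k buffers of capacity 10000 each, which matches the heap dump observation.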

Reproducer:

https://gist.github.com/qux42/16e0f0703b48083570277829a3a1dd0c

Heapdump:

dump.zip

I set akka.kafka.producer.parallelism to 3333 so the created buffers are easier to find: "SELECT s FROM akka.stream.impl.FixedSizeBuffer$ModuloFixedSizeBuffer s WHERE (s.capacity = 3333)" in OQL (Eclipse MAT).

qux42 avatar Oct 07 '20 16:10 qux42

Thank you for this report and the reproducer.

The Akka Streams internal FixedSizeBuffer.ModuloFixedSizeBuffer is used by the mapAsync operator to ensure the order of outgoing elements.

This buffer is not connected to the shared Kafka producer instance; it is created per materialization of the Alpakka Kafka producer flows and sinks that are based on flexiFlow.

If your system requires creating very many producer flows/sinks, the best option is to lower the parallelism.
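For example, in application.conf (100 was the default before it was raised, as noted above):

```
# Caps the per-materialization mapAsync buffer size
akka.kafka.producer {
  parallelism = 100
}
```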

It is possible to implement producer sinks without mapAsync; e.g. the committableSink does not use an internal buffer.

ennru avatar Oct 08 '20 14:10 ennru

Hello @ennru :)

Thank you for your answer.

I don't understand what mapAsync(settings.parallelism)(identity) achieves. As I see it, a stream of futures is mapped to itself in parallel. But since everything is already a future, the computations are already running concurrently, and the elements are emitted downstream in the same order they came from upstream. So to me the mapAsync makes no sense at all, except for unwrapping the futures for downstream; but then it could be called with parallelism 1 as well. Or is there another reason for the mapAsync that I don't see?

Maybe you can give me a hint on how to use it better.

I have one actor per user (around 20k) and want to produce messages from each of them easily. Is it safe to create only one ActorRef and pass it to all of them, or would I then have a bottleneck because everything is synchronized on one thread?

For producing, fire-and-forget is enough for me; I don't mind if a message goes missing or is sent twice.
And because it doesn't use mapAsync, it would send the messages one after the other, right? So the committableSink doesn't make sense for me, and reusing one ActorRef would gain me more in my case, or am I seeing something wrong?

Or is there a better way without creating the ActorRef at all? Then I'd at least save the actor's additional queue. I'm not really fluent in Akka Streams, but otherwise I only see possibilities with a more or less fixed source, where you need to know all the elements beforehand.

If there are best practices, it would be nice to have more examples in the documentation. If you agree and can give me some hints, I could extend the documentation :)

Thank you.

qux42 avatar Oct 08 '20 21:10 qux42

I don't understand what the mapAsync(settings.parallelism)(identity) achieves.

It makes the stream await the completion of the futures returned by the producer and pass the produced results downstream. The parallelism limits how many outstanding send calls are allowed before backpressure is applied.
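As a plain-Scala illustration of that ordering guarantee (a coarse model, not Akka Streams itself: it uses fixed windows instead of Akka's sliding buffer): up to `parallelism` futures run concurrently, but results are emitted in upstream order regardless of completion order:

```scala
import scala.concurrent.{Await, Future}
import scala.concurrent.duration._
import scala.concurrent.ExecutionContext.Implicits.global

// Coarse model of mapAsync(parallelism)(identity): at most
// `parallelism` futures outstanding at once, results emitted
// in upstream order.
def awaitInOrder[A](futures: Seq[Future[A]], parallelism: Int): List[A] =
  futures
    .grouped(parallelism)
    .flatMap(window => Await.result(Future.sequence(window), 5.seconds))
    .toList

val fs = Seq(
  Future { Thread.sleep(30); 1 }, // slowest, still emitted first
  Future { Thread.sleep(10); 2 },
  Future { 3 }                    // completes first, emitted last
)

val out = awaitInOrder(fs, parallelism = 2)
println(out) // List(1, 2, 3): upstream order preserved
```

This also shows why parallelism 1 would work but throttle throughput: the stream could only await one in-flight send at a time.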

You might not need to create a stream at all for your use-case. SendProducer lets you handle the futures yourself: https://doc.akka.io/docs/alpakka-kafka/current/send-producer.html
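A sketch of that approach (the bootstrap address is a placeholder); SendProducer wraps the shared KafkaProducer without any stream, so no per-materialization buffer is allocated and each actor handles, or ignores, the returned Future itself:

```scala
import akka.actor.ActorSystem
import akka.kafka.ProducerSettings
import akka.kafka.scaladsl.SendProducer
import org.apache.kafka.clients.producer.{ProducerRecord, RecordMetadata}
import org.apache.kafka.common.serialization.StringSerializer

import scala.concurrent.Future

implicit val system: ActorSystem = ActorSystem("example")

val settings = ProducerSettings(system, new StringSerializer, new StringSerializer)
  .withBootstrapServers("localhost:9092") // placeholder address

// One SendProducer shared by all user actors; it is thread-safe,
// like the KafkaProducer it wraps.
val producer = SendProducer(settings)

// Fire-and-forget: discard the Future if delivery guarantees
// don't matter, or recover/log on it if they do.
def produce(topic: String, key: String, value: String): Future[RecordMetadata] =
  producer.send(new ProducerRecord(topic, key, value))
```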

So the committableSink doesn't make sense for me

No, it doesn't. I just noted that it would be possible to implement a specific producer Sink stage which wouldn't use mapAsync. As the current implementation composes the flow with a Sink.ignore, the mapAsync is used even if no one is interested in the futures' results beyond their completion.

ennru avatar Oct 23 '20 11:10 ennru

Hm, it looks like we ran into the same problem when upgrading from 2.0.3 to 2.0.6.

Previously (in v2.0.3) our app that produces messages to Kafka consumed ~500 MB of heap memory; now (2.0.6) even 2 GB is not enough.

Is there anything else we can try besides changing akka.kafka.producer.parallelism from 10000 to a lower value?

gchudnov avatar Jan 26 '21 15:01 gchudnov

Hi, I experienced the same issue with 2.0.3 and 2.0.7. We have a service that consumes from Kafka and, under some conditions, produces another message. Memory/CPU usage keeps increasing while the application handles high load, and it ends with the k8s pod getting restarted. It isn't solved even when we change akka.kafka.producer.parallelism to 1. Currently I'm using SendProducer and it's working fine, but I'd like to know whether this issue can be resolved.

xoyo24 avatar Mar 11 '21 08:03 xoyo24

Hello, we ran into the same problem when upgrading from 2.0.3 to 2.0.6.

AhmedSoliman7 avatar Apr 20 '21 09:04 AhmedSoliman7

I did some profiling of the Alpakka Kafka Producer benchmark test. Heap memory usage was consistent, but the process only ran for about 5 minutes.

benchmarks/it:testOnly *.AlpakkaKafkaPlainProducer -- -z "bench with normal messages written to 8 partitions"

I increased the multiplier in the Benchmarks file to 10000 to generate a total of 20 million messages of approximately 5 bytes each to 8 partitions.

https://github.com/akka/alpakka-kafka/blob/master/benchmarks/src/it/scala/akka/kafka/benchmarks/Benchmarks.scala#L19

I tested the latest in master and the v2.0.7 release tag with the same results.

If anyone can create a reproducer I can dig into this further.

seglo avatar Apr 27 '21 21:04 seglo

Sorry, I just noticed that OP did provide a reproducer. I'll take another look with that impl.

seglo avatar Apr 28 '21 14:04 seglo