alpakka-kafka
Potential Memory-Leak/Offender in Producer
Versions used
Alpakka Kafka version: 2.0.4, but it's happening with 2.0.5 too
Akka version: 2.6.8
Expected Behavior
Using a shared producer, we expect that only one buffer (akka.stream.impl.FixedSizeBuffer$ModuloFixedSizeBuffer) is created for outgoing messages.
Actual Behavior
We see one buffer per stream. As we create one stream per user, and the default setting for akka.kafka.producer.parallelism changed from 100 to 10000, we suddenly have a lot of huge, nearly empty buffers that use most of our memory. As a quick fix, we reduced the parallelism back to 100. I still don't understand why we have so many buffers: if we search for akka.kafka.ProducerSettings or org.apache.kafka.clients.producer.KafkaProducer, we find one instance of each, as expected.
Problem:
In akka.kafka.scaladsl.Producer#flexiFlow, a mapAsync(settings.parallelism)(identity) stage creates a buffer each time the flow is materialized. Is it wrong to materialize the producer flow once per "user" actor?
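For illustration, this is roughly the pattern we use (a sketch; topic, serializers, and bootstrap server are made up):

```scala
import akka.NotUsed
import akka.actor.ActorSystem
import akka.kafka.{ProducerMessage, ProducerSettings}
import akka.kafka.scaladsl.Producer
import akka.stream.scaladsl.{Sink, Source}
import org.apache.kafka.clients.producer.ProducerRecord
import org.apache.kafka.common.serialization.StringSerializer

implicit val system: ActorSystem = ActorSystem("example")

val producerSettings =
  ProducerSettings(system, new StringSerializer, new StringSerializer)
    .withBootstrapServers("localhost:9092")

// The flow itself is a reusable blueprint and can be shared between streams.
val producerFlow = Producer.flexiFlow[String, String, NotUsed](producerSettings)

// But every materialization (one per user stream in our case) allocates
// its own FixedSizeBuffer of settings.parallelism slots inside mapAsync.
Source
  .single(ProducerMessage.single(new ProducerRecord("topic", "key", "value")))
  .via(producerFlow)
  .runWith(Sink.ignore)
```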
Reproducer:
https://gist.github.com/qux42/16e0f0703b48083570277829a3a1dd0c
Heapdump:
I set akka.kafka.producer.parallelism to 3333 so the created buffers are easier to find, e.g. with "SELECT s FROM akka.stream.impl.FixedSizeBuffer$ModuloFixedSizeBuffer s WHERE (s.capacity = 3333)" in OQL (Eclipse MAT).
Thank you for this report and the reproducer.
The Akka Streams internal FixedSizeBuffer.ModuloFixedSizeBuffer is used by the mapAsync operator to ensure the order of outgoing elements. This buffer is not connected to the shared Kafka producer instance but is created for every Alpakka Kafka producer flow or sink based on the flexiFlow.
If your system requires creating extremely many producer flows/sinks, the best option is to lower the parallelism.
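For example, the parallelism can be lowered globally via akka.kafka.producer.parallelism in application.conf, or per producer (a sketch; serializers and servers are placeholders):

```scala
import akka.actor.ActorSystem
import akka.kafka.ProducerSettings
import org.apache.kafka.common.serialization.StringSerializer

implicit val system: ActorSystem = ActorSystem("example")

// Caps the internal mapAsync buffer at 100 slots (the old default)
// for every flow/sink materialized from these settings.
val settings =
  ProducerSettings(system, new StringSerializer, new StringSerializer)
    .withBootstrapServers("localhost:9092")
    .withParallelism(100)
```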
It is possible to implement producer sinks without mapAsync. E.g. the committableSink does not use an internal buffer.
Hello @ennru :)
Thank you for your answer.
I don't understand what the mapAsync(settings.parallelism)(identity) achieves. As I see it, a stream of futures is mapped to itself in parallel. But because everything is already a Future, their computation is already running in parallel. Then the elements are emitted downstream in the same order they came from upstream. So for me, the mapAsync makes no sense at all, except getting rid of the futures for downstream. But then it could be called with parallelism 1 too. Or is there any other reason for the mapAsync that I don't see?
Maybe you can give me a hint on how to use it better.
I have one actor per user (around 20k) and want an easy way to produce messages from each of them. Is it safe to create only one ActorRef and pass it to all of them? Or do I then have a bottleneck because everything is synchronized on one thread?
For producing, fire-and-forget is enough for me. I don't mind if a message is missing or sent twice.
And because it doesn't use mapAsync, it would send the messages one after the other, right?
So the committableSink doesn't make sense for me, and reusing one ActorRef would gain more in my case, or am I missing something?
Or is there a better way without creating the ActorRef at all? Then I'd at least save the actor's additional queue. I'm not really experienced with Akka Streams, but otherwise I only see possibilities with a more or less fixed source, where you need to know all the elements beforehand.
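Concretely, something like this is what I have in mind (a rough sketch; buffer size, overflow strategy, and names are made up):

```scala
import akka.actor.{ActorRef, ActorSystem}
import akka.kafka.ProducerSettings
import akka.kafka.scaladsl.Producer
import akka.stream.OverflowStrategy
import akka.stream.scaladsl.Source
import org.apache.kafka.clients.producer.ProducerRecord
import org.apache.kafka.common.serialization.StringSerializer

implicit val system: ActorSystem = ActorSystem("example")

val producerSettings =
  ProducerSettings(system, new StringSerializer, new StringSerializer)
    .withBootstrapServers("localhost:9092")

// Materialize ONE stream and hand its ActorRef to all ~20k user actors.
val kafkaRef: ActorRef =
  Source
    .actorRef[ProducerRecord[String, String]](
      completionMatcher = PartialFunction.empty,
      failureMatcher = PartialFunction.empty,
      bufferSize = 1000,
      overflowStrategy = OverflowStrategy.dropNew)
    .to(Producer.plainSink(producerSettings))
    .run()

// Fire-and-forget from any user actor:
kafkaRef ! new ProducerRecord("topic", "user-id", "payload")
```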
If there are best practices, it would be nice to have more examples in the docs. If you think so too and can give me some hints, I could extend the docs :)
Thank you.
I don't understand what the mapAsync(settings.parallelism)(identity) achieves.
It makes the stream await the completion of the futures from the producer and pass the produced result downstream.
The parallelism limits how many outstanding send calls are allowed before backpressure is applied.
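In isolation it behaves like this (a toy sketch; send stands in for the actual producer call):

```scala
import akka.actor.ActorSystem
import akka.stream.scaladsl.{Sink, Source}
import scala.concurrent.{ExecutionContext, Future}

implicit val system: ActorSystem = ActorSystem("example")
implicit val ec: ExecutionContext = system.dispatcher

// Stand-in for the producer's send call returning a Future.
def send(i: Int): Future[Int] = Future(i)

Source(1 to 1000)
  .map(send)               // a stream of Future[Int]
  .mapAsync(10)(identity)  // at most 10 uncompleted futures in flight;
                           // beyond that, upstream is backpressured.
                           // Completed values are emitted in upstream order.
  .runWith(Sink.ignore)
```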
You might not need to create a stream at all for your use-case. SendProducer lets you handle the futures yourself: https://doc.akka.io/docs/alpakka-kafka/current/send-producer.html
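A minimal sketch along the lines of that page (topic, serializers, and servers are placeholders):

```scala
import akka.actor.ActorSystem
import akka.kafka.ProducerSettings
import akka.kafka.scaladsl.SendProducer
import org.apache.kafka.clients.producer.{ProducerRecord, RecordMetadata}
import org.apache.kafka.common.serialization.StringSerializer
import scala.concurrent.Future

implicit val system: ActorSystem = ActorSystem("example")

val producer = SendProducer(
  ProducerSettings(system, new StringSerializer, new StringSerializer)
    .withBootstrapServers("localhost:9092"))

// No stream, no mapAsync buffer: you decide what to do with each Future.
val result: Future[RecordMetadata] =
  producer.send(new ProducerRecord("topic", "key", "value"))
```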
So the committableSink doesn't make sense for me
No, it doesn't. I just noted that it would be possible to implement a specific producer sink stage which wouldn't use mapAsync. As the current implementation composes the flow with a Sink.ignore, the mapAsync is used even if no one is interested in the futures' results beyond their completion.
Hm, it looks like we bumped into the same problem when upgrading from 2.0.3 to 2.0.6.
Previously (in v2.0.3) our app that produces messages to Kafka consumed ~500 MB of heap memory; now (2.0.6) even 2 GB is not enough.
Is there anything else we can try besides lowering akka.kafka.producer.parallelism from 10000 to a smaller value?
Hi, I experienced the same issue with 2.0.3 and 2.0.7.
We have a service that consumes from Kafka and produces another message under some conditions.
The memory/CPU usage keeps increasing when the application handles high load, and it ends up with the k8s pod getting restarted.
But it isn't solved even if we change akka.kafka.producer.parallelism to 1.
Currently, I'm using SendProducer, and it's working fine.
But I'd like to know if this issue can be resolved.
Hello, we bumped into the same problem when upgrading from 2.0.3 to 2.0.6.
I did some profiling of the Alpakka Kafka Producer benchmark test. Heap memory usage was consistent, but the process only ran for about 5 minutes.
benchmarks/it:testOnly *.AlpakkaKafkaPlainProducer -- -z "bench with normal messages written to 8 partitions"
I increased the multiplier in the Benchmarks file to 10000 to generate a total of 20 million messages of approximately 5 bytes each to 8 partitions.
https://github.com/akka/alpakka-kafka/blob/master/benchmarks/src/it/scala/akka/kafka/benchmarks/Benchmarks.scala#L19
I tested the latest master and the v2.0.7 release tag with the same results.
If anyone can create a reproducer I can dig into this further.
Sorry, I just noticed that OP did provide a reproducer. I'll take another look with that impl.