reactor-core
Apparent memory leak in pipeline combining flatMaps ParallelFlux and map to same object type
Expected Behavior
I'm running a pipeline that potentially processes a large amount of data from different types of sources into a single type of object. The pipeline makes use of several flatMaps, then runs a computationally heavy part in parallel using ParallelFlux. In the end, the resulting items in the Flux will either be written to some OutputStream or processed further using doOnNext or map.
Actual Behavior
The pipeline works correctly. However, when running, eventually the items in the Flux stop being garbage collected in young gen and end up in old gen. This sometimes happens right after starting the processing, but sometimes as much as a minute after starting the processing. Eventually the heap space is consumed fully due to old gen filling up.
I've played around with setting the parallelism and prefetch amount on the parallel method. When set to a low amount the effect is slowed, but eventually it leads to the same result in my pipeline (it seems to help more in the simplified reproduction below).
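For reference, a minimal sketch (not the actual pipeline, and with purely illustrative values) of how the parallelism and prefetch were tuned on the parallel method:

Flux.range(0, 1_000_000)
    .parallel(2, 16) // 2 rails with a prefetch of 16 per rail; illustrative values only
    .runOn(Schedulers.parallel())
    .map(i -> i * 2)
    .sequential()
    .blockLast();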
Interestingly, if the items in the flux are mapped to another type of object after calling sequential(), there seems to be no leak.
Steps to Reproduce
I've created a simplified version of my pipeline with which I can reproduce the behavior. A GitHub repo is available here.
The leak can be reproduced by using a map after sequential() that produces objects of the same type that are already in the Flux. Interestingly, if we map to a different type of object, the leak does not seem to take place.
import java.util.stream.Stream;

import org.junit.jupiter.api.Test;

import reactor.core.publisher.Flux;
import reactor.core.scheduler.Schedulers;
import reactor.util.function.Tuple2;
import reactor.util.function.Tuples;

class MemLeakTest {

    @Test
    void memLeakTest() {
        Flux.just("a", "b", "c", "d", "e", "f", "g")
            .flatMap(id -> generateTuplesFor(id, (Integer.MAX_VALUE / 7)))
            .parallel(6)
            .runOn(Schedulers.parallel())
            .flatMap(this::generatePairsFor)
            .map(this::mapPair)
            .sequential()
            .map(this::mapPair) // If you remove this call, it seems to run without leaking.
            // .map(this::toTriple) // Commenting out the above line and just mapping to a different object doesn't seem to leak.
            .blockLast();
    }

    private Flux<Tuple2<String, String>> generateTuplesFor(String id, int amount) {
        return Flux.fromStream(Stream.iterate(0, i -> i + 1)
            .limit(amount)
            .map(i -> generateTupleFor(id, i)));
    }

    private Tuple2<String, String> generateTupleFor(String id, int number) {
        return Tuples.of(String.format("left-%s-%s", id, number), String.format("right-%s-%s", id, number));
    }

    private Flux<Pair> generatePairsFor(Tuple2<String, String> tuple2) {
        return Flux.just(new Pair(tuple2.getT1(), tuple2.getT2()));
    }

    private Pair mapPair(Pair pair) {
        var left = transformValue(pair.getLeft());
        var right = transformValue(pair.getRight());
        return new Pair(left, right);
    }

    private Triple toTriple(Pair pair) {
        return new Triple(pair.left, pair.getLeft() + pair.getRight(), pair.getRight());
    }

    private String transformValue(String value) {
        return String.format("transformed-%s", value);
    }

    static class Pair {
        private final String left;
        private final String right;

        Pair(String left, String right) {
            this.left = left;
            this.right = right;
        }

        String getLeft() {
            return left;
        }

        String getRight() {
            return right;
        }
    }

    static class Triple {
        private final String left;
        private final String middle;
        private final String right;

        Triple(String left, String middle, String right) {
            this.left = left;
            this.middle = middle;
            this.right = right;
        }

        String getLeft() {
            return left;
        }

        String getMiddle() {
            return middle;
        }

        String getRight() {
            return right;
        }
    }
}
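As a side note, the following sketch is not part of the original reproduction; it is one way to watch heap usage while the test runs, using the standard MemoryMXBean (for example subscribed at the start of the test method, before the blocking call):

// Not from the original repro; periodically prints heap usage so the old-gen growth becomes visible.
var memoryBean = java.lang.management.ManagementFactory.getMemoryMXBean();
Flux.interval(java.time.Duration.ofSeconds(5))
    .doOnNext(tick -> System.out.println(memoryBean.getHeapMemoryUsage()))
    .subscribe();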
Your Environment
- Reactor version(s) used: reactor-bom 2020.0.22
- Other relevant libraries versions (e.g. netty, ...): none
- JVM version (java -version): OpenJDK 64-Bit Server VM (11.0.16.1+1, mixed mode)
- OS and version (e.g. uname -a): Microsoft Windows 10 Pro 10.0.19044 N/A Build 19044
@pmaria did you check the memory dump? Are there any specific objects being leaked that can be referenced here?
@OlegDokuka yes, it is the Pair object that leaks. I can try to post more detail tomorrow, if necessary.
@OlegDokuka I made a heap dump when the leak starts happening. Hopefully this helps.
@OlegDokuka let me know if any more input is required
Any update on this? Or ideas for a possible workaround?
Hello, @pimlock!
The best option to go for now would be the usage of Tuple2 instead of Pair.
We will get back to this issue once we are done with 3.5 line release
@OlegDokuka thanks for the response.
The best option to go for now would be the usage of Tuple2 instead of Pair.
I used Pair simply as a reproduction example. My real use case uses different objects. Could you elaborate on why the usage of Tuple2 would be a better option? This might help me find a suitable solution on my side.
We will get back to this issue once we are done with 3.5 line release
OK, good to know.
In your original post you mentioned that mapping to a different object does not reproduce the leak, thus I suggested mapping to a tuple (not sure what your real scenario is).
Also, if the scenario you are demonstrating is close enough to the real one, I could suggest replacing flatMap with concatMap, or, if your flattened source is iterable, trying flatMapIterable. That will not break the behaviour but should significantly improve performance.
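For illustration, a minimal sketch of the flatMapIterable shape being suggested here (the values are made up):

// Illustrative only: flattening an Iterable directly avoids creating an inner Publisher per element.
Flux<String> flattened = Flux.just("a", "b", "c")
    .flatMapIterable(id -> List.of(id + "-1", id + "-2"));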
@OlegDokuka
In your original post you mentioned that mapping to a different object does not reproduce the leak, thus I suggested mapping to a tuple (not sure what your real scenario is).
Ok, I'll try to better describe my actual scenario (still simplified, but hopefully giving a better idea):
I have, potentially, multiple sources. Per source I have, potentially, multiple source publishers (for example, several queries on the same source).
The source publishers can be created in different ways, currently Flux.fromIterable or Flux.create (with backpressure support). Let's say these produce a Flux<Record>. The amount of records published is in principle unbounded, and publishers will probably be faster than the processing, hence backpressure support is necessary.
On the processing side I have a set of processors that inspect a record and potentially create a new Flux<ResultObject> from the record. There could be multiple processors that need to do something with a single record and create results.
So we have something like this:
Flux<Source> sources;
Set<Processor> processors;

Flux<Result> results = sources.flatMap(source -> {
    return Flux.fromIterable(getSourcePublishers(source)) // produces Flux<SourcePublisher>
        .flatMap(SourcePublisher::publish) // produces unbounded Flux<Record>
        .flatMap(record ->
            Flux.fromIterable(processors)
                .flatMap(Processor::process) // produces Flux<Result>
        );
});

// further processing of results
Now, this works fine and without leaking when ParallelFlux is not used, but introducing ParallelFlux greatly improves processing speed, which is what we're after.
So something like:
Flux<Result> results = sources.flatMap(source -> {
    return Flux.fromIterable(getSourcePublishers(source)) // produces Flux<SourcePublisher>
        .flatMap(SourcePublisher::publish) // produces unbounded Flux<Record>
        .flatMap(record ->
            Flux.fromIterable(processors)
                .parallel(6)
                .runOn(Schedulers.parallel())
                .flatMap(Processor::process)
                .sequential() // produces Flux<Result>
        );
});
In tests I've done, using some degree of parallelization reduces processing time by a factor of 2 or 3.
However, introducing ParallelFlux seems to cause the memory leak described in this issue and in the simplified reproduction case.
Also, if the scenario you are demonstrating is close enough to the real one, I could suggest replacing flatMap with concatMap, or, if your flattened source is iterable, trying flatMapIterable. That will not break the behaviour but should significantly improve performance.
Order preservation is not necessary for my use case, so I did not see a use for concatMap. Would you still advise using concatMap in this case, and if so, where?
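(For context, a small sketch with made-up delays of the ordering difference as I understand it: concatMap subscribes to the inner publishers one at a time, so it preserves source order, whereas flatMap interleaves them.)

Flux.just(3, 1, 2)
    .concatMap(i -> Mono.just(i).delayElement(Duration.ofMillis(i * 10L)))
    .subscribe(System.out::println); // prints 3, 1, 2 despite the differing delays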
@pmaria I think you are overcomplicating things with ParallelFlux. Also, the mentioned use of ParallelFlux is inappropriate since you get no benefit in terms of performance (I'm more than sure you even get performance degradation with it, even without the observed leak).
Try to keep the implementation as simple as the following:
Flux<Result> results = sources.flatMap(source -> {
    return Flux.fromIterable(getSourcePublishers(source)) // produces Flux<SourcePublisher>
        .flatMap(SourcePublisher::publish) // produces unbounded Flux<Record>
        .flatMap(record -> Flux.merge(
            processors.stream()
                .map(Processor::process)
                .collect(Collectors.toList())
        ));
});
You don't need any additional parallelization, especially if Processor::process is an async, non-blocking task.
@OlegDokuka
Ugh, I am so sorry, I made a mistake in my pseudo code about where ParallelFlux is applied 🤦🏼
It should have been this:
Flux<Result> results = sources.flatMap(source -> {
    return Flux.fromIterable(getSourcePublishers(source)) // produces Flux<SourcePublisher>
        .flatMap(SourcePublisher::publish) // produces unbounded Flux<Record>
        .parallel(6)
        .runOn(Schedulers.parallel())
        .flatMap(record ->
            Flux.fromIterable(processors)
                .flatMap(Processor::process) // produces Flux<Result>
        )
        .sequential();
});
So the processing (i.e. the hard work) is done on multiple cores, and this significantly speeds up my pipeline by more than a factor of 2.
You don't need any additional parallelization, especially if Processor::process is an async, non-blocking task.
FYI, Processor::process is non-blocking, but synchronous.
@pmaria it changes nothing. The .parallel() is overhead. It should be used only when there is CPU-bound work. Your case is the opposite: your code seems to do non-blocking stuff. Your code should be:
Flux<Result> results = sources.flatMap(source -> {
    return Flux.fromIterable(getSourcePublishers(source)) // produces Flux<SourcePublisher>
        .flatMap(SourcePublisher::publish) // produces unbounded Flux<Record>
        .flatMap(record ->
            Flux.fromIterable(processors)
                .flatMap(Processor::process) // produces Flux<Result>
        );
});
Or, if you still want to run the stuff inside flatMap on Schedulers.parallel(), you may enhance the previous example as follows:
Flux<Result> results = sources.flatMap(source -> {
    return Flux.fromIterable(getSourcePublishers(source)) // produces Flux<SourcePublisher>
        .flatMap(SourcePublisher::publish) // produces unbounded Flux<Record>
        .flatMap(record ->
            Flux.defer(() ->
                Flux.fromIterable(processors)
                    .flatMap(Processor::process) // produces Flux<Result>
            )
            .subscribeOn(Schedulers.parallel())
        );
});
@OlegDokuka thanks for engaging.
I did a quick run of a test case with your proposed approach using Flux.defer and Schedulers.parallel(), and it does increase performance slightly (roughly a factor of 1.5), but interestingly not as much as using ParallelFlux. The work done in Processor::process is CPU bound, in the sense that it does some transformations on the Records.
But I get the sense that you would still advise against using ParallelFlux for this use case.
For now I can make do with your proposed approach. I will do some further testing to see how it works for other use cases.
I'd be interested to see the difference for longer running processes if this issue is fixed.
@pmaria we are keeping this issue open so it will be investigated once there is more availability! However, I doubt there will be any success.
The challenging part here is that it can be correct that memory grows:
- .parallel(6) in combination with runOn allocates 6 queues. (The parameter 6 specifies the number of independent downstreams; runOn under the hood is just a normal publishOn, so it allocates a queue by default.)
- Every flatMap has a queue, and every inner Publisher flattened in flatMap has a subscriber with a small queue of 32 elements. By default the main queue has a size of 256 and a concurrency of 256, which means up to 256 inner queues of max 32 elements may be allocated.
- The parallel chain has a .flatMap downstream, so we need to add an extra 6 * 256 queues.
- Then we have .sequential(), which has a queue under the hood as well, so one extra queue.
In total, that is around ~(6 + 6 + 6 * 256 + 1) * 256 queues (the outer multiplier of 256 is because everything is within the outer flatMap), where every element can be stored for a long enough period of time that it moves from Eden to survivor space and then to old gen.
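Worked out, that upper bound comes to (6 + 6 + 6 * 256 + 1) * 256 = 1549 * 256 = 396,544 potential queues.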
Thus the memory leak you see can be just values stored in those queues, and that could be expected. The performance gain you see can be just a wrong first impression (an illusion during the first few minutes of the run), since over time GC will make your app as slow as possible...
Following. Very interesting.
If .flatMap(SourcePublisher::publish) returns a list instead of a single object and is considered blocking, what is the best way to collect all the objects and avoid sequential?
Here is the test code:
List<Integer> ll = Arrays.asList(10, 3);

Mono<Object> t = Flux.fromIterable(ll)
    .flatMap(l -> Mono.defer(() -> {
            try {
                return Mono.just(getTable(l));
            } catch (InterruptedException e) {
                throw new RuntimeException(e);
            }
        })
        .subscribeOn(Schedulers.boundedElastic())
        .log())
    .flatMap(l1 -> Flux.defer(() -> Flux.fromIterable(l1)))
    .collectList()
    .map(ele -> getSingleList(ele))
    .log();
With @OlegDokuka's comment from 2022 and the current date taken into account, it seems we never got to investigate further. I believe the answer clarifies a lot though. With that, I'm closing the issue.