
Apparent memory leak in pipeline combining flatMaps ParallelFlux and map to same object type

Open pmaria opened this issue 2 years ago • 16 comments

Expected Behavior

I'm running a pipeline that potentially processes a large amount of data from different types of sources into a single type of object. The pipeline makes use of several flatMaps, then runs a computationally heavy part in parallel using ParallelFlux. In the end, the resulting items in the Flux are either written to some OutputStream or processed further using doOnNext or map.

Actual Behavior

The pipeline works correctly. However, while it is running, the items in the Flux eventually stop being garbage collected in the young generation and end up in the old generation. This sometimes happens right after processing starts, but sometimes as much as a minute later. Eventually the heap space is fully consumed because the old generation fills up.

I've played around with setting the parallelism and prefetch amounts on the parallel method. When they are set to low values the effect is slowed, but eventually it leads to the same result in my pipeline (it seems to help more in the simplified reproduction below).

Interestingly, if the items in flux are mapped to another type of object after calling sequential(), there seems to be no leak.

Steps to Reproduce

I've created a simplified version of my pipeline with which I can reproduce the behavior. A GitHub repo is available here.

The leak occurs when a map after sequential() produces objects of the same type as those already in the Flux. Interestingly, if we map to a different type of object, the leak does not seem to take place.

import java.util.stream.Stream;
import org.junit.jupiter.api.Test;
import reactor.core.publisher.Flux;
import reactor.core.scheduler.Schedulers;
import reactor.util.function.Tuple2;
import reactor.util.function.Tuples;

class MemLeakTest {

  @Test
  void memLeakTest() {
    Flux.just("a", "b", "c", "d", "e", "f", "g")
        .flatMap(id -> generateTuplesFor(id, (Integer.MAX_VALUE / 7)))
        .parallel(6)
        .runOn(Schedulers.parallel())
        .flatMap(this::generatePairsFor)
        .map(this::mapPair)
        .sequential()
        .map(this::mapPair) // If you remove this call, it seems to run without leaking.
//        .map(this::toTriple) // Commenting out the above line and just mapping to a different object doesn't seem to leak.
        .blockLast();
  }

  private Flux<Tuple2<String, String>> generateTuplesFor(String id, int amount) {
    return Flux.fromStream(Stream.iterate(0, i -> i + 1)
        .limit(amount)
        .map(i -> generateTupleFor(id, i)));
  }

  private Tuple2<String, String> generateTupleFor(String id, int number) {
    return Tuples.of(String.format("left-%s-%s", id, number), String.format("right-%s-%s", id, number));
  }

  private Flux<Pair> generatePairsFor(Tuple2<String, String> tuple2) {
    return Flux.just(new Pair(tuple2.getT1(), tuple2.getT2()));
  }

  private Pair mapPair(Pair pair) {
    var left = transformValue(pair.getLeft());
    var right = transformValue(pair.getRight());
    return new Pair(left, right);
  }

  private Triple toTriple(Pair pair) {
    return new Triple(pair.left, pair.getLeft() + pair.getRight(), pair.getRight());
  }

  private String transformValue(String value) {
    return String.format("transformed-%s", value);
  }

  static class Pair {
    private final String left;
    private final String right;

    Pair(String left, String right) {
      this.left = left;
      this.right = right;
    }

    String getLeft() {
      return left;
    }

    String getRight() {
      return right;
    }
  }

  static class Triple {
    private final String left;

    private final String middle;

    private final String right;

    Triple(String left, String middle, String right) {
      this.left = left;
      this.middle = middle;
      this.right = right;
    }

    String getLeft() {
      return left;
    }

    String getMiddle() {
      return middle;
    }

    String getRight() {
      return right;
    }
  }
}

Your Environment

  • Reactor version(s) used: reactor-bom 2020.0.22
  • Other relevant libraries versions (eg. netty, ...): none
  • JVM version (java -version): OpenJDK 64-Bit Server VM (11.0.16.1+1, mixed mode)
  • OS and version (eg uname -a): Microsoft Windows 10 Pro 10.0.19044 N/A Build 19044

pmaria avatar Aug 31 '22 14:08 pmaria

@pmaria did you check the memory dump? Are there any specific objects that leak which could be referenced here?

OlegDokuka avatar Sep 05 '22 12:09 OlegDokuka

@OlegDokuka yes, it is the Pair object that leaks. I can try to post more detail tomorrow, if necessary.

pmaria avatar Sep 05 '22 17:09 pmaria

@OlegDokuka I made a heap dump when the leak started happening. Hopefully this helps.

pmaria avatar Sep 07 '22 13:09 pmaria

@OlegDokuka let me know if any more input is required

pmaria avatar Sep 13 '22 06:09 pmaria

Any update on this? Or ideas for a possible workaround?

pmaria avatar Oct 25 '22 13:10 pmaria

Any update on this? Or ideas for a possible workaround?

Hello, @pmaria! The best option to go for now would be the usage of Tuple2 instead of Pair.
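For example, against the reproduction above, the final map could produce a Tuple2 rather than another Pair. This is just a sketch of the workaround; it reuses the helper methods from the reproduction and only changes the last map:

Flux.just("a", "b", "c", "d", "e", "f", "g")
    .flatMap(id -> generateTuplesFor(id, Integer.MAX_VALUE / 7))
    .parallel(6)
    .runOn(Schedulers.parallel())
    .flatMap(this::generatePairsFor)
    .map(this::mapPair)
    .sequential()
    .map(pair -> Tuples.of(pair.getLeft(), pair.getRight())) // emits a different type than Pair
    .blockLast();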

We will get back to this issue once we are done with the 3.5 line release

OlegDokuka avatar Oct 26 '22 07:10 OlegDokuka

@OlegDokuka thanks for the response.

The best option to go for now would be the usage of Tuple2 instead of Pair.

I used Pair simply as a reproduction example. My real use case uses different objects. Could you elaborate on why the usage of Tuple2 would be a better option? This might help me find a suitable solution on my side.

We will get back to this issue once we are done with the 3.5 line release

OK, good to know.

pmaria avatar Oct 26 '22 08:10 pmaria

In your original post you mentioned that mapping to a different object does not reproduce the leak, which is why I suggested mapping to a tuple (I'm not sure what your real scenario is).

Also, if the scenario you are demonstrating is close enough to the real one, I could suggest replacing flatMap with concatMap, or, if your flattened source is iterable, trying flatMapIterable. That will not break the behaviour but should significantly improve performance.
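For illustration, applied to the first expansion step of the reproduction (a sketch only; generateTuplesAsList is a hypothetical variant of generateTuplesFor that returns a List instead of a Flux):

Flux.just("a", "b", "c")
    .concatMap(id -> generateTuplesFor(id, 1_000)) // subscribes to one inner publisher at a time
    .blockLast();

Flux.just("a", "b", "c")
    .flatMapIterable(id -> generateTuplesAsList(id, 1_000)) // hypothetical List-returning helper; flattens without inner publishers
    .blockLast();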

OlegDokuka avatar Oct 26 '22 08:10 OlegDokuka

@OlegDokuka

In your original post you mentioned that mapping to a different object does not reproduce the leak, which is why I suggested mapping to a tuple (I'm not sure what your real scenario is).

Ok, I'll try to better describe my actual scenario (still simplified, but hopefully giving a better idea):

I have, potentially, multiple sources. Per source I have, potentially, multiple source publishers. (For example several queries on the same source).

The source publishers can be created in different ways, currently Flux.fromIterable or Flux.create (with backpressure support). Let's say these produce a Flux<Record>. The number of records published is in principle unbounded, and the publishers will probably be faster than the processing, hence backpressure support is necessary.

On the processing side I have a set of processors that inspect a record and potentially create a new Flux<ResultObject> from the record. There could be multiple processors that need to do something with a single record and create results.

So we have something like this:

Flux<Source> sources;
Set<Processor> processors;

Flux<Result> results = sources.flatMap(source -> {
    return Flux.fromIterable(getSourcePublishers(source)) // produces Flux<SourcePublisher>
      .flatMap(SourcePublisher::publish) // produces unbounded Flux<Record>
      .flatMap(record ->
         Flux.fromIterable(processors)
           .flatMap(Processor::process) // produces Flux<Result>
      );
  });

// further processing of results

Now, this works fine and without leaking when ParallelFlux is not used, but introducing ParallelFlux greatly improves processing speed, which is what we're after.

So something like:

Flux<Result> results = sources.flatMap(source -> {
    return Flux.fromIterable(getSourcePublishers(source))  // produces Flux<SourcePublisher>
      .flatMap(SourcePublisher::publish) // produces unbounded Flux<Record>
      .flatMap(record ->
         Flux.fromIterable(processors)
           .parallel(6)
           .runOn(Schedulers.parallel())
           .flatMap(Processor::process)
           .sequential()  // produces Flux<Result>
      );
  });

In tests I've done, using some degree of parallelization reduces processing time by a factor of 2 or 3.

However, introducing ParallelFlux seems to cause the memory leak described in this issue with a simplified reproduction case.

Also, if the scenario you are demonstrating is close enough to the real one, I could suggest replacing flatMap with concatMap, or, if your flattened source is iterable, trying flatMapIterable. That will not break the behaviour but should significantly improve performance.

Order preservation is not necessary for my use case, so I did not see a use for concatMap. Would you still advise using concatMap in this case, and if so, where?

pmaria avatar Oct 26 '22 09:10 pmaria

@pmaria I think you are overcomplicating things with ParallelFlux. Also, the mentioned use of ParallelFlux is inappropriate, since you get no benefit in terms of performance (I'm quite sure you even get performance degradation with it, even without the observed leak).

Try to keep the implementation as simple as the following:

Flux<Result> results = sources.flatMap(source -> {
    return Flux.fromIterable(getSourcePublishers(source))  // produces Flux<SourcePublisher>
      .flatMap(SourcePublisher::publish) // produces unbounded Flux<Record>
      .flatMap(record -> Flux.merge(
         processors.stream()
           .map(Processor::process)
           .collect(Collectors.toList())
      ));
  });

You don't need any additional parallelization, especially if Processor::process is an async, non-blocking task.

OlegDokuka avatar Oct 26 '22 13:10 OlegDokuka

@OlegDokuka

Ugh, I am so sorry, I made a mistake in my pseudocode about where the ParallelFlux is applied 🤦🏼

It should have been this:

Flux<Result> results = sources.flatMap(source -> {
    return Flux.fromIterable(getSourcePublishers(source))  // produces Flux<SourcePublisher>
      .flatMap(SourcePublisher::publish) // produces unbounded Flux<Record>
      .parallel(6)
      .runOn(Schedulers.parallel())
      .flatMap(record ->
         Flux.fromIterable(processors)
           .flatMap(Processor::process)  // produces Flux<Result>
      )
      .sequential();
  });

So the processing (i.e. the hard work) is done on multiple cores, and this significantly speeds up my pipeline, by more than a factor of 2.

You don't need any additional parallelization, especially if Processor::process is an async, non-blocking task.

FYI: Processor::process is non-blocking, but synchronous.

pmaria avatar Oct 26 '22 15:10 pmaria

@pmaria it changes nothing. The .parallel() is overhead. It should be used only when there is CPU-bound work.

Your case is the opposite: your code seems to do non-blocking work. Your code should be:

Flux<Result> results = sources.flatMap(source -> {
    return Flux.fromIterable(getSourcePublishers(source))  // produces Flux<SourcePublisher>
      .flatMap(SourcePublisher::publish) // produces unbounded Flux<Record>
      .flatMap(record -> 
         Flux.fromIterable(processors)
             .flatMap(Processor::process)  // produces Flux<Result>
      );
  });

Or, if you still want to run the work inside flatMap on Schedulers.parallel(), you can enhance the previous example as follows:

Flux<Result> results = sources.flatMap(source -> {
    return Flux.fromIterable(getSourcePublishers(source))  // produces Flux<SourcePublisher>
      .flatMap(SourcePublisher::publish) // produces unbounded Flux<Record>
      .flatMap(record -> 
         Flux.defer(() -> 
                     Flux.fromIterable(processors)
                        .flatMap(Processor::process)  // produces Flux<Result>
             )
             .subscribeOn(Schedulers.parallel())
      );
  });

OlegDokuka avatar Oct 26 '22 15:10 OlegDokuka

@OlegDokuka thanks for engaging.

I did a quick run of a test case with your proposed approach using Flux.defer and Schedulers.parallel(), and it does increase performance slightly, by roughly a factor of 1.5, but interestingly not as much as using ParallelFlux. The work done in Processor::process is CPU bound, in the sense that it does some transformations on the Records.

But I get the sense that you would still advise against using ParallelFlux for this use case.

For now I can make do with your proposed approach. I will do some further testing to see how it works for other use cases.

I'd be interested to see the difference for longer running processes if this issue is fixed.

pmaria avatar Oct 26 '22 15:10 pmaria

@pmaria we'll keep this issue open so it will be investigated once there is more availability! However, I doubt there will be any success.

The challenging part here is that the memory growth may be expected behaviour:

  1. .parallel(6) in combination with runOn allocates 6 queues. (The parameter 6 specifies the number of independent downstreams. runOn under the hood is just a normal publishOn, so it allocates a queue by default.)
  2. Every flatMap has a queue, and every inner Publisher flattened in flatMap has a subscriber with a small queue of 32 elements. By default the main queue has a size of 256 and a concurrency of 256, which means up to 256 inner queues of at most 32 elements may be allocated.
  3. The parallel chain has a .flatMap downstream, so we need to add an extra 6 * 256 queues.
  4. Then we have .sequential(), which has a queue under the hood as well, so that is one extra queue.

In total, that is around (6 + 6 + 6 * 256 + 1) * 256 queues (the outer multiplier of 256 is because everything sits within the outer flatMap), where every element can be stored long enough that it moves from Eden to the survivor space and then to the old gen.

Thus the memory leak you see may just be values stored in those queues, and that could be expected. The performance gain you see may also be a wrong first impression (an illusion during the first few minutes of the run), since over time GC will make your app as slow as possible...
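If the growth really is just data sitting in those prefetch queues, one way to bound it is to use the explicit prefetch/concurrency overloads instead of the defaults. A sketch against the reproduction above; the values are illustrative, not tuned recommendations:

Flux.just("a", "b", "c", "d", "e", "f", "g")
    .flatMap(id -> generateTuplesFor(id, Integer.MAX_VALUE / 7), 2, 8) // concurrency 2, prefetch 8
    .parallel(6, 16)                                                   // parallelism 6, prefetch 16
    .runOn(Schedulers.parallel(), 16)                                  // prefetch 16 per rail
    .flatMap(this::generatePairsFor, false, 4, 8)                      // delayError=false, maxConcurrency 4, prefetch 8
    .map(this::mapPair)
    .sequential(16)                                                    // prefetch 16 when rejoining the rails
    .map(this::mapPair)
    .blockLast();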

OlegDokuka avatar Oct 26 '22 16:10 OlegDokuka

following. Very interesting

datbui avatar Nov 18 '22 17:11 datbui

If .flatMap(SourcePublisher::publish) returns a list instead of a single object and is considered blocking, what is the best way to collect all the objects and avoid sequential?

Here is the test code:

List<Integer> ll = Arrays.asList(10,3);

Mono<Object> t = Flux.fromIterable(ll)
                .flatMap(l -> Mono.defer(() -> {
                    try {
                        return Mono.just(getTable(l));
                    } catch (InterruptedException e) {
                        throw new RuntimeException(e);
                    }
                }).subscribeOn(Schedulers.boundedElastic()).log())
                .flatMap(l1 -> Flux.defer(() -> Flux.fromIterable(l1)))
                .collectList()
                .map(ele -> getSingleList(ele))
                .log();

amit306 avatar Sep 02 '23 16:09 amit306

Given @OlegDokuka's comment from 2022 and taking the current date into account, it seems we never got to investigate further. I believe the answer clarifies a lot, though. With that, I'm closing the issue.

chemicL avatar Mar 18 '24 12:03 chemicL