
Too difficult to control how much Reactor buffers internally

Open RonBarkan opened this issue 1 year ago • 2 comments

Given the following toy example, and especially when converting it to real code, it is very hard to control how much Reactor buffers. In practice, it will gladly buffer hundreds of thousands of elements when only 800 would be sufficient for an efficient flow. It looks like the only way to control this is through limitRate(). However, using it is a whack-a-mole solution, which does not translate at all from the toy example to real code. By "buffer" I mean the count of elements that have entered the pipeline and whose processing is still pending, not to be confused with purposeful buffering, such as .buffer() or .cache().
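To make that definition concrete, here is a minimal sketch of the bookkeeping it implies: elements that have entered the pipeline minus elements whose processing has finished. It uses only the JDK, not Reactor, and the class and method names (`InflightCounter`, `onEntered`, `onCompleted`) are hypothetical and chosen purely for illustration.

```java
import java.util.concurrent.atomic.AtomicLong;

// Hypothetical helper: tracks how many elements are "buffered" in the sense
// used in this issue (entered the pipeline, processing still pending).
class InflightCounter {
    private final AtomicLong entered = new AtomicLong();
    private final AtomicLong completed = new AtomicLong();
    private final AtomicLong maxInflight = new AtomicLong();

    // Call when n elements enter the pipeline (e.g. one generated batch).
    public void onEntered(long n) {
        entered.addAndGet(n);
        // Remember the highest in-flight count observed so far.
        maxInflight.accumulateAndGet(inflight(), Math::max);
    }

    // Call when one element has been fully processed.
    public void onCompleted() {
        completed.incrementAndGet();
    }

    public long inflight() {
        return entered.get() - completed.get();
    }

    public long maxInflight() {
        return maxInflight.get();
    }

    public static void main(String[] args) {
        InflightCounter counter = new InflightCounter();
        counter.onEntered(1000);              // first batch of 1000 enters
        for (int i = 0; i < 250; i++) {
            counter.onCompleted();            // 250 elements finish processing
        }
        counter.onEntered(1000);              // second batch enters early
        System.out.println("inflight=" + counter.inflight()
                + " max=" + counter.maxInflight());
    }
}
```

In this run the second batch arrives while 750 elements are still pending, so the in-flight count rises to 1750, which is the kind of growth the issue is about.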

Note: I have posted this as a Stack Overflow question, but there were no takers.

Expected Behavior

  1. Reactor makes good use of memory for optimal buffering before/at operators and does not store too many elements in the pipeline.

  2. The developer should get close to optimal behavior out of the box, or should be able to easily control the behavior to make it optimal, without whack-a-mole or investing too much time in researching it.

Actual Behavior

  1. Reactor gladly buffers hundreds of thousands of elements where only 800 could have been buffered instead. Not only that, buffering too much has additional implications, such as a longer drain time if takeWhile() is used, or a larger number of elements that require reprocessing when a task is rerun, because the previous iteration was too greedy.

  2. I'd claim that limitRate() is not sufficient and that tuning the pipeline to do the right thing is too difficult. Even if I invested the time to get it right, the result is fragile and can easily break when the pipeline changes. Worse, even the toy example below behaves differently from my real code: the limitRate() solution did not carry over, leaving me puzzled as to what's going on.

Steps to Reproduce

Important notes:

  1. limitRate(2) looks like it is ignored, but with it this test buffers 7000 to 9000 items on my machine (which is close to reasonable, BUT see the next bullet).
  2. Unfortunately, the use of limitRate() as shown below does not carry over to my real code, which still happily buffers hundreds of thousands of elements, even though both pipelines are very similar.

What happens with this is that generate() is called too eagerly while step C processes items slowly. I am guessing that the watermark level for generate is 7.5 out of 10, but the level drops too quickly below the watermark.

  @Test
  void testTooMuchBuffering() {
    var counter = new AtomicInteger(0);
    var inA = new AtomicLong(0);
    var reqA = new AtomicLong(0);
    var inB = new AtomicLong(0);
    var reqB = new AtomicLong(0);
    var inB1 = new AtomicLong(0);
    var reqB1 = new AtomicLong(0);
    var inB2 = new AtomicLong(0);
    var reqB2 = new AtomicLong(0);
    var inB3 = new AtomicLong(0);
    var reqB3 = new AtomicLong(0);
    var inC = new AtomicLong(0);
    var reqC = new AtomicLong(0);
    var passed = new AtomicLong(0);

    Runnable snapshot = () -> {
      var a = inA.get() - inB.get();
      var b = inB.get() - inB1.get() / 1000;
      var b1 = inB1.get() - inB2.get() * 20;
      var b2 = inB2.get() - inB3.get();
      var b3 = inB3.get() - inC.get() / 20;
      // Requested/buffered (real elements buffered)
      System.out.printf(
          "A: %d/%d (%d) " +
              "B: %d/%d (%d) " +
              "B1: %d/%d " +
              "B2: %d/%d (%d) " +
              "B3: %d/%d (%d) " +
              "C: %d/%d done:%d buffered:%d\n",
      reqA.get(), a, a * 1000,
      reqB.get(), b, b * 1000,
      reqB1.get(), b1,
      reqB2.get(), b2, b2 * 20,
      reqB3.get(), b3, b3 * 20,
      reqC.get(), inC.get() - passed.get(),
      passed.get(),
      counter.get() - passed.get());
    };

    Flux.<List<Integer>>generate(
            sink -> {
              int val = counter.getAndAdd(1000);
              if (val >= 200_000) {
                counter.addAndGet(-1000);
                sink.complete();
                return;
              }
              // Reading from a "database"
              System.out.println("\nGenerating " + (val + 1000));
              snapshot.run();
              sink.next(IntStream.range(val, val + 1000).boxed().toList());
            })
        .doOnRequest(r -> reqA.addAndGet(r))
//        .limitRate(2)
        .doOnNext(i -> inA.incrementAndGet())
        .flatMap(l -> Flux.just(l), 1)  // A

        .doOnRequest(r -> reqB.addAndGet(r))
        .limitRate(2)
        .doOnNext(i -> inB.incrementAndGet())
        .flatMapIterable(Function.identity()) // B

        .doOnRequest(r -> reqB1.addAndGet(r))
        .limitRate(800)
        .doOnNext(i -> inB1.incrementAndGet())
        .buffer(20) // B1
        .doOnRequest(r -> reqB2.addAndGet(r))
        .doOnNext(i -> inB2.incrementAndGet())
        .takeWhile(l -> true) // B2
        .doOnRequest(r -> reqB3.addAndGet(r))
        .doOnNext(i -> inB3.incrementAndGet())
        .flatMapIterable(Function.identity()) // B3

        .doOnRequest(r -> {
          reqC.addAndGet(r);
          System.out.println("C Requesting: " + r);
        })
        .limitRate(800)
        .doOnNext(i -> inC.incrementAndGet())
        .flatMapSequential(i -> Flux.defer(() -> Flux.just(i).delayElements(Duration.ofMillis(30))), 4) // C
        .doOnNext(i -> {
          long val = passed.incrementAndGet();
          if ((val % 250) == 0) {
            snapshot.run();
          }
        })
        .count()
        .block();
  }

Things to try:

  1. Remove all limitRate() calls
  2. Buffer no more than 800 elements

Your Environment

  • Reactor version(s) used: 3.5.11
  • Other relevant libraries versions (eg. netty, ...):
  • JVM version (java -version): JDK 21
  • OS and version (eg uname -a): WSL2 / Ubuntu 5.15.146.1-microsoft-standard-WSL2

RonBarkan, Mar 16 '24 01:03

If this is indeed a defect, it is quite severe. It could mean that Reactor could not be used in some cases where memory consumption is important.

RonBarkan, Apr 03 '24 20:04

Hey, thanks for bringing this up. I didn't find the time to investigate deeply and can't make any prediction as to when I can do so, but I remembered potentially related discussions that might clarify things around this subject:

  • https://github.com/reactor/reactor-core/issues/1397#issuecomment-781226693
  • https://github.com/reactor/reactor-core/issues/3413

There might be more related concerns. Please have a look and let me know if this is along the lines of what you are concerned about.

chemicL, Apr 11 '24 12:04

Hey @RonBarkan.

I investigated your example and annotated it a bit differently so that it is easier to see what's going on. Please run it and review my comments and the reported output.

public static void main(String[] args) throws InterruptedException {
    int limit = 200_000;
    int batch = 1000;
    int delayMillis = 1;
    int reportMillis = 100;
    AtomicInteger nextBatchStart = new AtomicInteger(0);
    AtomicInteger delivered = new AtomicInteger(0);
    AtomicInteger maxInflight = new AtomicInteger(0);
    CountDownLatch done = new CountDownLatch(1);

    Thread bgReporter = new Thread(() -> {
        try {
            do {
                int emitted = nextBatchStart.get();
                int deliveredSoFar = delivered.get();

                int inflight = emitted - deliveredSoFar;
                System.out.println("Inflight: " + inflight);
                if (maxInflight.get() < inflight) {
                    maxInflight.set(inflight);
                    System.out.println("** MAX INFLIGHT: " + inflight);
                }
            } while (!done.await(reportMillis, TimeUnit.MILLISECONDS));
        } catch (InterruptedException e) {
            System.out.println("Reporter interrupted.");
        }
    });

    bgReporter.start();

    Flux.<List<Integer>>generate(
                sink -> {
                    int emitStart = nextBatchStart.getAndAdd(batch);
                    System.out.println("\nGenerating " + emitStart + " to " + (emitStart + batch - 1));
                    sink.next(IntStream.range(emitStart, emitStart + batch).boxed().collect(Collectors.toList()));
                    if ((emitStart + batch ) >= limit) {
                        sink.complete();
                    }
                })
        .doOnRequest(r -> System.out.println("flatMap -> request(generate): " + r))

        // requests 1 at a time (concurrency), prefetch has no significance as it's
        // just a Flux of 1 List<Integer>
        .flatMap(l -> Flux.just(l), 1) // Flux<List<Integer>>
        .doOnRequest(r -> System.out.println("limitRate(A) -> request(flatMap): " + r))

        // ? flatMap does not fuse, it receives request of 2 but makes an upstream
        // request of 1 each time, so limitRate is redundant.
        .limitRate(2) // limitRate(A): Flux<List<Integer>>
        .doOnRequest(r -> System.out.println("flatMapIterable(A) -> request(limitRate(A)): " + r))

        // prefetch of 256
        .flatMapIterable(Function.identity()) // flatMapIterable(A): Flux<List<Integer>> -> Flux<Integer>
        .doOnRequest(r -> System.out.println("limitRate(B) -> request(flatMapIterable(A)): " + r))

        // request 800 from upstream
        .limitRate(800) // limitRate(B): Flux<Integer>
        .doOnRequest(r -> System.out.println("buffer -> request(limitRate(B)): " + r))

        // there is no prefetching done by buffer:
        // every request of N maps to N * 20 upstream demand
        .buffer(20) // Flux<List<Integer>>

        // no prefetching, every request of N maps to N upstream demand
        .takeWhile(l -> true) // Flux<List<Integer>>
        .doOnRequest(r -> System.out.println("flatMapIterable(B) -> request(takeWhile): " + r))

        // prefetch of 256
        .flatMapIterable(Function.identity()) // flatMapIterable(B): Flux<Integer>
        .doOnRequest(r -> System.out.println("limitRate(C) -> request(flatMapIterable(B)): " + r))

        // prefetch 800 items from the above
        .limitRate(800) // limitRate(C) Flux<Integer>
        // produces quite a lot of logs - uncomment when needed
        // .doOnRequest(r -> System.out.println("flatMapSequential -> request(limitRate(C)): " + r))

        // initially requests 4 items and then requests 1 item a time to refill
        // up to 4 concurrent in a given time unit. Prefetches 32 items from inner
        // (which is always a Flux of 1 Integer)
        .flatMapSequential(i -> Flux.defer(() -> Flux.just(i).delayElements(Duration.ofMillis(delayMillis))), 4) // Flux<Integer>
        .doOnRequest(r -> System.out.println("count -> request(flatMapSequential): " + r))

        // no prefetching
        .doOnNext(i -> delivered.incrementAndGet())

        // request unlimited
        .count()
        .block();

    done.countDown();
    bgReporter.join();
}

I do think the limiting factor here is the combination of buffer(20) with flatMapIterable(). Together they amount to a prefetch of 256 buffers of 20 items each, i.e. 256 × 20 = 5120 items in flight immediately. Because prefetching starts replenishing once 0.75 of the prefetch has been consumed, another 0.75 × 5120 = 3840 items can be requested, so there can be 5120 + 3840 = 8960 inflight items caused by these two operators alone. Add to that the queues needed by the other operators in the chain and you can start seeing over 10_000 inflight items.
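The arithmetic in the paragraph above can be reproduced with a few lines of plain Java. This is only a sketch of the estimate as stated there, not of Reactor's internals: it assumes a prefetch of 256 buffers, a buffer size of 20, and replenishing at 75% of the prefetch. The class and method names are hypothetical.

```java
// Hypothetical helper reproducing the in-flight estimate from the comment above.
class InflightEstimate {

    // Items requested up front: `prefetch` buffers of `bufferSize` items each.
    public static int initialInflight(int prefetch, int bufferSize) {
        return prefetch * bufferSize;
    }

    // Items added by one replenishing request, assumed to be 75% of the prefetch.
    public static int replenishInflight(int prefetch, int bufferSize) {
        int replenishBuffers = prefetch - (prefetch >> 2); // 256 -> 192
        return replenishBuffers * bufferSize;
    }

    // Upper estimate: the initial prefetch plus one replenishment.
    public static int worstCase(int prefetch, int bufferSize) {
        return initialInflight(prefetch, bufferSize)
                + replenishInflight(prefetch, bufferSize);
    }

    public static void main(String[] args) {
        int prefetch = 256;   // prefetch of flatMapIterable, as noted in the code comments
        int bufferSize = 20;  // buffer(20)
        System.out.println("initial:    " + initialInflight(prefetch, bufferSize));
        System.out.println("replenish:  " + replenishInflight(prefetch, bufferSize));
        System.out.println("worst case: " + worstCase(prefetch, bufferSize));
    }
}
```

With these inputs the three values are 5120, 3840 and 8960. Changing `prefetch` to 1 shows how strongly the estimate depends on that single parameter.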

You can also play with the rate of consumption (delayMillis) and see exactly how many items are requested at each stage.

I don't consider this a defect but a limitation of a design decision that was taken years ago. #1397 is a tracking issue dedicated to allowing explicit control over the prefetching strategy. The current behaviour comes from the fact that prefetching makes systems more fluent. It can be limiting at times; however, the knobs are there, as you have seen with your choice of a flatMap concurrency of 1. Regarding a redesign of reactor-core to support disabling eager prefetching, I believe it would require a major version and we have no immediate plans around this subject.

For your particular case, consider an alternative combination of operators:

public static void main(String[] args) throws InterruptedException {
    int limit = 200_000;
    int batch = 1000;
    int delayMillis = 1;
    int reportMillis = 100;
    AtomicInteger nextBatchStart = new AtomicInteger(0);
    AtomicInteger delivered = new AtomicInteger(0);
    AtomicInteger maxInflight = new AtomicInteger(0);
    CountDownLatch done = new CountDownLatch(1);

    Thread bgReporter = new Thread(() -> {
        try {
            do {
                int emitted = nextBatchStart.get();
                int deliveredSoFar = delivered.get();

                int inflight = emitted - deliveredSoFar;
                System.out.println("Inflight: " + inflight);
                if (maxInflight.get() < inflight) {
                    maxInflight.set(inflight);
                    System.out.println("** MAX INFLIGHT: " + inflight);
                }
            } while (!done.await(reportMillis, TimeUnit.MILLISECONDS));
        } catch (InterruptedException e) {
            System.out.println("Reporter interrupted.");
        }
    });

    bgReporter.start();

    Flux.<List<Integer>>generate(
                sink -> {
                    int emitStart = nextBatchStart.getAndAdd(batch);
                    System.out.println("\nGenerating " + emitStart + " to " + (emitStart + batch - 1));
                    sink.next(IntStream.range(emitStart, emitStart + batch).boxed().collect(Collectors.toList()));
                    if ((emitStart + batch ) >= limit) {
                        sink.complete();
                    }
                })
        .doOnRequest(r -> System.out.println("flatMapIterable(A) -> request(generate): " + r))

        // prefetch 1 list at a time
        .flatMapIterable(Function.identity(), 1) // flatMapIterable(A): Flux<Integer>
        .doOnRequest(r -> System.out.println("buffer -> request(flatMapIterable(A)): " + r))

        // there is no prefetching done by buffer:
        // every request of N maps to N * 20 upstream demand
        .buffer(20) // Flux<List<Integer>>

        // prefetch 1
        .flatMapIterable(Function.identity(), 1) // flatMapIterable(B): Flux<Integer>
        // .doOnRequest(r -> System.out.println("flatMapSequential -> request(flatMapIterable(B)): " + r))

        // initially requests 4 items and then requests 1 item a time to refill
        // up to 4 concurrent in a given time unit. Prefetches 32 items from inner
        // (which is always a Flux of 1 Integer)
        .flatMapSequential(i -> Flux.defer(() -> Flux.just(i).delayElements(Duration.ofMillis(delayMillis))), 4) // Flux<Integer>
        .doOnRequest(r -> System.out.println("count -> request(flatMapSequential): " + r))

        // no prefetching
        .doOnNext(i -> delivered.incrementAndGet())

        // request unlimited
        .count()
        .block();

    done.countDown();
    bgReporter.join();
}

The maximum inflight is capped at 1024.

However, also note the total runtime this variant needs to finish. As you highlighted, your processing is slow in general and you want to avoid excessive buffering. For most use cases in which reactor-core is used (request-response processing, streaming), fluency and runtime efficiency are probably more desirable, at the cost of RAM usage. If you take away the delay, it might be quicker in certain scenarios to keep a larger buffer and have less demand-signalling ceremony.
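The trade-off between buffer size and demand signalling can be illustrated with a back-of-the-envelope model. The sketch below does not use Reactor. It assumes a consumer that requests `prefetch` items up front and then requests 75% of `prefetch` again each time that many items have been consumed (the replenishing ratio mentioned earlier), and it counts the request signals needed for a hypothetical stream of 200_000 elements. `DemandSignalModel` and `countRequests` are made-up names.

```java
// Simplified model of a prefetching consumer; NOT Reactor's implementation.
class DemandSignalModel {

    // Number of request(n) signals sent while consuming `totalItems`:
    // one initial request(prefetch), then one replenishing request
    // after every `limit` consumed items.
    public static long countRequests(long totalItems, int prefetch) {
        int limit = prefetch - (prefetch >> 2); // assumed 75% threshold, e.g. 256 -> 192
        return 1 + totalItems / limit;
    }

    public static void main(String[] args) {
        long total = 200_000;
        for (int prefetch : new int[] {1, 32, 256}) {
            // In this model the queue never holds more than `prefetch` items.
            System.out.println("prefetch=" + prefetch
                    + " requestSignals=" + countRequests(total, prefetch));
        }
    }
}
```

Under these assumptions a prefetch of 1 needs 200001 request signals, while a prefetch of 256 needs 1042. That is roughly the "ceremony" being traded against memory: a smaller prefetch keeps fewer items queued but signals demand far more often.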

Hope this helps and you can improve your pipelines with the above explanations.

chemicL, Aug 16 '24 13:08