
expandDeep uses unbounded queue which results in OutOfMemoryError

Open troydm opened this issue 2 years ago • 2 comments

expandDeep uses an unbounded queue internally, which results in an OutOfMemoryError when the subscriber is not fast enough to keep up with the publisher

Expected Behavior

Not sure

Actual Behavior

OutOfMemoryError thrown

Steps to Reproduce

@Test
void reproCase() {
    val genBytes = { Array(1024 * 1024 * 100) { 0 } }
    val i = AtomicInteger(0)

    Flux.just(genBytes()).expandDeep({
        if (i.incrementAndGet() > 100)
            Flux.empty()
        else
            Flux.just(genBytes())
    }, 1)
        .delayElements(Duration.ofMinutes(5))
        .subscribe {
            println("byte array processed")
        }
}

Possible Solution

add bounded-queue variants of the expand and expandDeep methods in order to limit memory usage

Your Environment

  • Reactor version(s) used: 3.4.27
  • Other relevant libraries versions (eg. netty, ...):
  • JVM version (java -version): 11
  • OS and version (eg uname -a): Windows 10

troydm avatar Mar 22 '23 16:03 troydm

Supplier<List<Byte>> genBytes = () -> Arrays.asList(new Byte[1024*1024*100]);
Flux.just(genBytes.get()).subscribe(); // OOM

Hmm, maybe expand's unbounded queue is not the root cause of the OOM?

As shown above, Flux.just(genBytes()) alone also triggers an OOM in my local env, without expandDeep().
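The Flux.just snippet above may OOM on its own simply because of what a boxed Byte[] costs. As a back-of-the-envelope sketch (the class and method names here are illustrative, not from Reactor): `new Byte[1024 * 1024 * 100]` holds ~104 million object references, so the reference array alone costs hundreds of megabytes before any boxing.

```java
// Illustrative footprint math for new Byte[1024 * 1024 * 100]:
// each array slot is a reference (4 bytes with compressed oops, 8 without),
// so the reference array alone dwarfs the "100 MB" the element count suggests.
public class ArrayFootprint {
    static long referenceArrayBytes(long elements, long bytesPerReference) {
        return elements * bytesPerReference;
    }

    public static void main(String[] args) {
        long elements = 1024L * 1024L * 100L;              // 104,857,600 slots
        System.out.println(referenceArrayBytes(elements, 4)); // compressed oops: ~400 MB
        System.out.println(referenceArrayBytes(elements, 8)); // uncompressed: ~800 MB
    }
}
```

So with a default heap, the Java repro can OOM at allocation time, independent of any queueing in expandDeep.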

injae-kim avatar Feb 14 '24 11:02 injae-kim

@injae-kim strange, genBytes() generates a 1 GB byte array, which shouldn't cause an OOM on its own. How much memory do you have set as the maximum? You can modify the example to allocate less. The point of this issue still lies in expandDeep, which internally uses an unbounded queue.

troydm avatar Feb 17 '24 16:02 troydm

https://github.com/reactor/reactor-core/blob/b42d9ab1f8220db632944aee4e44ebb1d3b81662/reactor-core/src/main/java/reactor/core/publisher/FluxExpand.java#L107

https://github.com/reactor/reactor-core/blob/b42d9ab1f8220db632944aee4e44ebb1d3b81662/reactor-core/src/main/java/reactor/core/publisher/FluxExpand.java#L235

the point of this issue still lays in expandDeep which internally is using unbounded queue

Oh I see. I checked that expand() and expandDeep() use an unbounded queue/deque internally, and that is the root cause of the OOM in this case.
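To illustrate the mechanism (this is a toy sketch, not FluxExpand's actual code): expandDeep's depth-first traversal pushes every expanded element onto a deque with no capacity check, so when expansion outpaces consumption the deque, and everything it retains, grows without bound.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.List;
import java.util.function.Function;

// Toy depth-first expansion over an unbounded ArrayDeque, loosely mirroring
// the pattern in FluxExpand (illustrative only): every child is pushed with
// no capacity check, so a fast expander can grow the deque indefinitely.
public class DepthFirstExpand {
    static <T> List<T> expandDeep(T seed, Function<T, List<T>> expander) {
        Deque<T> deque = new ArrayDeque<>(); // unbounded: no maxCapacity anywhere
        List<T> out = new ArrayList<>();
        deque.push(seed);
        while (!deque.isEmpty()) {
            T current = deque.pop();
            out.add(current);
            List<T> children = expander.apply(current);
            for (int i = children.size() - 1; i >= 0; i--) {
                deque.push(children.get(i)); // grows freely under a fast expander
            }
        }
        return out;
    }

    public static void main(String[] args) {
        // Each node n < 3 expands to (n + 1); depth-first order is 0, 1, 2, 3.
        List<Integer> result =
                expandDeep(0, n -> n < 3 ? List.of(n + 1) : List.<Integer>of());
        System.out.println(result); // prints [0, 1, 2, 3]
    }
}
```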

My Idea

public final Flux<T> expandDeep(Function<? super T, ? extends Publisher<? extends T>> expander,
        int capacityHint, int maxCapacity, BiConsumer<? super Queue<T>, ? super T> onMaxCapacity) { // 👈👈
    return onAssembly(new FluxExpand<>(this, expander, false, capacityHint, maxCapacity, onMaxCapacity));
}

// e.g.
Flux.just(..)
    .expandDeep(publisher -> .., capacityHint,
                /* maxCapacity */ 1024,
                /* onMaxCapacity */ (queue, elem) -> { /* just drop or replace last element, .. */ }); // 👈👈

If we use a bounded queue/deque in expand/expandDeep, I think we can let the user set maxCapacity plus a custom onMaxCapacity strategy. The user can then decide what the queue/deque should do when it reaches maxCapacity (e.g. drop the new element, replace the last element, leave a log, ..).

Or, we could simply drop the new element when the queue/deque reaches maxCapacity and not allow the user to set an onMaxCapacity strategy at all.
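The proposed overflow hook could look roughly like this, independent of Reactor internals (BoundedBuffer, maxCapacity and onMaxCapacity mirror the names above but are hypothetical, not real FluxExpand API):

```java
import java.util.ArrayDeque;
import java.util.Deque;
import java.util.function.BiConsumer;

// Minimal sketch of a bounded buffer with a pluggable overflow strategy,
// as proposed above. Names are illustrative, not Reactor API.
public class BoundedBuffer<T> {
    private final Deque<T> deque = new ArrayDeque<>();
    private final int maxCapacity;
    private final BiConsumer<Deque<T>, T> onMaxCapacity;

    public BoundedBuffer(int maxCapacity, BiConsumer<Deque<T>, T> onMaxCapacity) {
        this.maxCapacity = maxCapacity;
        this.onMaxCapacity = onMaxCapacity;
    }

    /** Returns true if enqueued normally, false if the overflow strategy ran. */
    public boolean offer(T element) {
        if (deque.size() >= maxCapacity) {
            onMaxCapacity.accept(deque, element); // user decides: drop, replace, log, ..
            return false;
        }
        deque.addLast(element);
        return true;
    }

    public int size() {
        return deque.size();
    }

    public static void main(String[] args) {
        // Strategy: replace the last element instead of growing past capacity.
        BoundedBuffer<Integer> buffer = new BoundedBuffer<>(2, (q, elem) -> {
            q.pollLast();
            q.addLast(elem);
        });
        buffer.offer(1);
        buffer.offer(2);
        buffer.offer(3); // over capacity: strategy replaces 2 with 3
        System.out.println(buffer.size()); // prints 2
    }
}
```

The "drop silently" alternative is just the special case where onMaxCapacity does nothing.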

Hi @OlegDokuka, can you share your opinion please? 🙇 After hearing a maintainer's opinion, I'll create a PR :) Thank you!

injae-kim avatar Feb 19 '24 15:02 injae-kim

@injae-kim thanks for taking the time to analyze this issue 👏 @troydm can you please have a look and provide a potential use case for the proposed API?

To be honest, I am a bit unclear about the practical applications of this, and I wonder what sorts of algorithms could be implemented on top of a bounded-capacity variant that includes a strategy for dealing with overflow. If there's a practical use for it, I suppose we can base the actual design on the use cases, instead of designing first and then considering what can be done with it :)

Thanks in advance for providing use cases that are currently impossible, and what the usage of a bounded-capacity implementation would look like, regardless of the final design -> we can see what the final requirements are once we see what the overflow strategies dictate.

chemicL avatar Feb 21 '24 11:02 chemicL

Closing due to inactivity for a month. Please reopen in case there's feedback to the above.

chemicL avatar Mar 20 '24 18:03 chemicL

@chemicL my concrete problem was caused by a fast producer paired with a slow consumer (this happened in a production system processing big chunks of data). I'm not sure having a strategy is a good use case; from my point of view, just having a maxCapacity option would suffice for use cases like mine, so that when the producer hits the queue capacity limit, it waits for the consumer to empty the queue before inserting a new object.
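The wait-on-full behavior described here can be sketched with a plain BlockingQueue rather than anything from Reactor (the class and method names are illustrative): put() blocks the producer until the consumer frees a slot, so at most `capacity` chunks are ever held in memory at once.

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;

// Sketch of a producer that waits when the bounded queue is full,
// instead of growing it without limit. Illustrative only.
public class BlockingHandoff {
    static int transfer(int total, int capacity) throws InterruptedException {
        BlockingQueue<byte[]> queue = new ArrayBlockingQueue<>(capacity);

        Thread producer = new Thread(() -> {
            try {
                for (int i = 0; i < total; i++) {
                    queue.put(new byte[1024]); // blocks while the queue is full
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });
        producer.start();

        int consumed = 0;
        for (int i = 0; i < total; i++) {
            queue.take(); // slow consumer drains one element at a time
            consumed++;
        }
        producer.join();
        return consumed;
    }

    public static void main(String[] args) throws InterruptedException {
        System.out.println(transfer(16, 4)); // prints 16
    }
}
```

All 16 chunks get through, but memory held in the queue never exceeds 4 chunks, which is the bounded-memory behavior the comment asks for.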

troydm avatar Mar 29 '24 03:03 troydm