expandDeep uses an unbounded queue, which results in OutOfMemoryError
expandDeep uses an unbounded queue internally, which results in an OutOfMemoryError when the subscriber is not fast enough to keep up with the publisher.
Expected Behavior
Not sure; presumably memory usage should stay bounded instead of growing until the heap is exhausted.
Actual Behavior
OutOfMemoryError is thrown.
Steps to Reproduce
import org.junit.jupiter.api.Test
import reactor.core.publisher.Flux
import java.time.Duration
import java.util.concurrent.atomic.AtomicInteger

@Test
fun reproCase() {
    // each call allocates an Array<Int> with ~100M boxed entries (~0.4–0.8 GB of references)
    val genBytes = { Array(1024 * 1024 * 100) { 0 } }
    val i = AtomicInteger(0)
    Flux.just(genBytes())
        .expandDeep({
            if (i.incrementAndGet() > 100)
                Flux.empty()
            else
                Flux.just(genBytes())
        }, 1) // capacityHint = 1, yet the internal queue still grows without bound
        .delayElements(Duration.ofMinutes(5)) // deliberately slow consumer
        .subscribe {
            println("byte array processed")
        }
}
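Note: each generated array is already several hundred MB, so running the test JVM with a capped heap (a standard HotSpot flag; the exact value is just a suggestion) makes the failure surface after only a few expansions:

# suggestion: cap the heap so the OOM surfaces quickly
-Xmx2g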
Possible Solution
Add variants of the expand and expandDeep methods that use a bounded queue, in order to limit memory usage.
Your Environment
- Reactor version(s) used: 3.4.27
- Other relevant libraries versions (eg. netty, ...):
- JVM version (java -version): 11
- OS and version (eg uname -a): Windows 10
// a single element holding ~100M Byte references (~0.4–0.8 GB) — this OOMs even without expandDeep
Supplier<List<Byte>> genBytes = () -> Arrays.asList(new Byte[1024 * 1024 * 100]);
Flux.just(genBytes.get()).subscribe(); // OOM
Hmm, maybe expand's unbounded queue is not the root cause of the OOM?
As shown above, Flux.just(genBytes()) also triggers an OOM in my local environment, even without expandDeep().
@injae-kim strange, genBytes() generates roughly a 1 GB array, which shouldn't cause an OOM on its own. How much memory do you have set as the maximum? You can modify it to generate smaller arrays. The point of this issue still lies in expandDeep, which internally uses an unbounded queue:
https://github.com/reactor/reactor-core/blob/b42d9ab1f8220db632944aee4e44ebb1d3b81662/reactor-core/src/main/java/reactor/core/publisher/FluxExpand.java#L107
https://github.com/reactor/reactor-core/blob/b42d9ab1f8220db632944aee4e44ebb1d3b81662/reactor-core/src/main/java/reactor/core/publisher/FluxExpand.java#L235
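Paraphrasing the gist of those lines (not a verbatim quote of the sources), both traversal modes allocate storage with no upper bound, roughly along these lines:

// breadth-first (expand): an unbounded queue, capacityHint only sizes the initial links
this.queue = Queues.<Publisher<? extends T>>unbounded(capacityHint).get();

// depth-first (expandDeep): an unbounded deque used as a stack
this.deque = new ArrayDeque<>();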
> the point of this issue still lies in expandDeep which internally is using an unbounded queue
Oh, I see. I checked that expand() and expandDeep() use an unbounded queue/deque internally, and that is the root cause of the OOM in this case.
My Idea
public final Flux<T> expandDeep(Function<? super T, ? extends Publisher<? extends T>> expander, int capacityHint,
        int maxCapacity, BiConsumer<? super Queue<T>, ? super T> onMaxCapacity) { // 👈👈
    return onAssembly(new FluxExpand<>(this, expander, false, capacityHint, maxCapacity, onMaxCapacity));
}
// e.g.
Flux.just(..)
    .expandDeep(publisher -> .., capacityHint,
        /* maxCapacity */ 1024,
        /* onMaxCapacity */ (queue, elem) -> { /* just drop or replace last element, .. */ }) // 👈👈
If we use a bounded queue/deque in expand/expandDeep, I think we can allow the user to set a maxCapacity and a custom onMaxCapacity strategy.
The user can then determine what the queue/deque should do when it reaches maxCapacity (e.g. drop the new element, replace the last element, log a warning, ...), as sketched below.
Alternatively, we could simply drop new elements when the queue/deque reaches maxCapacity and not allow the user to set an onMaxCapacity strategy at all.
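As an illustration only, here is a rough sketch of how the bounded offer could behave internally; maxCapacity and onMaxCapacity are the proposed fields (not existing API), and offerBounded is a name I made up:

// hypothetical helper inside a bounded FluxExpand variant
boolean offerBounded(Queue<T> queue, T element) {
    if (queue.size() >= maxCapacity) {
        // the user-supplied strategy decides what to do: drop, replace the tail, log, ...
        onMaxCapacity.accept(queue, element);
        return false; // by default the new element is not enqueued
    }
    return queue.offer(element);
}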
Hi @OlegDokuka, can you share your opinion please? 🙇 After hearing the maintainers' opinion, I'll create a PR :) Thank you!
@injae-kim thanks for taking the time to analyze this issue 👏 @troydm, can you please have a look and provide a potential use case for the proposed API?
To be honest, I am a bit unclear about the practical applications of this, and I wonder what sort of algorithms can be implemented on top of a bounded-capacity variant that includes a strategy for dealing with overflows. If there's a practical use for it, I suppose we can base the actual design on the use cases instead of first designing and then considering what can be done with the result :)
Thanks in advance for providing use cases that are currently impossible and describing how a bounded-capacity implementation would be used, regardless of the final design -> we can see what the final requirements are once we see what the overflow strategies dictate.
Closing due to a month of inactivity. Please reopen in case there's feedback to the above.
@chemicL my concrete problem was caused by a fast producer combined with a slow consumer (this happened in a production system processing big chunks of data). I'm not sure a configurable strategy is a good use case; from my point of view, just having a maxCapacity option would suffice for use cases like mine, so that when the producer hits the queue's capacity limit it waits for the consumer to empty the queue before inserting a new element.
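In the meantime, a possible workaround with today's API — a sketch under simplifying assumptions (the expander returns a materialized List rather than a Publisher, and expansion runs synchronously inside Flux.generate; boundedExpandDeep is my own helper, not a Reactor API):

import java.util.ArrayDeque;
import java.util.Deque;
import java.util.List;
import java.util.function.Function;
import reactor.core.publisher.Flux;

// depth-first expansion with an explicit, size-capped frontier:
// fails fast instead of exhausting the heap when the frontier outgrows maxCapacity
static <T> Flux<T> boundedExpandDeep(T seed, Function<T, List<T>> expander, int maxCapacity) {
    Deque<T> stack = new ArrayDeque<>();
    stack.push(seed);
    return Flux.<T>generate(sink -> {
        T next = stack.poll();
        if (next == null) {
            sink.complete();
            return;
        }
        List<T> children = expander.apply(next);
        if (stack.size() + children.size() > maxCapacity) {
            sink.error(new IllegalStateException("expansion frontier exceeded " + maxCapacity));
            return;
        }
        // push children in reverse so the first child is expanded next (depth-first order)
        for (int i = children.size() - 1; i >= 0; i--) {
            stack.push(children.get(i));
        }
        sink.next(next);
    });
}

Because generate only runs on downstream demand, a slow consumer pauses expansion instead of growing a buffer; the maxCapacity check only guards against the expansion tree itself becoming too wide.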