reactor-core
FluxGroupBy silently drops onNext signals
I have a use case where I'm using Flux::groupBy with a relatively high cardinality of grouping (1024-4096 concurrent groups at any given time). FWIW, the use case itself is similar to a sort of deduplication functionality on an infinite stream.
Each "group" is limited in time (.take(Duration)) and size (.take(long)) and collected as a List. Under high load, it appears some items are being dropped.
Thus far, I think I can point to where the relevant pieces of code are:
- FluxGroupBy::onNext may be retrieving a UnicastGroupedFlux that is racing to termination: https://github.com/reactor/reactor-core/blob/master/reactor-core/src/main/java/reactor/core/publisher/FluxGroupBy.java#L194-L212
- UnicastGroupedFlux::drainRegular may be short-circuit returning after emitting a non-zero number of elements: https://github.com/reactor/reactor-core/blob/master/reactor-core/src/main/java/reactor/core/publisher/FluxGroupBy.java#L545-L554
- UnicastGroupedFlux::checkTerminated silently clears its queue when checking termination: https://github.com/reactor/reactor-core/blob/master/reactor-core/src/main/java/reactor/core/publisher/FluxGroupBy.java#L642-L647
Expected Behavior
Items are not dropped by the groupBy operator.
Actual Behavior
Items emitted upstream of the groupBy operator are not emitted downstream; they are silently dropped.
Steps to Reproduce
I wrote the following test to show a simplified version of what's going on with my use case:
@Test
public void test() throws Exception {
    // Hooks.onNextDroppedFail(); // This actually has no effect; Was thinking it would cause failure with CancelException
    AtomicLong upstream = new AtomicLong(0L);
    AtomicLong downstream = new AtomicLong(0L);
    CountDownLatch latch = new CountDownLatch(1);
    Flux.fromStream(Stream.iterate(0L, (last) -> (long) (Math.random() * 4096)))
        .flatMap(number -> Flux.concat(
                Mono.just(number),
                Mono.just(number).delayElement(Duration.ofMillis((int) (Math.random() * 2000)))),
            4096)
        .take(Duration.ofSeconds(30L))
        .doOnNext(next -> upstream.incrementAndGet())
        .publishOn(Schedulers.elastic())
        .groupBy(Function.identity())
        .flatMap(groupFlux -> groupFlux.take(Duration.ofSeconds(1L))
            .take(2)
            .collectList(), 16384)
        .doOnNext(batch -> {
            try {
                Thread.sleep(1L);
            } catch (Exception e) {
            }
        })
        .map(Collection::size)
        .subscribe(downstream::addAndGet, System.err::println, latch::countDown);
    latch.await();
    assertEquals(upstream.get(), downstream.get());
}
Interestingly, the test sometimes passes without the doOnNext containing Thread.sleep(1L). I had to add that to make it fail consistently (and to mimic the small real-world processing overhead of each downstream item).
Possible Solution
I'm not sure. Still trying to figure out exactly what the problem is in groupBy.
Your Environment
- Reactor version(s) used: 3.3.9.RELEASE
- JVM version: 1.8.0_172
- OS and version (e.g. uname -a): macOS Catalina 10.15.6 (Darwin Kernel Version 19.6.0, x86_64)
I think I've isolated this particular unexpected behavior to a race condition in FluxGroupBy between draining and cancellation on UnicastGroupedFlux.
I think I've figured out a workaround that both mitigates my issue and corroborates the race condition. If I isolate both the publish and subscribe operations on FluxGroupBy to the same Worker (Thread) that's responsible for the take(Duration) scheduling (on the GroupedFlux instances), the problem goes away:
@Test
public void test() throws Exception {
    // Hooks.onNextDroppedFail(); // This actually has no effect; Was thinking it would cause failure with CancelException
    AtomicLong upstream = new AtomicLong(0L);
    AtomicLong downstream = new AtomicLong(0L);
    CountDownLatch latch = new CountDownLatch(1);
    Scheduler scheduler = Schedulers.single(Schedulers.elastic());
    Flux.fromStream(Stream.iterate(0L, (last) -> (long) (Math.random() * 4096)))
        .flatMap(number -> Flux.concat(
                Mono.just(number),
                Mono.just(number).delayElement(Duration.ofMillis((int) (Math.random() * 2000)))),
            4096)
        .take(Duration.ofSeconds(30L))
        .doOnNext(next -> upstream.incrementAndGet())
        .publishOn(scheduler)
        .groupBy(Function.identity())
        .flatMap(groupFlux -> groupFlux.take(Duration.ofSeconds(1L), scheduler)
            .take(2)
            .collectList(), 16384)
        .subscribeOn(scheduler, true)
        .doOnNext(batch -> {
            try {
                Thread.sleep(1L);
            } catch (Exception e) {
            }
        })
        .map(Collection::size)
        .subscribe(downstream::addAndGet, System.err::println, latch::countDown);
    latch.await();
    assertEquals(upstream.get(), downstream.get());
}
I suppose my question to the maintainers is: what should the expected behavior be in the absence of this Scheduler isolation?
At the very least, the silent dropping of onNext items seems like a bug.
It may be debatable whether item dropping should be possible in this situation at all. If it is, that behavior may come as a surprise to clients, especially those without a decent understanding of how the threading of publish and subscribe operations can cause this condition.
@Sage-Pierce you seem to be blocking the non-blocking threads. Maybe you wanted publishOn, not subscribeOn, before doOnNext?
When I remove the Thread.sleep from doOnNext, everything seems to be fine. Since doing Thread.sleep there is incorrect, @Sage-Pierce please provide a reproducer that does not rely on it.
FWIW, the use case itself is similar to a sort of deduplication functionality on an infinite stream.
For deduplicating an infinite stream, I'd recommend something like this (in case Flux#distinct does not work for you):
Flux.<Integer>generate(sink -> sink.next(ThreadLocalRandom.current().nextInt(0, 100)))
    .log("producer")
    .transformDeferred(flux -> {
        Set<Integer> items = new HashSet<>();
        return flux.filter(items::add);
    })
    .log("next")
    .take(100)
    .blockLast();
Note that the Set of items may cause an OOM as well, since the stream is infinite.
Consider using a cache instead (https://github.com/ben-manes/caffeine or Guava) that supports TTLs / eviction.
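Roughly, the cache-based variant of the same filter could look like this (a minimal sketch assuming Caffeine is on the classpath; the key type, TTL, and size cap are illustrative, not prescriptive):

Cache<Integer, Boolean> seen = Caffeine.newBuilder()
        .expireAfterWrite(Duration.ofSeconds(1)) // TTL instead of an ever-growing Set
        .maximumSize(10_000)                     // hard cap as an OOM safety net
        .build();

Flux.<Integer>generate(sink -> sink.next(ThreadLocalRandom.current().nextInt(0, 100)))
    .filter(i -> seen.asMap().putIfAbsent(i, Boolean.TRUE) == null) // only the first occurrence per TTL window passes
    .take(100)
    .blockLast();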
Thank you for the comments and suggestions, @bsideup!
you seem to be blocking the non-blocking threads. Maybe you wanted publishOn, not subscribeOn before doOnNext?
I'll update the failure scenario code (below) to run the blocking work on threads where blocking is acceptable.
when I remove Thread.sleep from doOnNext everything seems to be fine
Although this is indeed blocking, the sleep call is there to mimic a small (1ms) overhead of processing each downstream item. In actual usage, the process does some non-trivial, computationally-bound work that, IMO, isn't relevant to the issue I think is being exposed. However, I believe the overhead itself is relevant, because it increases the likelihood of racing between the publish and subscribe methods of FluxGroupBy (when they are not delegated to the same Worker/Thread).
Here's an example without sleep, but with analogous processing overhead. Note that the test fails or succeeds depending on which Scheduler configuration is used:
@Test
public void test() throws Exception {
    // Hooks.onNextDroppedFail(); // This actually has no effect; Was thinking it would cause failure with CancelException
    AtomicLong upstream = new AtomicLong(0L);
    AtomicLong downstream = new AtomicLong(0L);
    CountDownLatch latch = new CountDownLatch(1);
    Scheduler scheduler = Schedulers.elastic(); // Fails
    // Scheduler scheduler = Schedulers.single(Schedulers.elastic()); // Succeeds
    Flux.fromStream(Stream.iterate(0L, (last) -> (long) (Math.random() * 4096)))
        .flatMap(number -> Flux.concat(
                Mono.just(number),
                Mono.just(number).delayElement(Duration.ofMillis((int) (Math.random() * 2000)))),
            4096)
        .take(Duration.ofSeconds(30L))
        .doOnNext(next -> upstream.incrementAndGet())
        .publishOn(scheduler)
        .groupBy(Function.identity())
        .flatMap(groupFlux -> groupFlux.take(Duration.ofSeconds(1L), scheduler)
            .take(2)
            .collectList(), 16384)
        .subscribeOn(scheduler, true)
        .doOnNext(batch -> {
            // Mimic real-world computationally-bound processing overhead
            long startNano = System.nanoTime();
            while (System.nanoTime() - startNano < 1_000_000) ;
        })
        .map(Collection::size)
        .subscribe(downstream::addAndGet, System.err::println, latch::countDown);
    latch.await();
    assertEquals(upstream.get(), downstream.get());
}
For deduplicating an infinite stream, I'd recommend something like this ... using some Cache instead (https://github.com/ben-manes/caffeine or Guava) that supports TTLs / eviction
Using caches with TTLs and eviction is an interesting path to consider. However, there are a few problems with substituting it for the deduplication functionality I'm accomplishing with Reactor. For one, it would be less memory-efficient, since items would be kept in the cache longer than they actually need to be. For another, the behavior is slightly different: in my case, when there are items/entities with duplicate IDs in the same window, I want to emit the latest item once the window expires, whereas the provided Set-based substitute would emit the first/earliest item. There might be a way to emit the latest items if Caffeine or Guava provides something like onEviction or onExpire callbacks, but I'm partial to the more readable style of the Reactor code (and one less library dependency 😄).
@Sage-Pierce I still see .subscribeOn(scheduler, true) in the code - is it intentional? The problem is that you're blocking the parallel Scheduler, and it may be that the delays you're using are not getting executed, hence the lost items.
I still see .subscribeOn(scheduler, true) in the code - is it intentional? The problem is that you're blocking the parallel scheduler
Yes, the subscribeOn is intentional, so that groupBy subscription methods are executed on the same Worker that executes the upstream publishing and the GroupedFlux take cancellation. TBH, I mainly have it there because my real-world usage has further asynchronous boundaries (publishOn) downstream of the subscribeOn. That being said, I can remove the subscribeOn altogether and still get the same behavior, so it can be removed if that makes debugging easier.
I could certainly be wrong, but I don't think the latest code should be blocking the parallel Scheduler anymore, since it explicitly specifies a non-parallel Scheduler 🤔
FWIW, code without subscribeOn:
@Test
public void test() throws Exception {
    // Hooks.onNextDroppedFail(); // This actually has no effect; Was thinking it would cause failure with CancelException
    AtomicLong upstream = new AtomicLong(0L);
    AtomicLong downstream = new AtomicLong(0L);
    CountDownLatch latch = new CountDownLatch(1);
    Scheduler scheduler = Schedulers.elastic(); // Fails
    // Scheduler scheduler = Schedulers.single(Schedulers.elastic()); // Succeeds
    Flux.fromStream(Stream.iterate(0L, (last) -> (long) (Math.random() * 4096)))
        .flatMap(number -> Flux.concat(
                Mono.just(number),
                Mono.just(number).delayElement(Duration.ofMillis((int) (Math.random() * 2000)))),
            4096)
        .take(Duration.ofSeconds(30L))
        .doOnNext(next -> upstream.incrementAndGet())
        .publishOn(scheduler)
        .groupBy(Function.identity())
        .flatMap(groupFlux -> groupFlux.take(Duration.ofSeconds(1L), scheduler)
            .take(2)
            .collectList(), 16384)
        .doOnNext(batch -> {
            // Mimic real-world computationally-bound processing overhead
            long startNano = System.nanoTime();
            while (System.nanoTime() - startNano < 1_000_000) ;
        })
        .map(Collection::size)
        .subscribe(downstream::addAndGet, System.err::println, latch::countDown);
    latch.await();
    assertEquals(upstream.get(), downstream.get());
}
@Sage-Pierce this code still blocks the thread (by running the heavy computations). Consider using publishOn before doOnNext.
@bsideup I'll go ahead and add the publishOn, though I'm not certain I understand the value it provides for diagnosing the issue 🤔 At best it seems to mask the problem by working around whatever/wherever it is. However, I could again be missing something.
In any case, with the publishOn I get more interesting behavior. The test either hangs or succeeds with the following code:
@Test
public void test() throws Exception {
    // Hooks.onNextDroppedFail(); // This actually has no effect; Was thinking it would cause failure with CancelException
    AtomicLong upstream = new AtomicLong(0L);
    AtomicLong downstream = new AtomicLong(0L);
    CountDownLatch latch = new CountDownLatch(1);
    Scheduler scheduler = Schedulers.elastic(); // Hangs
    // Scheduler scheduler = Schedulers.single(Schedulers.elastic()); // Succeeds
    Flux.fromStream(Stream.iterate(0L, (last) -> (long) (Math.random() * 4096)))
        .flatMap(number -> Flux.concat(
                Mono.just(number),
                Mono.just(number).delayElement(Duration.ofMillis((int) (Math.random() * 2000)))),
            4096)
        .take(Duration.ofSeconds(30L))
        .doOnNext(next -> upstream.incrementAndGet())
        .publishOn(scheduler)
        .groupBy(Function.identity())
        .flatMap(groupFlux -> groupFlux.take(Duration.ofSeconds(1L), scheduler)
            .take(2)
            .collectList(), 16384)
        .publishOn(scheduler)
        .doOnNext(batch -> {
            // Mimic real-world computationally-bound processing overhead
            long startNano = System.nanoTime();
            while (System.nanoTime() - startNano < 1_000_000) ;
        })
        .map(Collection::size)
        .subscribe(downstream::addAndGet, System.err::println, latch::countDown);
    latch.await();
    assertEquals(upstream.get(), downstream.get());
}
And it either hangs or flakily hang-fails with the following code (note the different Scheduler in the downstream publishOn):
@Test
public void test() throws Exception {
    // Hooks.onNextDroppedFail(); // This actually has no effect; Was thinking it would cause failure with CancelException
    AtomicLong upstream = new AtomicLong(0L);
    AtomicLong downstream = new AtomicLong(0L);
    CountDownLatch latch = new CountDownLatch(1);
    Scheduler scheduler = Schedulers.elastic(); // Hangs
    // Scheduler scheduler = Schedulers.single(Schedulers.elastic()); // Flaky hang-fails
    Flux.fromStream(Stream.iterate(0L, (last) -> (long) (Math.random() * 4096)))
        .flatMap(number -> Flux.concat(
                Mono.just(number),
                Mono.just(number).delayElement(Duration.ofMillis((int) (Math.random() * 2000)))),
            4096)
        .take(Duration.ofSeconds(30L))
        .doOnNext(next -> upstream.incrementAndGet())
        .publishOn(scheduler)
        .groupBy(Function.identity())
        .flatMap(groupFlux -> groupFlux.take(Duration.ofSeconds(1L), scheduler)
            .take(2)
            .collectList(), 16384)
        .publishOn(Schedulers.newParallel("test"))
        .doOnNext(batch -> {
            // Mimic real-world computationally-bound processing overhead
            long startNano = System.nanoTime();
            while (System.nanoTime() - startNano < 1_000_000) ;
        })
        .map(Collection::size)
        .subscribe(downstream::addAndGet, System.err::println, latch::countDown);
    latch.await();
    assertEquals(upstream.get(), downstream.get());
}
Interestingly, the hanging behavior seems similar to the issue discussed in #2138.
Although I haven't fixed the issue yet, I think I've got a much simpler reproducer:
final int total = 100;
Long count = Flux.range(0, total)
    .groupBy(i -> (i / 2) * 2, 42)
    .flatMap(it -> it.take(1), 2)
    .publishOn(Schedulers.parallel(), 2)
    .count()
    .block(Duration.ofSeconds(60));
assertThat(total - count).as("count").isZero();
fails with:
Expected :0
Actual :44
@Sage-Pierce since I see that you've reworked your code, and since we've made other changes to groupBy in the past year, can this issue be closed?
@simonbasle I'm not sure I can say this isn't still an issue. Even with Reactor 3.4.12, I can still observe silently dropped emissions with the following simplified example:
@Test
public void upstreamEmissionsShouldMatchDownstream() throws Exception {
    Hooks.onNextDroppedFail();
    int numGroups = 8;
    AtomicLong upstream = new AtomicLong(0L);
    AtomicLong downstream = new AtomicLong(0L);
    CompletableFuture<Void> future = new CompletableFuture<>();
    Flux.generate(sink -> sink.next(UUID.randomUUID()))
        .take(Duration.ofSeconds(30L))
        .doOnNext(next -> upstream.incrementAndGet())
        .groupBy(uuid -> Math.abs(uuid.hashCode() % numGroups))
        .flatMap(groupFlux -> groupFlux.take(Duration.ofSeconds(1L)), numGroups)
        .subscribe(next -> downstream.incrementAndGet(), future::completeExceptionally, () -> future.complete(null));
    future.get();
    System.out.println("Emitted: " + upstream.get());
    assertEquals(upstream.get(), downstream.get());
}
I believe the problem is the time-based take on the "groups" from groupBy. It appears to cause a race condition between upstream item emissions and group cancellation (which happens on a different thread than the upstream emissions). If the .take(Duration.ofSeconds(1L)) is replaced with something non-timer-oriented, like .take(2), the test works fine (see the count-based variant sketched below).
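For illustration, here is the same pipeline shape with the count-based take (the shorter 5-second outer window here is arbitrary, just for the comparison):

// Count-based variant of the reproducer above: no timer per group
AtomicLong upstream = new AtomicLong(0L);
AtomicLong downstream = new AtomicLong(0L);
Flux.<UUID>generate(sink -> sink.next(UUID.randomUUID()))
    .take(Duration.ofSeconds(5L))
    .doOnNext(next -> upstream.incrementAndGet())
    .groupBy(uuid -> Math.abs(uuid.hashCode() % 8))
    .flatMap(groupFlux -> groupFlux.take(2), 8) // count-based take instead of take(Duration)
    .doOnNext(next -> downstream.incrementAndGet())
    .blockLast();
// In my runs, upstream.get() == downstream.get() with this variant.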
To your point, though, I have found a workaround for this: ensuring that upstream publishing and group cancellation happen on the same thread by using a single-Worker Scheduler. The following modified example causes the test to pass:
@Test
public void upstreamEmissionsShouldMatchDownstream() throws Exception {
    Hooks.onNextDroppedFail();
    int numGroups = 8;
    Scheduler scheduler = Schedulers.single(Schedulers.boundedElastic());
    AtomicLong upstream = new AtomicLong(0L);
    AtomicLong downstream = new AtomicLong(0L);
    CompletableFuture<Void> future = new CompletableFuture<>();
    Flux.generate(sink -> sink.next(UUID.randomUUID()))
        .take(Duration.ofSeconds(30L))
        .publishOn(scheduler)
        .doOnNext(next -> upstream.incrementAndGet())
        .groupBy(uuid -> Math.abs(uuid.hashCode() % numGroups))
        .flatMap(groupFlux -> groupFlux.take(Duration.ofSeconds(1L), scheduler), numGroups)
        .subscribe(next -> downstream.incrementAndGet(), future::completeExceptionally, () -> future.complete(null));
    future.get();
    System.out.println("Emitted: " + upstream.get());
    assertEquals(upstream.get(), downstream.get());
}
I still suspect users would be scratching their heads over the cases where items are dropped. At the very least, I would expect the dropped onNext signals to result in an error because of the onNextDroppedFail hook. Ideally, however, items would not be dropped at all.