reactor-core
GroupBy hangs with low cardinality groups; premature closing and fast publisher.
Requirement:
Elements from upstream should be grouped and processed sequentially within each group (using concatMap). Because we potentially have high-cardinality groups, we close each group prematurely with a take of 1s to make room for other groups (possibly with the same key).
Problem:
The example below implements the above-mentioned requirement, using low-cardinality groups (the group key is always UUID % 4, which results in 4 distinct keys and therefore 4 groups).
However, the example code hangs after a few seconds. The 4 groups are cancelled after 1 second, each group is drained and doOnComplete is eventually called, but no new groups are created under high load. sink#next keeps being called afterwards, but those signals appear to go nowhere.
When I cap the sink#next publish rate (by introducing a Thread.sleep in the generator), everything is fine and groups are closed and opened properly.
Another way to make this work is to limit the concurrency of the flatMap to (number of distinct groups - 1); e.g. there are 4 groups in the example, so setting the concurrency to < 4 makes the example work (a sketch of this variant follows the example below).
However, I don't want to hard-code the concurrency (I don't know the number of distinct values up front), nor do I want to cap the throughput explicitly.
Maybe I'm misinterpreting how this is supposed to work, but it feels kind of off that it only occurs under high load. It would be great if somebody could explain this.
Regards.
Example:
AtomicReference<FluxSink<UUID>> sinkRef = new AtomicReference<>();

Flux.create(sinkRef::set, FluxSink.OverflowStrategy.DROP)
    .publishOn(Schedulers.parallel())
    .groupBy(identity())
    .flatMap(group -> group
        .take(Duration.ofSeconds(1)) // pessimistic exit
        .concatMap(webhookRequest -> Mono.fromCallable(() -> {
            log.info("Mono#fromCallable");
            Thread.sleep(new Random().nextInt((250 - 50) + 1) + 50);
            return singletonList("https://domain.tld.path");
        }).subscribeOn(Schedulers.boundedElastic()))
        .doOnComplete(() -> log.info("Completed group w/ key {}", group.key())))
    .subscribe();

// await subscription
Thread.sleep(1000);

IntStream.iterate(0, i -> i + 1)
    .limit(1_000_000_000)
    .forEach(i -> {
        // Thread.sleep(50); // uncomment me (wrapped in try/catch)
        sinkRef.get().next(new UUID(0, i % 4));
    });
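For reference, a sketch of the concurrency-capped variant mentioned above (my reconstruction of the same example; 3 is simply "number of distinct groups - 1" here and is normally not known up front):

Flux.create(sinkRef::set, FluxSink.OverflowStrategy.DROP)
    .publishOn(Schedulers.parallel())
    .groupBy(identity())
    .flatMap(group -> group
        .take(Duration.ofSeconds(1)) // pessimistic exit
        .concatMap(webhookRequest -> Mono.fromCallable(() -> {
            Thread.sleep(new Random().nextInt((250 - 50) + 1) + 50);
            return singletonList("https://domain.tld.path");
        }).subscribeOn(Schedulers.boundedElastic()))
        .doOnComplete(() -> log.info("Completed group w/ key {}", group.key())),
        3) // concurrency < number of distinct groups (4) makes the example work
    .subscribe();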
I've experienced the same issue.
Hey @digitalmisfits. I wonder how the Thread.sleep plays into that, despite the boundedElastic use. The take(Duration.ofSeconds(1)) approach itself is valid, since removing the concatMap part makes the code run smoothly, opening and closing multiple groups.
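For reference, a minimal sketch of that simplified variant (the original example with the concatMap stage removed, reusing the same sinkRef/log setup):

Flux.create(sinkRef::set, FluxSink.OverflowStrategy.DROP)
    .publishOn(Schedulers.parallel())
    .groupBy(identity())
    .flatMap(group -> group
        .take(Duration.ofSeconds(1)) // pessimistic exit
        .doOnComplete(() -> log.info("Completed group w/ key {}", group.key())))
    .subscribe();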
FWIW I have experienced a similar hanging behavior in groupBy, which may be related to this issue. That issue talks more about silently dropped onNext signals, but I think it's possible that the silent dropping is also leaking request count that should otherwise be requested upstream of groupBy, leading to the eventual hang. Both issues look to be related to high load upstream of groupBy.
Similar to the workaround referenced in that issue, the hanging seems to be mitigated if the upstream item publishing, the GroupedFlux take scheduling ("pessimistic exit" in the example), and the downstream subscribe scheduling are all isolated on the same Worker (using SingleWorkerScheduler):
AtomicReference<FluxSink<UUID>> sinkRef = new AtomicReference<>();
Scheduler scheduler = Schedulers.single(Schedulers.boundedElastic());

Flux.create(sinkRef::set, FluxSink.OverflowStrategy.DROP)
    .publishOn(scheduler)
    .groupBy(Function.identity())
    .flatMap(group -> group
        .take(Duration.ofSeconds(1), scheduler) // pessimistic exit
        .concatMap(webhookRequest -> Mono.fromCallable(() -> {
            System.out.println("Mono#fromCallable");
            Thread.sleep(new Random().nextInt((250 - 50) + 1) + 50);
            return Collections.singletonList("https://domain.tld.path");
        }).subscribeOn(scheduler))
        .doOnComplete(() -> System.out.println("Completed group w/ key " + group.key())))
    .subscribe();

// await subscription
Thread.sleep(1000);

IntStream.iterate(0, i -> i + 1)
    .limit(1_000_000_000)
    .forEach(i -> {
        //try {
        //    Thread.sleep(50);
        //} catch (Exception e) {}
        sinkRef.get().next(new UUID(0, i % 4));
    });
@Sage-Pierce @digitalmisfits thanks, that will help; I need to look at both issues.
@simonbasle I originally adopted this approach from your post on Stack Overflow:
source.groupBy(v -> v.intValue() % 2)
    .flatMap(group -> group
        .take(Duration.ofMillis(1000))
        .count()
        .map(c -> "group " + group.key() + " size = " + c)
    )
    .log()
    .blockLast();
https://stackoverflow.com/questions/47893575/reactor-3-x-limit-the-time-of-a-groupby-flux
About the Thread.sleep: since it runs on a separate executor, as long as at least one task makes progress, the next items will be pulled, I guess?
Thank you for the workaround. Your issue is indeed (almost) identical. Don't we lose concurrency when using a single-worker scheduler?
Yes, you would be losing concurrency, especially due to the blocking sleep in concatMap. It would only be a mitigation and not a solution, IMO.
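To illustrate the loss (a minimal sketch of my own, not code from this thread, assuming the same Reactor imports as the snippets above): with every inner Mono subscribed on Schedulers.single(...), the blocking sleeps run back to back on one thread, whereas boundedElastic runs them in parallel.

Scheduler single = Schedulers.single(Schedulers.boundedElastic());
long start = System.currentTimeMillis();
Flux.range(0, 4)
    // four blocking 250 ms "calls", analogous to the concatMap callables above
    .flatMap(i -> Mono.fromCallable(() -> {
        Thread.sleep(250);
        return i;
    }).subscribeOn(single)) // swap in Schedulers.boundedElastic() to regain parallelism
    .blockLast();
System.out.println("elapsed ms: " + (System.currentTimeMillis() - start));
// roughly 1000 ms on the single worker, roughly 250 ms on boundedElastic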
We are experiencing the same issue.
Same problem.
I am having a similar issue with groupBy and skip on version 3.4.9. The difference is the order of the elements: 0,0,1,1,2,2 (works) vs 0,1,2,0,1,2 (hangs).
@Test
public void passingGroupByWithSkipAndTakeUntil() {
    StepVerifier.create(Flux.just(0, 0, 1, 1, 2, 2)
            .log()
            .groupBy(Function.identity(), 1)
            .flatMap(a -> a.log("group " + a.key())
                .skip(1).takeUntil(i -> true))
            .log("final"))
        .expectSubscription()
        .expectNext(0, 1, 2)
        .expectComplete()
        .verify(Duration.ofMillis(5000));
}

@Test
public void failingGroupByWithSkipAndTakeUntil() {
    StepVerifier.create(Flux.just(0, 1, 2, 0, 1, 2)
            .log()
            .groupBy(Function.identity(), 1)
            .flatMap(a -> a.log("group " + a.key())
                .skip(1).takeUntil(i -> true))
            .log("final"))
        .expectSubscription()
        .expectNext(0, 1, 2)
        .expectComplete()
        .verify(Duration.ofMillis(5000));
}