
GroupBy hangs with low cardinality groups; premature closing and fast publisher.

Open digitalmisfits opened this issue 4 years ago • 10 comments

Requirement:

The requirement is that elements from upstream should be grouped and processed sequentially within each group (using concatMap). Because we potentially have high cardinality groups, we close each group prematurely by using a 1s take, to make room for other groups (which might have the same key).

Problem:

The example below implements the above-mentioned requirement. It uses low cardinality groups (the group key is always UUID % 4, which results in 4 distinct keys and therefore 4 groups).

However, the example code below hangs after a few seconds. The 4 groups are cancelled after 1 second, each group is drained and doOnComplete is finally called, but no new groups are created under high load. sink#next keeps being called afterwards, but those signals appear to end up nowhere.

When I cap the sink#next publish rate (by introducing a Thread.sleep in the generator), everything is fine and groups are closed and opened properly.
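For what it's worth, the rate cap could also be driven from a timer instead of sleeping in the generator loop; a sketch (not what I actually ran), reusing the sinkRef from the example below:

    // sketch: emit one element every 50ms instead of a tight loop with Thread.sleep
    Flux.interval(Duration.ofMillis(50))
            .subscribe(i -> sinkRef.get().next(new UUID(0, i % 4)));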

Another way to make this work is to limit the concurrency of the flatMap to (number of distinct groups - 1); e.g. there are 4 groups in the example, and setting the concurrency to < 4 makes the example work.
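A minimal sketch of that workaround, with the concatMap body factored out into a hypothetical process method (full pipeline in the example below):

    Flux.create(sinkRef::set, FluxSink.OverflowStrategy.DROP)
            .publishOn(Schedulers.parallel())
            .groupBy(identity())
            .flatMap(group -> group
                    .take(Duration.ofSeconds(1))
                    .concatMap(this::process), 3) // concurrency 3 < 4 distinct keys
            .subscribe();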

However, I don't want to hard-code the concurrency (I don't know the number of distinct values up front), nor do I want to cap the throughput explicitly.

Maybe I'm misinterpreting how this is supposed to work, but it feels kind of off that this only occurs under high load.

It would be great if somebody could explain this.

Regards.

Example:

    AtomicReference<FluxSink<UUID>> sinkRef = new AtomicReference<>();

    Flux.create(sinkRef::set, FluxSink.OverflowStrategy.DROP)
            .publishOn(Schedulers.parallel())
            .groupBy(identity())
            .flatMap(group -> group
                    .take(Duration.ofSeconds(1)) // pessimistic exit
                    .concatMap(webhookRequest -> Mono.fromCallable(() -> {
                        log.info("Mono#fromCallable");
                        Thread.sleep(new Random().nextInt((250 - 50) + 1) + 50);
                        return singletonList("https://domain.tld.path");
                    }).subscribeOn(Schedulers.boundedElastic()))
                    .doOnComplete(() -> log.info("Completed group w/ key {}", group.key())))
            .subscribe();

    // await subscription
    Thread.sleep(1000);

    IntStream.iterate(0, i -> i + 1)
            .limit(1_000_000_000)
            .forEach(i -> {
                // try { Thread.sleep(50); } catch (InterruptedException ignored) {} // uncomment me
                sinkRef.get().next(new UUID(0, i % 4));
            });

digitalmisfits avatar Apr 27 '20 13:04 digitalmisfits

I've experienced the same issue.

Cesarla avatar Apr 28 '20 10:04 Cesarla

hey @digitalmisfits. I wonder how the Thread.sleep plays into that, despite the boundedElastic use. The take(Duration.ofSeconds(1)) approach itself is valid, since removing the concatMap part makes the code run smoothly, opening and closing multiple groups.
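A minimal sketch of that concatMap-free variant, reusing the sink and logger from the original example:

    Flux.create(sinkRef::set, FluxSink.OverflowStrategy.DROP)
            .publishOn(Schedulers.parallel())
            .groupBy(identity())
            .flatMap(group -> group
                    .take(Duration.ofSeconds(1)) // pessimistic exit, no concatMap
                    .doOnComplete(() -> log.info("Completed group w/ key {}", group.key())))
            .subscribe();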

simonbasle avatar Sep 28 '20 15:09 simonbasle

FWIW I have experienced a similar hanging behavior in groupBy (reported in another issue), which may be related to this one. That issue is more about silently dropped onNext signals, but I think it's possible that the silent dropping may also be leaking request count that should otherwise be requested upstream of groupBy, leading to an eventual hang. Both issues look to be related to high load upstream of groupBy.

Similar to the workaround referenced in that issue, hanging seems to be mitigated if the upstream item publishing, GroupedFlux take scheduling ("pessimistic exit" in example), and downstream subscribe scheduling are isolated on the same Worker (using SingleWorkerScheduler):

        AtomicReference<FluxSink<UUID>> sinkRef = new AtomicReference<>();

        Scheduler scheduler = Schedulers.single(Schedulers.boundedElastic());

        Flux.create(sinkRef::set, FluxSink.OverflowStrategy.DROP)
            .publishOn(scheduler)
            .groupBy(Function.identity())
            .flatMap(group -> group
                .take(Duration.ofSeconds(1), scheduler) // pessimistic exit
                .concatMap(webhookRequest -> Mono.fromCallable(() -> {
                    System.out.println("Mono#fromCallable");
                    Thread.sleep(new Random().nextInt((250 - 50) + 1) + 50);
                    return Collections.singletonList("https://domain.tld.path");
                }).subscribeOn(scheduler))
                .doOnComplete(() -> System.out.println("Completed group w/ key " + group.key())))
            .subscribe();

        // await subscription
        Thread.sleep(1000);

        IntStream.iterate(0, i -> i + 1)
            .limit(1_000_000_000)
            .forEach(i -> {
                //try {
                //    Thread.sleep(50);
                //} catch (Exception e) {}
                sinkRef.get().next(new UUID(0, i % 4));
            });

Sage-Pierce avatar Sep 28 '20 17:09 Sage-Pierce

@Sage-Pierce @digitalmisfits thanks that will help, need to look at both issues

simonbasle avatar Sep 28 '20 17:09 simonbasle

@simonbasle I adopted this method originally from your post on Stack Overflow:

source.groupBy(v -> v.intValue() % 2)
      .flatMap(group -> group
              .take(Duration.ofMillis(1000))
              .count()
              .map(c -> "group " + group.key() + " size = " + c)
      )
      .log()
      .blockLast();

https://stackoverflow.com/questions/47893575/reactor-3-x-limit-the-time-of-a-groupby-flux

About the Thread.sleep: as this is run on a separate executor, as long as at least one task makes progress, the next items will be pulled, I guess?

digitalmisfits avatar Oct 06 '20 16:10 digitalmisfits

(Quoting @Sage-Pierce's workaround from above.)

Thank you for the resolution. Your issue is indeed (almost) identical. Don't we lose concurrency when using a single worker scheduler?

digitalmisfits avatar Oct 06 '20 16:10 digitalmisfits

Yes, you would be losing concurrency, especially due to the blocking sleep in concatMap. It would only be a mitigation, not a solution, IMO.
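A small sketch of why: with Schedulers.single(...) every subscribeOn lands on the same worker thread, so the blocking sleeps run back to back (output is illustrative):

    Scheduler single = Schedulers.single(Schedulers.boundedElastic());

    Flux.range(0, 4)
            .flatMap(i -> Mono.fromCallable(() -> {
                // blocking work holds the scheduler's only thread
                System.out.println("task " + i + " on " + Thread.currentThread().getName());
                Thread.sleep(100);
                return i;
            }).subscribeOn(single))
            .blockLast();

    // all four tasks print the same thread name and take ~400ms in total, whereas
    // subscribeOn(Schedulers.boundedElastic()) would let them overlap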

Sage-Pierce avatar Oct 08 '20 14:10 Sage-Pierce

We are experiencing the same issue.

pavelkuchin avatar Dec 09 '20 15:12 pavelkuchin

Same problem.

privettoli avatar Dec 18 '20 05:12 privettoli

I am having a similar issue with groupBy and skip, on version 3.4.9. The difference is the order of the elements: 0,0,1,1,2,2 (works) vs 0,1,2,0,1,2 (hangs).

    @Test
    public void passingGroupByWithSkipAndTakeUntil() {
        StepVerifier.create(Flux.just(0, 0, 1, 1, 2, 2)
                        .log()
                        .groupBy(Function.identity(), 1)
                        .flatMap(a -> a.log("group " + a.key())
                                .skip(1).takeUntil(i -> true))
                        .log("final"))
                .expectSubscription()
                .expectNext(0, 1, 2)
                .expectComplete()
                .verify(Duration.ofMillis(5000));
    }

    @Test
    public void failingGroupByWithSkipAndTakeUntil() {
        StepVerifier.create(Flux.just(0, 1, 2, 0, 1, 2)
                        .log()
                        .groupBy(Function.identity(), 1)
                        .flatMap(a -> a.log("group " + a.key())
                                .skip(1).takeUntil(i -> true))
                        .log("final"))
                .expectSubscription()
                .expectNext(0, 1, 2)
                .expectComplete()
                .verify(Duration.ofMillis(5000));
    }
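If the hang is indeed tied to the prefetch of 1, a variant with groupBy's default prefetch might be worth comparing; a sketch (method name is mine, outcome unverified):

    @Test
    public void groupByWithDefaultPrefetchSketch() {
        StepVerifier.create(Flux.just(0, 1, 2, 0, 1, 2)
                        .groupBy(Function.identity()) // default prefetch instead of 1
                        .flatMap(a -> a.skip(1).takeUntil(i -> true)))
                .expectSubscription()
                .expectNext(0, 1, 2)
                .expectComplete()
                .verify(Duration.ofMillis(5000));
    }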

sergproua avatar Sep 11 '21 11:09 sergproua