New operator - delayBetween

Open yos1p opened this issue 1 year ago • 9 comments

Summary:

Currently, Flux.delayElements delays every element, including the first one. It would be nice to have another operator that only adds a delay between elements.

Example:

Flux.delayElements(Duration.ofHours(1)): 5 elements arrive --> delay 1 hour --> element 1 processed --> delay 1 hour --> element 2 processed ...

Flux.delayBetween(Duration.ofHours(1)): 5 elements arrive --> element 1 processed --> delay 1 hour --> element 2 processed ...

yos1p avatar Sep 21 '23 02:09 yos1p

Just to be clear on the behaviour, please try this:

Flux.range(0, 10)
	.doFirst(() -> System.out.println(Instant.now() + ": START"))
	.switchOnFirst((i, f) -> {
		if (i.hasValue()) {
			return Flux.concat(
				Flux.just(i.get()),
				f.skip(1).delayElements(Duration.ofSeconds(1)));
		}
		return f;
	})
	.doOnNext(i -> System.out.println(Instant.now() + ": " + i))
	.blockLast();

Output:

2023-09-21T06:18:12.238Z: START
2023-09-21T06:18:12.295Z: 0
2023-09-21T06:18:13.312Z: 1
2023-09-21T06:18:14.317Z: 2
2023-09-21T06:18:15.323Z: 3
2023-09-21T06:18:16.329Z: 4
2023-09-21T06:18:17.335Z: 5
2023-09-21T06:18:18.342Z: 6
2023-09-21T06:18:19.345Z: 7
2023-09-21T06:18:20.351Z: 8
2023-09-21T06:18:21.359Z: 9

Is this behaviour what you are looking for?

chemicL avatar Sep 21 '23 06:09 chemicL

@chemicL In the test with .blockLast() it works. But when I tried it with flatMap() and subscribe(), the delay still seems to apply to the first element.

Take a look at the following sample code:

Flux myFlux = ...
myFlux.retry()
       .flatMap(this::getDetails) // Method returns Flux<String>
       .bufferTimeout(throttlePerHourCap, Duration.ofSeconds(10))
       .switchOnFirst((signal, flux) -> {
           if (signal.hasValue()) {
               return Flux.concat(
                     Flux.just(Objects.requireNonNull(signal.get(), "First element invalid...")),
                     flux.skip(1).delayElements(Duration.ofHours(1))
               );
           }
           return flux;
       })
       .flatMap(this::throttlePerHour) // Method returns Mono<Void>
       .subscribe();

yos1p avatar Sep 21 '23 09:09 yos1p

Please provide something I can copy and paste, without any internal details of your project.

chemicL avatar Sep 21 '23 12:09 chemicL

Hmm, it seems to be working after a second try.

I was hoping we could have a built-in operator, though. Is it part of the roadmap?

yos1p avatar Sep 21 '23 15:09 yos1p

Let's leave the issue open for some time. If there is interest in a new operator and this issue gets some traction, we can consider adding it. However, it seems this behaviour can be easily achieved with existing operators. Another alternative would probably be

Flux.concat(source.take(1), source.skip(1).delayElements(delay))

chemicL avatar Sep 27 '23 08:09 chemicL

It would be handy, even if it is doable in another way. Pipelines would be easier to read too. I have a use case where I need to space elements out because the next action can be performed at most once every X seconds. In this case, there is no point in waiting before the first element.

alixroyere avatar Oct 26 '23 12:10 alixroyere

Well, thinking more about this, a real delayBetween operator would do more than skip the wait for the first element; that is just a special case. By comparing the time elapsed since the last emission with the requested delay, it could emit immediately whenever no further delay is needed.

@Test
void delayElement() {
    StepVerifier.withVirtualTime(() -> {
                    var data = merge(
                        Mono.just(0).delayElement(Duration.ofSeconds(0)),
                        Mono.just(40).delayElement(Duration.ofSeconds(40)),
                        Mono.just(50).delayElement(Duration.ofSeconds(50))
                    );
                    return data.delayElements(Duration.ofSeconds(10));
                })// received at 10 - 50 - 60 when it could be 0 - 40 - 50 with delayBetween
                .thenAwait(Duration.ofSeconds(10))
                .expectNextCount(1)
                .thenAwait(Duration.ofSeconds(50 - 10))
                .expectNextCount(1)
                .thenAwait(Duration.ofSeconds(60 - 50))
                .expectNextCount(1)
                .expectComplete()
                .verify(Duration.ofMillis(100));
}

@Test
void delayElementExceptFirst() {
    StepVerifier.withVirtualTime(() -> {
                    var data = merge(
                        Mono.just(0).delayElement(Duration.ofSeconds(0)),
                        Mono.just(40).delayElement(Duration.ofSeconds(40)),
                        Mono.just(50).delayElement(Duration.ofSeconds(50))
                    );

                    return data.switchOnFirst(
                        (signal, flux) -> Flux.concat(Flux.just(signal.get()),
                                                      flux.skip(1).delayElements(Duration.ofSeconds(10)))
                    );
                })// received at 0 - 50 - 60 when it could be 0 - 40 - 50 with delayBetween
                .thenAwait(Duration.ofSeconds(0))
                .expectNextCount(1)
                .thenAwait(Duration.ofSeconds(50))
                .expectNextCount(1)
                .thenAwait(Duration.ofSeconds(60 - 50))
                .expectNextCount(1)
                .expectComplete()
                .verify(Duration.ofMillis(100));
}
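
The timings quoted in the two test comments above can be checked by hand. Below is a minimal sketch in plain Java (no Reactor involved) that models the three behaviours as arithmetic on arrival times in seconds. The class and method names are hypothetical, and the `delayBetween` rule is only my reading of the proposal, not an existing Reactor operator. The `delayElements` model assumes elements are delayed one after another, which is what the outputs earlier in this thread suggest.

```java
import java.util.Arrays;

public class DelayTimings {

    // Model of delayElements: every element, including the first, waits `delay`
    // after the later of its own arrival and the previous emission.
    public static long[] delayElements(long[] arrivals, long delay) {
        long[] out = new long[arrivals.length];
        for (int i = 0; i < arrivals.length; i++) {
            long previous = (i == 0) ? Long.MIN_VALUE : out[i - 1];
            out[i] = Math.max(arrivals[i], previous) + delay;
        }
        return out;
    }

    // Model of the switchOnFirst workaround: the first element passes through,
    // every later element is delayed as in delayElements.
    public static long[] delayAllButFirst(long[] arrivals, long delay) {
        long[] out = new long[arrivals.length];
        for (int i = 0; i < arrivals.length; i++) {
            out[i] = (i == 0) ? arrivals[0] : Math.max(arrivals[i], out[i - 1]) + delay;
        }
        return out;
    }

    // Hypothetical delayBetween: an element only waits for the part of `delay`
    // that has not yet elapsed since the previous emission.
    public static long[] delayBetween(long[] arrivals, long delay) {
        long[] out = new long[arrivals.length];
        for (int i = 0; i < arrivals.length; i++) {
            out[i] = (i == 0) ? arrivals[0] : Math.max(arrivals[i], out[i - 1] + delay);
        }
        return out;
    }

    public static void main(String[] args) {
        long[] arrivals = {0, 40, 50}; // same arrival times as in the tests above
        System.out.println(Arrays.toString(delayElements(arrivals, 10)));    // [10, 50, 60]
        System.out.println(Arrays.toString(delayAllButFirst(arrivals, 10))); // [0, 50, 60]
        System.out.println(Arrays.toString(delayBetween(arrivals, 10)));     // [0, 40, 50]
    }
}
```

For the original example in this issue (five elements arriving at once, delay of 1), the same model gives 1, 2, 3, 4, 5 for `delayElements` and 0, 1, 2, 3, 4 for `delayBetween`.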

alixroyere avatar Oct 26 '23 23:10 alixroyere

@AlixRoyere in this case, a zip with an interval is probably a good fit. In my eyes, this perspective doesn't strengthen the case for a delayBetween operator; rather, it shows that the expected behaviour is not obvious, and that the specific requirements of a particular use case can be expressed with existing operators.

Consider this:

Flux.zip(
	Flux.range(0, 10)
	       .doFirst(() -> System.out.println(Instant.now() + ": START")),
	Flux.interval(Duration.ZERO, Duration.ofSeconds(1))
)
    .map(Tuple2::getT1)
    .doOnNext(i -> System.out.println(Instant.now() + ": " + i))
    .blockLast();

With the output:

2023-10-31T12:49:45.960570Z: START
2023-10-31T12:49:45.969872Z: 0
2023-10-31T12:49:46.971268Z: 1
2023-10-31T12:49:47.974015Z: 2
2023-10-31T12:49:48.972947Z: 3
2023-10-31T12:49:49.974434Z: 4
2023-10-31T12:49:50.974538Z: 5
2023-10-31T12:49:51.973992Z: 6
2023-10-31T12:49:52.974480Z: 7
2023-10-31T12:49:53.972106Z: 8
2023-10-31T12:49:54.974418Z: 9

I believe this is even more appropriate for the original case, where it seems the events should happen every hour, regardless of how long each individual task takes.
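
To make the "fixed schedule" reading concrete, here is a rough sketch in plain Java (no Reactor) of how the zip-with-interval pairing above behaves on paper. It assumes element i is paired with tick i, that tick i fires at i * period with the first tick at time 0, and that early ticks are simply held until their element arrives. The class and method names are hypothetical.

```java
import java.util.Arrays;

public class ZipIntervalTimings {

    // Element i is emitted once both it and tick i are available,
    // i.e. at the later of its arrival time and i * period.
    public static long[] zipWithInterval(long[] arrivals, long period) {
        long[] out = new long[arrivals.length];
        for (int i = 0; i < arrivals.length; i++) {
            out[i] = Math.max(arrivals[i], i * period);
        }
        return out;
    }

    public static void main(String[] args) {
        long[] burst = new long[10]; // ten elements all arriving at t = 0, like Flux.range(0, 10)
        System.out.println(Arrays.toString(zipWithInterval(burst, 1)));
        // [0, 1, 2, 3, 4, 5, 6, 7, 8, 9]
        System.out.println(Arrays.toString(zipWithInterval(new long[]{0, 40, 50}, 10)));
        // [0, 40, 50]
    }
}
```

Note that in this model the schedule is fixed but the gap between emissions is not: arrivals of 0, 40 and 41 with a period of 10 come out at 0, 40 and 41, because the ticks for the late elements have already fired.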

chemicL avatar Oct 31 '23 12:10 chemicL

It is useful. I am looking for this too, and this is the usual use case. Delaying all elements (including the first) is rarely needed. You can find a lot of questions on Stack Overflow about how to do this delayBetween operation.

knuclechan avatar Nov 15 '23 08:11 knuclechan