
Flux.concatMapDelayError not collecting errors when prefetch = 0

Open hhassan1 opened this issue 2 years ago • 2 comments

The case is described below. It is also possible that I have misunderstood what prefetch implies.

Expected Behavior

When using Flux.concatMapDelayError with prefetch = 0, the collection of exceptions in the form of Exceptions.CompositeException should be forwarded downstream.
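To make the expectation concrete, here is a minimal plain-Java sketch of the "delay errors until the end" semantics described above. It is only a model under stated assumptions, not Reactor's implementation: the class `DelayErrorModel` and the method `runAllDelayingErrors` are hypothetical names, and a plain `RuntimeException` carrying suppressed exceptions stands in for the composite exception.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Callable;

// Hypothetical model of the expected behavior; none of these names exist in Reactor.
public class DelayErrorModel {

    // Runs every source in order, remembers each failure instead of stopping,
    // and returns a single terminal error (or null if nothing failed).
    static Throwable runAllDelayingErrors(List<Callable<Object>> sources) {
        List<Throwable> errors = new ArrayList<>();
        for (Callable<Object> source : sources) {
            try {
                source.call();
            } catch (Exception e) {
                errors.add(e); // delay the error and move on to the next source
            }
        }
        if (errors.isEmpty()) {
            return null;
        }
        if (errors.size() == 1) {
            return errors.get(0);
        }
        // Stand-in for a composite exception: each failure is attached as suppressed.
        RuntimeException composite = new RuntimeException("Multiple exceptions");
        errors.forEach(composite::addSuppressed);
        return composite;
    }

    public static void main(String[] args) {
        List<Callable<Object>> sources = new ArrayList<>();
        for (int n = 1; n <= 3; n++) {
            String message = Integer.toString(n);
            sources.add(() -> { throw new Exception(message); });
        }
        Throwable terminal = runAllDelayingErrors(sources);
        System.out.println(terminal.getSuppressed().length); // prints 3
    }
}
```

In this model, three failing sources produce one terminal error carrying all three failures, which is what I would expect from the operator regardless of the prefetch value.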

Actual Behavior

Only the first exception is visible downstream.

Steps to Reproduce

import org.junit.jupiter.api.Test;
import reactor.core.Exceptions;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import reactor.core.publisher.Signal;
 
public class RandomTest {
	@Test
	void reproduceCase() {
		Signal<Object> zeroPrefetchSignal = Flux.just(1, 2, 3)
			.concatMapDelayError(n -> Mono.error(new Exception(n.toString())), 0)
			.materialize()
			.next()
			.block();
 
		Signal<Object> onePrefetchSignal = Flux.just(1, 2, 3)
			.concatMapDelayError(n -> Mono.error(new Exception(n.toString())), 1)
			.materialize()
			.next()
			.block();
 
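		// Actual behavior: with prefetch = 0, only the first exception is propagated.
		assert !Exceptions.isMultiple(zeroPrefetchSignal.getThrowable());
		// With prefetch = 1, the errors are forwarded as a composite exception.
		assert Exceptions.isMultiple(onePrefetchSignal.getThrowable());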
	}
}

Your Environment

  • Reactor version(s) used: 3.4.14
  • JVM: adopt-openj9-11.0.9
  • OS: MacOS 12.1

hhassan1 avatar Feb 10 '22 12:02 hhassan1

This does look like a bug, thanks for reporting @hhassan1

Looking beyond that at the current state of concatMapDelayError, I think things are a bit confusing, and the implementation and the javadoc are not well aligned. We need to figure out a consistent behavior for delaying errors in the context of concatMap, taking into account the prefetching cases vs. the 0-prefetch case. Does it still make sense to distinguish the boolean delayUntilEnd parameter in a concatMap? 🤔

 	 * @param delayUntilEnd delay error until all sources have been consumed instead of
 	 * after the current source

Looking at the behavior, this parameter currently seems to influence only the prefetching case, and only in terms of discarding prefetched upstream elements.

At first I thought I had a pretty straightforward fix for the issue you reported, but it surfaces the behavioral differences described above, which are flagged by the abstract shared tests. My current train of thought is that this behavior is not well-defined enough, and could perhaps be harmonized in the upcoming 3.5.0.

simonbasle avatar Feb 14 '22 15:02 simonbasle

The change in 3.5.0 makes this issue even more apparent, since the default prefetch is now 0. As a result, concatMapDelayError without additional arguments seems to behave the same as a regular concatMap, which surely cannot be intended?

Venorcis avatar Aug 23 '23 14:08 Venorcis