#publish does not propagate onComplete to late Subscribers
Expected Behavior
When a #published source completes while no one is subscribed, at least the first Subscriber to subscribe afterwards should immediately receive onComplete.
Actual Behavior
The subscriber receives no signal at all and effectively hangs.
Steps to Reproduce
@Test
void subscribeAfterComplete() {
    var sink = Sinks.many().unicast().<Integer>onBackpressureBuffer();
    var published = sink.asFlux().log("source").publish().autoConnect().log("published");

    sink.tryEmitNext(1).orThrow();
    // The first subscriber receives 1, then cancels.
    StepVerifier.create(published)
        .expectNext(1)
        .thenCancel()
        .verify(Duration.ofSeconds(1));

    // The source completes while nobody is subscribed.
    sink.tryEmitComplete().orThrow();
    // Expected: the late subscriber sees onComplete. Actual: the verification times out.
    StepVerifier.create(published)
        .expectComplete()
        .verify(Duration.ofSeconds(1));
}
Output:
2022-01-20 18:00:05.376 INFO --- [Pool-1-worker-3] published : onSubscribe(FluxPublish.PublishInner)
2022-01-20 18:00:05.386 INFO --- [Pool-1-worker-3] published : request(unbounded)
2022-01-20 18:00:05.388 INFO --- [Pool-1-worker-3] source : | onSubscribe([Fuseable] UnicastProcessor)
2022-01-20 18:00:05.392 INFO --- [Pool-1-worker-3] source : | request(256)
2022-01-20 18:00:05.392 INFO --- [Pool-1-worker-3] source : | onNext(1)
2022-01-20 18:00:05.392 INFO --- [Pool-1-worker-3] published : onNext(1)
2022-01-20 18:00:05.393 INFO --- [Pool-1-worker-3] published : cancel()
2022-01-20 18:00:05.394 INFO --- [Pool-1-worker-3] source : | request(1)
2022-01-20 18:00:05.395 INFO --- [Pool-1-worker-3] source : | onComplete()
2022-01-20 18:00:05.396 INFO --- [Pool-1-worker-3] published : onSubscribe(FluxPublish.PublishInner)
2022-01-20 18:00:05.397 INFO --- [Pool-1-worker-3] published : request(unbounded)
java.lang.AssertionError: VerifySubscriber timed out on reactor.core.publisher.FluxPeek$PeekSubscriber@58f5f822
Workaround
Use #materialize before and #dematerialize after publishing:
var published = sink.asFlux().log("source")
    .materialize()
    .publish().autoConnect()
    .<Integer>dematerialize()
    .log("published");
Your Environment
- Reactor version(s) used: 3.4.14
- Other relevant libraries versions (eg. netty, ...): none
- JVM version (java -version): openjdk version "11.0.11" 2021-04-20
- OS and version (eg uname -a): Linux Lovelace 5.13.0-27-generic #29-Ubuntu SMP Wed Jan 12 17:36:47 UTC 2022 x86_64 x86_64 x86_64 GNU/Linux
To elaborate on my expectation:
I am using #publish with a source that honours backpressure. I am relying on the documented property of #publish that "if any Subscriber is missing demand (requested = 0), multicast will pause pushing/pulling."
I take that to mean that if there are no Subscribers at all (and, hence, no request), pushing and pulling will also pause. This works as expected for next signals, but not for completion. I think it is because of the prefetch: prefetched next signals are buffered and replayed, but completion signals are not.
My expectation is that at least the first subscriber that subscribes after the source completed sees the completion. I would find it even more useful if all later subscribers got the completion replayed, but I guess one could also make the argument that this would be inconsistent with how #publish handles next signals.
Here is another example that shows why I find the current behaviour unexpected: the completion signal is buffered while there are no subscribers, as long as there are still outstanding next signals.
The following test case passes and shows this:
@Test
void subscribeAfterComplete() {
    var sink = Sinks.many().unicast().<Integer>onBackpressureBuffer();
    var published = sink.asFlux().log("source").publish().autoConnect().log("published");

    sink.tryEmitNext(1).orThrow();
    // The first subscriber receives 1, then cancels.
    StepVerifier.create(published)
        .expectNext(1)
        .thenCancel()
        .verify(Duration.ofSeconds(1));

    // A next signal and the completion are both emitted while nobody is subscribed.
    sink.tryEmitNext(2).orThrow();
    sink.tryEmitComplete().orThrow();
    // The late subscriber receives the buffered 2, followed by onComplete.
    StepVerifier.create(published)
        .expectNext(2)
        .expectComplete()
        .verify(Duration.ofSeconds(1));
}
In other words: #publish makes sure that no signals are lost while there are no Subscribers, except if the only outstanding signal is a completion. In that case it is dropped.
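To make the expectation concrete, here is a toy model in plain Java. It is not Reactor's FluxPublish and does not reflect how it is implemented. The names HoldingHub and HoldingHubDemo are made up for illustration. The model ignores demand and allows only one subscriber at a time. It takes the more generous option of handing the completion to every subscriber that arrives after the source has completed.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.List;
import java.util.Queue;
import java.util.function.Consumer;

/**
 * Toy model only (hypothetical, NOT Reactor code): signals emitted while
 * nobody is subscribed are held back, including the terminal completion.
 */
final class HoldingHub<T> {
    private final Queue<T> pending = new ArrayDeque<>();
    private boolean completed;      // true once the source has completed
    private Consumer<T> onNext;     // current subscriber, or null if none
    private Runnable onComplete;

    void emitNext(T value) {
        if (completed) {
            throw new IllegalStateException("already completed");
        }
        if (onNext != null) {
            onNext.accept(value);   // a subscriber is present: deliver directly
        } else {
            pending.add(value);     // nobody is subscribed: hold the value
        }
    }

    void emitComplete() {
        completed = true;
        if (onComplete != null) {
            deliverComplete();
        }
        // Otherwise the completion stays recorded in 'completed' for late subscribers.
    }

    void subscribe(Consumer<T> next, Runnable complete) {
        onNext = next;
        onComplete = complete;
        // Drain held values first, then a held completion, in emission order.
        T value;
        while ((value = pending.poll()) != null) {
            next.accept(value);
        }
        if (completed) {
            deliverComplete();
        }
    }

    void cancel() {
        onNext = null;
        onComplete = null;
    }

    private void deliverComplete() {
        Runnable r = onComplete;
        cancel();                   // a terminated subscriber receives nothing more
        r.run();
    }
}

final class HoldingHubDemo {
    /** Mirrors the failing test: subscribe, cancel, complete, subscribe again. */
    static List<String> run() {
        List<String> events = new ArrayList<>();
        HoldingHub<Integer> hub = new HoldingHub<>();
        hub.emitNext(1);
        hub.subscribe(v -> events.add("first:next(" + v + ")"),
                      () -> events.add("first:complete"));
        hub.cancel();
        hub.emitComplete();         // completes while nobody is subscribed
        hub.subscribe(v -> events.add("second:next(" + v + ")"),
                      () -> events.add("second:complete"));
        return events;
    }
}
```

In this model the second subscriber receives the completion instead of hanging, which is the behaviour I would expect from #publish in the failing test above.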
This indeed sounds like something that needs fixing. Thanks for adding that other test case, we'll look into it 👍
@jGleitz as of now there is a workaround: replace publish with replay(history=0).
In the duplicate issue #3739 the published flux gladly accepts the 2nd late subscriber then ignores it and the pipeline hangs. This behavior makes no sense to me. It is also not documented anywhere, AFAICT.
To state the obvious: causing a pipeline to hang, possibly in a subtle way, should be avoided.
IMHO, the published flux should simply send the late subscriber a complete signal instead of hanging.