autoConnect(0) seems to be broken - late subscribers receive data
autoConnect(0) seems to be broken completely.
Steps to replicate:
CountDownLatch latch = new CountDownLatch(1);
AtomicInteger atomicInteger = new AtomicInteger(0);
Flux<Integer> flux = Flux.generate(sink -> sink.next(atomicInteger.incrementAndGet()))
        .delayElements(Duration.ofSeconds(1))
        .take(7)
        .cast(Integer.class)
        .doOnComplete(() -> System.out.println("completed"))
        .publish()
        .autoConnect(0);
// no subscribers for 3 seconds
Thread.sleep(3000);
// first subscriber: this should print 3,4,5
flux.take(3).subscribe(System.out::println);
// no subscribers for 6 seconds; by this time the source will have completed
Thread.sleep(6000);
// second subscriber: this should NOT print any data! but it prints 6 and 7
flux.take(3).subscribe(System.out::println);
latch.await();
Expected Behavior:
First subscribers get fresh messages emitted by the source. This behavior should be the same for late subscribers as well. That is, if the source has already completed, late subscribers should not receive any data.
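To make the expectation concrete, here is a minimal plain-Java sketch of the semantics described above. It is a toy model, not Reactor code and not how `publish()` or `autoConnect(0)` are implemented; `HotSourceModel` and `ExpectedSemanticsDemo` are hypothetical names introduced only for illustration. The assumption is that a hot source drops values while nobody is subscribed and gives a late subscriber nothing but completion.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

/** Toy model (hypothetical, not Reactor): a hot source that never buffers for absent subscribers. */
final class HotSourceModel<T> {
    private final List<Consumer<T>> subscribers = new ArrayList<>();
    private boolean completed;

    /** Registers a subscriber; returns false if the source has already completed. */
    boolean subscribe(Consumer<T> onNext) {
        if (completed) {
            return false; // late subscriber: completion only, no data
        }
        subscribers.add(onNext);
        return true;
    }

    /** Removes a subscriber; models cancellation such as take(3) finishing. */
    void unsubscribe(Consumer<T> onNext) {
        subscribers.remove(onNext);
    }

    /** Delivers a value to the current subscribers; the value is dropped if there are none. */
    void emit(T value) {
        if (completed) {
            return;
        }
        for (Consumer<T> subscriber : new ArrayList<>(subscribers)) {
            subscriber.accept(value);
        }
    }

    /** Marks the source as completed and releases all subscribers. */
    void complete() {
        completed = true;
        subscribers.clear();
    }
}

class ExpectedSemanticsDemo {
    /** Replays the timeline from the report: 7 values, one subscriber arriving before value 3, one after completion. */
    static List<List<Integer>> simulate() {
        HotSourceModel<Integer> source = new HotSourceModel<>();
        List<Integer> first = new ArrayList<>();
        List<Integer> second = new ArrayList<>();
        Consumer<Integer> firstSubscriber = first::add;

        for (int value = 1; value <= 7; value++) {
            if (value == 3) {
                source.subscribe(firstSubscriber); // values 1 and 2 were dropped: nobody was listening
            }
            source.emit(value);
            if (first.size() == 3) {
                source.unsubscribe(firstSubscriber); // models take(3) cancelling the subscription
            }
        }
        source.complete();
        source.subscribe(second::add); // late subscriber: rejected, receives no data
        return List.of(first, second);
    }

    public static void main(String[] args) {
        List<List<Integer>> result = simulate();
        // prints: first=[3, 4, 5], second=[]
        System.out.println("first=" + result.get(0) + ", second=" + result.get(1));
    }
}
```

In this model the second subscriber ends up with an empty list, which is the outcome the report expects from the real pipeline once the source has completed.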
Actual Behavior:
Late subscribers receive all the messages dropped by the first subscriber. This behavior does not make any sense.
Tested with the latest version as well: https://mvnrepository.com/artifact/io.projectreactor/reactor-bom/2023.0.5
Thanks for the report, and apologies that it went unnoticed. I believe this is a known issue with late subscribers. I'll try to summarize some similar reports and keep them under one umbrella. AFAIR there is no easy, backwards-compatible fix, unfortunately, but I'll try to gather the notes.