reactor-core
Sinks.many replay limit not respected
The max-age limit is one of the features offered by MulticastReplaySpec. Events are supposed to expire after a TTL (maxAge), and should therefore not be replayed downstream when new subscribers join.
From my observation, this contract is only respected if events are continuously added to the sink. Otherwise, expired values are kept forever (or until the next event is added to the sink), and new subscribers are therefore presented with expired events.
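To make the suspected mechanism concrete, here is a minimal sketch of a time-bounded buffer that only evicts expired entries when a new value is added. It is a hypothetical model written for illustration (the class `LazyReplayBuffer` and its methods are my own names), not reactor-core's actual implementation. It uses an injected clock instead of `Thread.sleep` so the behavior is deterministic.

```kotlin
// Hypothetical model of a max-age replay buffer; NOT reactor-core's code.
class LazyReplayBuffer<T>(
    private val maxAgeMillis: Long,
    private val clock: () -> Long
) {
    // Each entry stores the time it was added together with the value.
    private val entries = ArrayDeque<Pair<Long, T>>()

    // Eviction happens only here, mirroring the observed "cleanup on next" behavior.
    fun add(value: T) {
        val now = clock()
        evictExpired(now)
        entries.addLast(now to value)
    }

    // Replays whatever is stored, without checking the age of the entries.
    fun replayWithoutCheck(): List<T> = entries.map { it.second }

    // Replays only the entries that are still within maxAge at read time.
    fun replayWithCheck(): List<T> {
        val now = clock()
        return entries.filter { now - it.first <= maxAgeMillis }.map { it.second }
    }

    private fun evictExpired(now: Long) {
        while (entries.isNotEmpty() && now - entries.first().first > maxAgeMillis) {
            entries.removeFirst()
        }
    }
}

fun main() {
    var now = 0L
    val buffer = LazyReplayBuffer<String>(maxAgeMillis = 1_000) { now }
    buffer.add("a")
    buffer.add("b")

    now = 2_000 // the sink is idle while both values expire

    println(buffer.replayWithoutCheck()) // [a, b]   <- what a late subscriber appears to see
    println(buffer.replayWithCheck())    // []       <- what I would expect

    buffer.add("lastOne")                // adding a value triggers eviction
    println(buffer.replayWithoutCheck()) // [lastOne]
}
```

If the real buffer behaves like `replayWithoutCheck`, that would explain why emitting one extra value before subscribing makes the expired values disappear.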
Expected Behavior
Expired events should not be presented to subscribers, even if the sink is idle, meaning no new events have been added to it.
Actual Behavior
Expired events are not removed, and new subscribers can see them.
Steps to Reproduce
package pt.goncalo.gitissues.reactor

import org.junit.jupiter.api.Test
import reactor.core.publisher.Sinks
import reactor.test.StepVerifier
import java.time.Duration
import java.util.stream.IntStream

class ReactorReplayTest {

    /**
     * This test fails, because MulticastReplaySpec seems to clear expired items only after an operation
     * (like next) is called.
     */
    @Test
    fun `it should receive no event if sent before maxAge gap - even without add`() {
        val maxAge = Duration.ofSeconds(1)
        val instance = Sinks.many().replay().limit<String>(maxAge)
        IntStream
            .range(0, 100)
            .mapToObj { "$it" }
            .forEach {
                instance.tryEmitNext(it)
            }
        // Sleep for maxAge / TTL so that every emitted value has expired
        Thread.sleep(maxAge.toMillis())
        instance.tryEmitComplete()
        // A late subscriber should only see the completion signal
        StepVerifier
            .create(instance.asFlux())
            .expectComplete()
            .verify(Duration.ofSeconds(1))
    }

    /**
     * This test passes (as expected), but only because next was called before the completion of the publisher.
     */
    @Test
    fun `it should receive no event if sent before maxAge gap`() {
        val maxAge = Duration.ofSeconds(1)
        val instance = Sinks.many().replay().limit<String>(maxAge)
        IntStream
            .range(0, 100)
            .mapToObj { "$it" }
            .forEach {
                instance.tryEmitNext(it)
            }
        Thread.sleep(maxAge.toMillis())
        // This shouldn't be required, but if I emit this value, the previous (expired) values are correctly removed
        instance.tryEmitNext("lastOne")
        instance.tryEmitComplete()
        StepVerifier
            .create(instance.asFlux())
            .expectNext("lastOne")
            .expectComplete()
            .verify(Duration.ofSeconds(1))
    }
}
A complete, runnable test case can be found at this repository. The test class is HERE.
Your Environment
The project was built using start.spring.io, with Gradle - Kotlin and Spring Boot 3.0.1. The problem also occurs with Java - Maven and Spring Boot 2.7.3.
- Reactor version(s) used: reactor-core 3.5.1 or reactor-core 3.4.22
- JVM version (java -version): 17
- OS and version (eg uname -a): Darwin 21.6.0 Darwin Kernel Version 21.6.0
Thanks.