Sinks.many replay limit not respected

Open GoncaloPT opened this issue 1 year ago • 1 comment

A max-age limit is one of the features offered by MulticastReplaySpec. Events are supposed to expire after a TTL (maxAge) and therefore should not be replayed to subscribers that join after that point.

From my observation, this contract is only respected if events are continuously added to the sink. Otherwise, expired values are kept forever (or until the next event is added to the sink), and new subscribers are therefore presented with expired events.

Expected Behavior

Expired events should not be presented to subscribers, even if the sink is idle, meaning no new events have been added to it.

Actual Behavior

Expired events are not removed, and new subscribers can see them.
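
To make the reported difference concrete, here is a minimal, self-contained Kotlin sketch of a buffer that evicts expired items only when a new item is emitted. It is a toy model of the behavior observed in this report, not Reactor's actual implementation; `ToyReplayBuffer`, `replayWithoutReadCheck`, `replayWithReadCheck`, and the injected clock are hypothetical names introduced purely for illustration.

```kotlin
// Toy model only: NOT Reactor's implementation. All names are hypothetical.
class ToyReplayBuffer<T>(
    private val maxAgeMillis: Long,
    private val clock: () -> Long // injected clock so the example needs no sleeping
) {
    private data class Timed<T>(val value: T, val atMillis: Long)

    private val items = ArrayDeque<Timed<T>>()

    // Eviction happens only here, on the write path.
    fun emit(value: T) {
        val now = clock()
        while (items.isNotEmpty() && now - items.first().atMillis >= maxAgeMillis) {
            items.removeFirst()
        }
        items.addLast(Timed(value, now))
    }

    // Behavior described in the report: a late subscriber gets whatever is still stored.
    fun replayWithoutReadCheck(): List<T> = items.map { it.value }

    // Expected behavior: expiry is also checked when a subscriber joins.
    fun replayWithReadCheck(): List<T> {
        val now = clock()
        return items.filter { now - it.atMillis < maxAgeMillis }.map { it.value }
    }
}

fun main() {
    var now = 0L
    val buffer = ToyReplayBuffer<String>(maxAgeMillis = 1_000) { now }
    buffer.emit("a")
    buffer.emit("b")

    now = 1_000 // the buffer sits idle for the whole max age
    println(buffer.replayWithoutReadCheck()) // [a, b]  (stale items still visible)
    println(buffer.replayWithReadCheck())    // []      (what the report expects)

    buffer.emit("lastOne") // a write finally triggers eviction
    println(buffer.replayWithoutReadCheck()) // [lastOne]
}
```

In this model, the idle case returns the stale items until another `emit` runs, which mirrors the two tests in the reproduction: the first one never emits after the sleep, while the second one emits "lastOne" and so sees the expired values removed.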

Steps to Reproduce

package pt.goncalo.gitissues.reactor

import org.junit.jupiter.api.Test
import reactor.core.publisher.Sinks
import reactor.test.StepVerifier
import java.time.Duration
import java.util.stream.IntStream

class ReactorReplayTest {


    /**
     * This test is failing, because MulticastReplaySpec seems to only clear expired items after an operation
     * (like next) is called
     *
     */
    @Test
    fun `it should receive no event if sent before maxAge gap - even without add`() {
        val maxAge = Duration.ofSeconds(1)
        val instance = Sinks.many().replay().limit<String>(maxAge)

        IntStream
            .range(0, 100)
            .mapToObj { "$it" }
            .forEach {
                instance.tryEmitNext(it)
            }
        // Sleep for maxAge / TTL
        Thread.sleep(maxAge.toMillis())
        instance.tryEmitComplete()
        StepVerifier
            .create(instance.asFlux())
            .expectComplete()
            .verify(Duration.ofSeconds(1))


    }

    /**
     * This test passes (as expected), but only because next was called before the completion of the publisher.
     */
    @Test
    fun `it should receive no event if sent before maxAge gap`() {
        val maxAge = Duration.ofSeconds(1)
        val instance = Sinks.many().replay().limit<String>(maxAge)
        IntStream
            .range(0, 100)
            .mapToObj { "$it" }
            .forEach {
                instance.tryEmitNext(it)
            }
        Thread.sleep(maxAge.toMillis())

        // This shouldn't be required, but if I emit this value, the previous (expired) values are correctly removed
        instance.tryEmitNext("lastOne")
        instance.tryEmitComplete()
        StepVerifier
            .create(instance.asFlux())
            .expectNext("lastOne")
            .expectComplete()
            .verify(Duration.ofSeconds(1))
    }
}

A complete and running test case can be found at this repository. The test class is HERE.

Your Environment

The project was built using start.spring.io, with Gradle - Kotlin and Spring Boot 3.0.1. The problem also happens using Java - Maven with Spring Boot 2.7.3.

  • Reactor version(s) used: reactor-core 3.5.1 or reactor-core 3.4.22

  • JVM version (java -version): 17

  • OS and version (eg uname -a): Darwin 21.6.0 Darwin Kernel Version 21.6.0

Thanks.

GoncaloPT, Dec 28 '22 15:12