RxPY icon indicating copy to clipboard operation
RxPY copied to clipboard

reactivex.timer emits immediately on second subscription

Open matiboy opened this issue 1 year ago • 1 comments

An observable created by reactivex.timer(1,5) will emit immediately when a second observer subscribes instead of waiting the "1" second specified as argument. This only happens when an interval value is specified

def test_timer_with_interval_repeat():
    scheduler = TestScheduler()
    source = reactivex.timer(10, 200, scheduler=scheduler)
    result = scheduler.start(
        lambda: source.pipe(
            operators.take(2),
            operators.repeat(),
        )
    )
    assert result.messages == [
        on_next(210, 0),
        on_next(410, 1),
        on_next(420, 0), # <- here it will actually emit at 410
        on_next(620, 1), # then 610
        on_next(630, 0), # and again at 610, missing the "10" ticks from the timer
        on_next(830, 1), # etc...
        on_next(840, 0),
    ]

Expected the timer to be "reset" regardless if we resub on repeat or with a second observer

Additional findings:

  1. it only happens when there is an interval value; non repeating timers don't have the issue
  2. using a operators.delay we see that the next intervals follow the second subscription, meaning, they are correct relative to the first emission but wrong relative to expected behavior
  3. reactivex.interval does not seem to have this issue
  • RxPY 4.0.4
  • Python 3.10

matiboy avatar Sep 05 '23 02:09 matiboy

Another finding: in the case of duetime == period which is handled separately in the code (I wonder what the reason is?), the issue does not occur

matiboy avatar Sep 05 '23 02:09 matiboy