RxPY
RxPY copied to clipboard
reactivex.timer emits immediately on second subscription
An observable created by reactivex.timer(1,5)
will emit immediately when a second observer subscribes instead of waiting the "1" second specified as argument. This only happens when an interval value is specified
def test_timer_with_interval_repeat():
scheduler = TestScheduler()
source = reactivex.timer(10, 200, scheduler=scheduler)
result = scheduler.start(
lambda: source.pipe(
operators.take(2),
operators.repeat(),
)
)
assert result.messages == [
on_next(210, 0),
on_next(410, 1),
on_next(420, 0), # <- here it will actually emit at 410
on_next(620, 1), # then 610
on_next(630, 0), # and again at 610, missing the "10" ticks from the timer
on_next(830, 1), # etc...
on_next(840, 0),
]
Expected the timer to be "reset" regardless if we resub on repeat or with a second observer
Additional findings:
- it only happens when there is an interval value; non repeating timers don't have the issue
- using a
operators.delay
we see that the next intervals follow the second subscription, meaning, they are correct relative to the first emission but wrong relative to expected behavior -
reactivex.interval
does not seem to have this issue
- RxPY 4.0.4
- Python 3.10
Another finding: in the case of duetime == period
which is handled separately in the code (I wonder what the reason is?), the issue does not occur