aioreactive
rx.delay frequently destroys tasks
Hi, I'm unsure whether this is the intended behaviour, but rx.delay frequently causes asyncio to log the "Task was destroyed but it is pending!" error shown further down.
If I'm not mistaken, rx.delay(seconds) is supposed to be an operator that delays the parent observable by the given number of seconds.
This is how I think rx.delay should behave, emulated here with asyncio.sleep:
import asyncio

import aioreactive as rx
from expression import pipe

def setup():
    return pipe(
        rx.interval(0, 2),
        rx.flat_map(many),
        rx.subscribe_async(observer),
    )

async def observer(x):
    print(x)

async def delayed(x, y):
    # Emulate the delay by sleeping inside the coroutine itself.
    await asyncio.sleep(y * 0.1)
    return (x, y)

def many(x: int):
    return rx.merge_seq(
        [
            pipe(
                delayed(x, 1),
                rx.from_async,
            ),
            pipe(
                delayed(x, 2),
                rx.from_async,
            ),
        ]
    )
However, here is the same pipeline using rx.delay instead of calling asyncio.sleep:
async def delayed(x, y):
    return (x, y)

def many(x: int):
    return rx.merge_seq(
        [
            pipe(
                delayed(x, 1),
                rx.from_async,
                rx.delay(0.1),
            ),
            pipe(
                delayed(x, 2),
                rx.from_async,
                rx.delay(0.2),
            ),
        ]
    )
With this version, asyncio logs the following errors while the stream keeps emitting:
(0, 1)
(0, 2)
ERROR:asyncio:Task was destroyed but it is pending!
task: <Task pending name='Task-17' coro=<delay.<locals>.subscribe_async.<locals>.worker.<locals>.loop() done, defined at /Users/shawnkoh/repos/ninjacado/.venv/lib/python3.10/site-packages/expression/core/fn.py:59> wait_for=<Future pending cb=[Task.task_wakeup()]>>
ERROR:asyncio:Task was destroyed but it is pending!
task: <Task pending name='Task-18' coro=<delay.<locals>.subscribe_async.<locals>.worker.<locals>.loop() done, defined at /Users/shawnkoh/repos/ninjacado/.venv/lib/python3.10/site-packages/expression/core/fn.py:59> wait_for=<Future pending cb=[Task.task_wakeup()]>>
(1, 1)
(1, 2)
(2, 1)
(2, 2)
(3, 1)
(3, 2)
ERROR:asyncio:Task was destroyed but it is pending!
task: <Task pending name='Task-43' coro=<delay.<locals>.subscribe_async.<locals>.worker.<locals>.loop() done, defined at /Users/shawnkoh/repos/ninjacado/.venv/lib/python3.10/site-packages/expression/core/fn.py:59> wait_for=<Future pending cb=[Task.task_wakeup()]>>
ERROR:asyncio:Task was destroyed but it is pending!
task: <Task pending name='Task-44' coro=<delay.<locals>.subscribe_async.<locals>.worker.<locals>.loop() done, defined at /Users/shawnkoh/repos/ninjacado/.venv/lib/python3.10/site-packages/expression/core/fn.py:59> wait_for=<Future pending cb=[Task.task_wakeup()]>>
ERROR:asyncio:Task was destroyed but it is pending!
task: <Task pending name='Task-56' coro=<delay.<locals>.subscribe_async.<locals>.worker.<locals>.loop() done, defined at /Users/shawnkoh/repos/ninjacado/.venv/lib/python3.10/site-packages/expression/core/fn.py:59> wait_for=<Future pending cb=[Task.task_wakeup()]>>
ERROR:asyncio:Task was destroyed but it is pending!
task: <Task pending name='Task-57' coro=<delay.<locals>.subscribe_async.<locals>.worker.<locals>.loop() done, defined at /Users/shawnkoh/repos/ninjacado/.venv/lib/python3.10/site-packages/expression/core/fn.py:59> wait_for=<Future pending cb=[Task.task_wakeup()]>>
(4, 1)
(4, 2)
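For context on the message itself: asyncio's event loop keeps only weak references to tasks, so a pending task that nothing else references can be garbage collected, and asyncio then reports "Task was destroyed but it is pending!". The sketch below reproduces the message in plain asyncio, without aioreactive. It only illustrates one way the message can arise; whether the worker task inside rx.delay loses its reference like this is an assumption, not something confirmed here. The names reproduce and worker are hypothetical.

```python
import asyncio
import gc


async def reproduce() -> list:
    """Return the messages asyncio reports when an unreferenced task is collected."""
    loop = asyncio.get_running_loop()
    messages = []
    # Capture what asyncio would otherwise log as "ERROR:asyncio:...".
    loop.set_exception_handler(
        lambda _loop, context: messages.append(context["message"])
    )

    async def worker():
        # Wait on a future that nothing else references and nobody resolves.
        await loop.create_future()

    # The returned Task is deliberately not stored anywhere.
    asyncio.ensure_future(worker())
    await asyncio.sleep(0)  # let the worker start and suspend on its future
    gc.collect()  # the task/future reference cycle is now unreachable
    return messages


if __name__ == "__main__":
    print(asyncio.run(reproduce()))
```

In plain asyncio, keeping a strong reference to the task (for example in a set that the task is removed from once it is done) avoids this.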
A workaround I've been using is this function:
@curry_flipped(1)
def sleep(
    source: AsyncObservable[_TSource],
    seconds: float,
    sleep_on_start: bool = False,
) -> AsyncObservable[_TSource]:
    started = sleep_on_start

    @curry_flipped(1)
    async def _sleep(source, seconds: float):
        nonlocal started
        if started:
            await asyncio.sleep(seconds)
        else:
            started = True
        return source

    return pipe(
        source,
        rx.map_async(_sleep(seconds)),
    )
Example usage:
def setup(token: str):
    session = aiohttp.ClientSession(timeout=aiohttp.ClientTimeout(total=None))
    return pipe(
        rx.interval(0, 2),
        rx.flat_map(
            lambda _: pipe(
                range(14),
                rx.from_iterable,
                rx_helpers.sleep(0.1),
                rx.map(lambda y: arrow.now().shift(days=y)),
                rx.map_async(
                    schedules(token, session),
                ),
            )
        ),
        rx.subscribe_async(rx.AsyncAnonymousObserver()),
    )
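To make the pacing of the workaround easier to see in isolation, here is a minimal sketch of the same first-item logic as a plain asyncio async generator, without aioreactive. The names paced and collect are hypothetical and not part of any library. Note that this spaces items apart by a fixed interval rather than shifting every item by the same amount, so it is not necessarily equivalent to what rx.delay is meant to do.

```python
import asyncio
from typing import AsyncIterator, Iterable, List, TypeVar

T = TypeVar("T")


async def paced(
    items: Iterable[T],
    seconds: float,
    sleep_on_start: bool = False,
) -> AsyncIterator[T]:
    """Yield items, sleeping before each one except (by default) the first."""
    started = sleep_on_start
    for item in items:
        if started:
            await asyncio.sleep(seconds)
        else:
            # First item passes through immediately; later ones are spaced out.
            started = True
        yield item


async def collect(source: AsyncIterator[T]) -> List[T]:
    """Drain an async iterator into a list."""
    return [item async for item in source]


if __name__ == "__main__":
    # Three items, with a 0.01 s pause before the second and third.
    print(asyncio.run(collect(paced(range(3), 0.01))))
```

Because the sleep happens inline in the consumer's own coroutine, no background task is created, which may be why this approach does not trigger the pending-task message.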