aiostream
Idea: Timeouts on chunks
Found this library and want to say thanks, it's absolutely brilliant for my use case of merging multiple AWS Kinesis streams into a single stream of async records.
I noticed there's a chunks operator I could pipe the merge into, which works well until the stream slows down. Given that these streams are unbounded and potentially infinite, I'd like to propose a timeout on chunks (or take).
That way you could take a chunk of size n or wait for a timeout, whichever happens first, and yield whatever has been collected (or skip yielding entirely if the buffer is empty).
Thanks again for this library, the merge functionality is absolutely astounding for combining concurrent async generators into one usable stream!
Had a fiddle and came up with:
@operator(pipable=True)
async def taketimeout(source, n, timeout=1):
    """Forward the first ``n`` elements from an asynchronous sequence.

    Additionally provide a ``timeout`` in seconds. Default is 1 second.
    If ``n`` is negative, it simply terminates before iterating the source.
    """
    event = asyncio.Event()

    async def timer(event: asyncio.Event, timeout):
        await asyncio.sleep(timeout)
        event.set()

    asyncio.create_task(timer(event, timeout))
    async for item in take(source, n):
        if event.is_set():
            break
        yield item
With this test:
@pytest.mark.asyncio
async def test_taketimeout(assert_run, event_loop):
    with event_loop.assert_cleanup():
        xs = stream.count(interval=1) | add_resource.pipe(1) | pipe.taketimeout(6, 3)
        await assert_run(xs, [0, 1, 2])

    with event_loop.assert_cleanup():
        xs = stream.count(1) | add_resource.pipe(1) | pipe.taketimeout(0)
        await assert_run(xs, [])
The good news is that it passes the functional test: given a take size of 6 and a timeout of 3 seconds, it pulls three items from the count generator.
The bad news is that it's a bit janky. The Python 3.11 timeout context manager would be perfect for this, but it's obviously not applicable here. Additionally, it passes the cleanup check failure and I'm not sure why.
It might be more useful to make timeout a generic pipe operator that lets you terminate any long-running pipe by elapsed time rather than by item count.
E.g.
import asyncio
from aiostream import stream, pipe


async def main():
    xs = (
        stream.count(interval=1)         # Count from zero every 1s from an unbounded, infinite generator
        | pipe.timeout(10)               # Proposed: close the pipeline after 10 seconds, yielding [0,1,2,3,4,5,6,7,8,9]
        | pipe.skip(2)                   # Skip the first 2 numbers, yielding [2,3,4,5,6,7,8,9]
        | pipe.take(5)                   # Take the following 5, yielding [2,3,4,5,6]
        | pipe.filter(lambda x: x % 2)   # Keep odd numbers, yielding [3,5]
        | pipe.map(lambda x: x ** 2)     # Square the results, yielding [9,25]
        | pipe.accumulate()              # Add the numbers together, yielding 34
    )
    print(await xs)


# Run main coroutine
asyncio.run(main())
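For illustration, the "terminate by time" behaviour proposed above can be sketched with plain asyncio, without touching aiostream internals. This is only a rough sketch under stated assumptions: `take_for` is a hypothetical name, not an aiostream operator, and it works on any async iterable. It is a different behaviour from a per-item timeout that raises `asyncio.TimeoutError` when a single element takes too long.

```python
import asyncio


async def take_for(source, seconds):
    """Forward items from ``source`` until ``seconds`` have elapsed overall.

    Hypothetical helper (not part of aiostream): the time budget covers the
    whole iteration, not each individual item.
    """
    iterator = source.__aiter__()
    loop = asyncio.get_running_loop()
    deadline = loop.time() + seconds
    while True:
        remaining = deadline - loop.time()
        if remaining <= 0:
            return
        try:
            # Wait for the next item, but never past the overall deadline.
            item = await asyncio.wait_for(iterator.__anext__(), remaining)
        except (StopAsyncIteration, asyncio.TimeoutError):
            # Either the source is exhausted or the time budget ran out.
            return
        yield item
```

Note that when the budget runs out mid-fetch, the pending fetch is cancelled, so a source written as an async generator is terminated at that point rather than left suspended.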
Hi @danielloader, sorry for the delay
> Found this library and want to say thanks, it's absolutely brilliant for my use case of merging multiple AWS Kinesis streams into a single stream of async records.
Thanks for your kind words, it's much appreciated :)
> I'd like to propose a timeout on chunks (or take).
That's an interesting suggestion! It's quite tricky to implement though. After a bit of fiddling around I came up with this solution:
import random
from contextlib import asynccontextmanager

import asyncio
from aiostream import stream, pipe, operator, streamcontext


@asynccontextmanager
async def buffer(streamer, size=1):
    queue = asyncio.Queue(maxsize=size)
    sentinel = object()

    async def consume():
        try:
            async for item in streamer:
                await queue.put(item)
        finally:
            await queue.put(sentinel)

    @operator
    async def wrapper():
        while True:
            item = await queue.get()
            if item is sentinel:
                await future
                return
            yield item

    future = asyncio.ensure_future(consume())
    try:
        yield wrapper()
    finally:
        future.cancel()


@operator(pipable=True)
async def catch(source, exc_cls):
    async with streamcontext(source) as streamer:
        try:
            async for item in streamer:
                yield item
        except exc_cls:
            return


@operator(pipable=True)
async def chunks(source, n, timeout):
    async with streamcontext(source) as streamer:
        async with buffer(streamer) as buffered:
            async with streamcontext(buffered) as first_streamer:
                async for first in first_streamer:
                    tail = await (
                        buffered
                        | pipe.timeout(timeout)
                        | catch.pipe(asyncio.TimeoutError)
                        | pipe.take(n - 1)
                        | pipe.list()
                    )
                    yield [first, *tail]


async def random_sleep(item):
    return await asyncio.sleep(random.random() * 0.15, result=item)


async def main():
    xs = stream.count() | pipe.map(random_sleep, task_limit=1)
    ys = xs | chunks.pipe(5, timeout=0.1) | pipe.print()
    await ys


if __name__ == "__main__":
    asyncio.run(main())
It outputs something like:
[0, 1]
[2, 3, 4, 5]
[6, 7, 8, 9]
[10, 11, 12, 13, 14]
[15, 16, 17]
[18, 19, 20, 21]
[22, 23, 24, 25]
[26, 27, 28]
[29, 30]
[31, 32, 33, 34, 35]
[36, 37]
[38, 39, 40, 41]
[42, 43]
[44, 45]
[46, 47, 48, 49, 50]
...
As you can see, I needed two extra helpers in order to write this new chunks implementation:
- a catch operator, which is simple enough
- a buffer context manager, which is something much trickier
Also note that this implementation yields a smaller chunk when the production of a single item times out, not when the production of the whole chunk times out.
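To make the distinction concrete, here is a minimal sketch of the other behaviour, where the timeout applies to the whole chunk and starts counting when the chunk's first item arrives. It uses plain asyncio rather than aiostream operators, and the names `chunks_with_deadline` and `_next_or_done` are hypothetical, introduced only for this illustration.

```python
import asyncio

_DONE = object()  # marks the end of the source


async def _next_or_done(iterator):
    """Fetch the next item, or return _DONE once the source is exhausted."""
    try:
        return await iterator.__anext__()
    except StopAsyncIteration:
        return _DONE


async def chunks_with_deadline(source, n, timeout):
    """Yield lists of up to ``n`` items; a chunk is flushed early once
    ``timeout`` seconds have passed since its first item arrived."""
    iterator = source.__aiter__()
    loop = asyncio.get_running_loop()
    pending = None  # in-flight fetch, kept across chunks so no item is lost
    try:
        while True:
            if pending is None:
                pending = asyncio.ensure_future(_next_or_done(iterator))
            first = await pending  # no timeout while the chunk is empty
            pending = None
            if first is _DONE:
                return
            chunk = [first]
            deadline = loop.time() + timeout
            while len(chunk) < n:
                pending = asyncio.ensure_future(_next_or_done(iterator))
                remaining = max(deadline - loop.time(), 0)
                done, _ = await asyncio.wait({pending}, timeout=remaining)
                if not done:
                    break  # deadline reached; the fetch stays pending
                item = pending.result()
                pending = None
                if item is _DONE:
                    yield chunk
                    return
                chunk.append(item)
            yield chunk
    finally:
        if pending is not None:
            pending.cancel()
```

Because the sketch never waits with a timeout while the chunk is empty, it never yields an empty list, which matches the "skip yielding if the buffer is empty" idea from the original request.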
I'm not sure how I would go about implementing such a feature. Should we add this logic specifically for chunks, or should we take a more generic approach and design new operators to use as building blocks for similar use cases? Sadly I don't really have the bandwidth to work on aiostream at the moment, but feel free to add a couple of suggestions if you have some.
And thanks for the report :)
@vxgmichel When the chunks operator above is used on a finite stream, the process never exits and has to be killed manually:
async def main():
    source = stream.range(0, 100)
    even = source | pipe.filter(lambda x: x % 2 == 0)
    xs = even | chunks.pipe(9, 0.1) | pipe.map(lambda x: f'even {x}', task_limit=1)
    await (xs | pipe.print())


if __name__ == "__main__":
    asyncio.run(main())
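One possible explanation, which is not confirmed anywhere in this thread: the buffer helper puts its sentinel on the queue only once, but two readers share that queue (the outer loop that fetches the first item of each chunk, and the inner pipeline that collects the tail). If the tail reader consumes the sentinel, the outer reader waits on an empty queue forever. The sketch below reproduces that pattern with a bare `asyncio.Queue`; `read_all` and `second_reader_finishes` are hypothetical names, and re-queuing the sentinel is just one way to let every reader observe the end of the stream.

```python
import asyncio

SENTINEL = object()  # end-of-stream marker shared by all readers


async def read_all(queue, requeue_sentinel):
    """Collect items from the queue until the sentinel is seen."""
    items = []
    while True:
        item = await queue.get()
        if item is SENTINEL:
            if requeue_sentinel:
                # Put the marker back so the next reader also sees the end.
                queue.put_nowait(SENTINEL)
            return items
        items.append(item)


async def second_reader_finishes(requeue_sentinel):
    """Return True if a second reader terminates after the first one has
    already consumed the sentinel, False if it blocks until the timeout."""
    queue = asyncio.Queue()
    for item in (1, 2, SENTINEL):
        queue.put_nowait(item)
    await read_all(queue, requeue_sentinel)  # first reader hits the sentinel
    try:
        await asyncio.wait_for(read_all(queue, requeue_sentinel), timeout=0.1)
    except asyncio.TimeoutError:
        return False
    return True
```

With `requeue_sentinel=False` the second reader blocks until the timeout fires, which mirrors the hang reported above. Whether this is the actual cause in the chunks implementation would need to be checked against aiostream itself.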