
Idea: Timeouts on chunks

Open danielloader opened this issue 3 years ago • 2 comments

Found this library and want to say thanks, it's absolutely brilliant for my use case of merging multiple AWS Kinesis streams into a single stream of async records.

I noticed there's a chunk method I could pipe the merge into, which works well until there's a slowdown in the stream. Since these streams are unbounded and potentially infinite, I'd like to propose a timeout on chunks (or take).

That way you'd be able to take a chunk of size n or hit a timeout, whichever happens first, and yield the result (or skip yielding entirely if the buffer is empty).
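The proposed semantics can be sketched with plain asyncio (the name `chunks_with_timeout` is hypothetical, not aiostream API): a background task feeds a queue so that a timeout never cancels the source itself, and a partial chunk is flushed whenever no item arrives in time.

```python
import asyncio

_SENTINEL = object()


async def chunks_with_timeout(source, n, timeout):
    """Hypothetical helper: yield lists of up to ``n`` items, flushing a
    partial chunk whenever ``timeout`` seconds pass without a new item,
    and skipping the flush entirely when the buffer is empty."""
    queue = asyncio.Queue(maxsize=1)

    async def pump():
        # Feed a queue so a timeout never cancels the source itself.
        async for item in source:
            await queue.put(item)
        await queue.put(_SENTINEL)

    task = asyncio.create_task(pump())
    try:
        chunk = []
        while True:
            try:
                item = await asyncio.wait_for(queue.get(), timeout)
            except asyncio.TimeoutError:
                if chunk:  # skip yielding when nothing is buffered
                    yield chunk
                    chunk = []
                continue
            if item is _SENTINEL:
                if chunk:
                    yield chunk
                return
            chunk.append(item)
            if len(chunk) == n:
                yield chunk
                chunk = []
    finally:
        task.cancel()
```

Note that the timeout here restarts on every wait, so it bounds the gap between items rather than the whole chunk; both readings of the proposal come up later in this thread.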

Thanks again for this library, the merge functionality is absolutely astounding for combining concurrent async generators into one usable stream!

danielloader avatar Oct 09 '22 19:10 danielloader

Had a fiddle and came up with:

import asyncio

from aiostream import operator
from aiostream.stream import take


@operator(pipable=True)
async def taketimeout(source, n, timeout=1):
    """Forward the first ``n`` elements from an asynchronous sequence.
    Additionally provide a ``timeout`` in seconds. Default is 1 second.

    If ``n`` is negative, it simply terminates before iterating the source.
    """
    event = asyncio.Event()

    async def timer(event: asyncio.Event, timeout):
        await asyncio.sleep(timeout)
        event.set()

    # Note: this task is never cancelled, so it may outlive the generator.
    asyncio.create_task(timer(event, timeout))
    async for item in take(source, n):
        if event.is_set():
            break
        yield item

With this test:

@pytest.mark.asyncio
async def test_taketimeout(assert_run, event_loop):
    with event_loop.assert_cleanup():
        xs = stream.count(interval=1) | add_resource.pipe(1) | pipe.taketimeout(6, 3)
        await assert_run(xs, [0, 1, 2])

    with event_loop.assert_cleanup():
        xs = stream.count(1) | add_resource.pipe(1) | pipe.taketimeout(0)
        await assert_run(xs, [])

The good news is that it passes the functionality test: given a take size of 6 but a timeout of 3, it pulls three items from the count generator.

The bad news is that it's a bit janky; the Python 3.11 timeout context manager would be perfect for this, but obviously it isn't applicable here. Additionally, it fails the cleanup check and I'm not sure why.
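A likely culprit for the cleanup failure is the timer task: it is created but never cancelled, so it can outlive the generator. A plain-asyncio sketch of the same event-and-timer approach (the name `take_with_timeout` is mine) that cancels the timer in a `finally` block:

```python
import asyncio


async def take_with_timeout(source, n, timeout):
    """Hypothetical sketch: yield at most ``n`` items from ``source``,
    stopping early once ``timeout`` seconds have elapsed."""
    if n <= 0:
        return
    stop = asyncio.Event()

    async def timer():
        await asyncio.sleep(timeout)
        stop.set()

    timer_task = asyncio.create_task(timer())
    try:
        count = 0
        async for item in source:
            if stop.is_set():
                break
            yield item
            count += 1
            if count >= n:
                break
    finally:
        # Cancel the timer so no pending task is left behind, which is
        # the kind of leak a cleanup check would flag.
        timer_task.cancel()
```

Cancelling in `finally` also covers the case where the generator itself is closed early by the consumer.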

danielloader avatar Oct 09 '22 21:10 danielloader

It might be more useful to make a timeout function a generic pipe operator that allows you to terminate any long running pipe by time rather than by item counts.

E.g.

import asyncio
from aiostream import stream, pipe


async def main():
    xs = (
        stream.count(interval=1)  # Count from zero every 1s from an unbound infinite generator
        | pipe.timeout(10) # Close the pipeline after 10 seconds yielding [0,1,2,3,4,5,6,7,8,9]
        | pipe.skip(2)  # Skip the first 2 numbers yielding [2,3,4,5,6,7,8,9]
        | pipe.take(5)  # Take the following 5 yielding [2,3,4,5,6]
        | pipe.filter(lambda x: x % 2)  # Keep odd numbers yielding [3,5]
        | pipe.map(lambda x: x ** 2)  # Square the results yielding [9,25]
        | pipe.accumulate()  # Add the numbers together yielding 34
    )
    print(await xs)


# Run main coroutine
asyncio.run(main())

danielloader avatar Oct 09 '22 21:10 danielloader

Hi @danielloader, sorry for the delay

Found this library and want to say thanks, it's absolutely brilliant for my usecase of merging multiple AWS Kinesis streams into a single stream of async records.

Thanks for your kind words, it's much appreciated :)

I'd like to propose a timeout on chunks (or take).

That's an interesting suggestion! It's quite tricky to implement though; after a bit of fiddling around I came up with this solution:

import random
from contextlib import asynccontextmanager

import asyncio
from aiostream import stream, pipe, operator, streamcontext


@asynccontextmanager
async def buffer(streamer, size=1):
    queue = asyncio.Queue(maxsize=size)
    sentinel = object()

    async def consume():
        try:
            async for item in streamer:
                await queue.put(item)
        finally:
            await queue.put(sentinel)

    @operator
    async def wrapper():
        while True:
            item = await queue.get()
            if item is sentinel:
                await future
                return
            yield item

    future = asyncio.ensure_future(consume())
    try:
        yield wrapper()
    finally:
        future.cancel()


@operator(pipable=True)
async def catch(source, exc_cls):
    async with streamcontext(source) as streamer:
        try:
            async for item in streamer:
                yield item
        except exc_cls:
            return


@operator(pipable=True)
async def chunks(source, n, timeout):
    async with streamcontext(source) as streamer:
        async with buffer(streamer) as buffered:
            async with streamcontext(buffered) as first_streamer:
                async for first in first_streamer:
                    tail = await (
                        buffered
                        | pipe.timeout(timeout)
                        | catch.pipe(asyncio.TimeoutError)
                        | pipe.take(n - 1)
                        | pipe.list()
                    )
                    yield [first, *tail]


async def random_sleep(item):
    return await asyncio.sleep(random.random() * 0.15, result=item)


async def main():
    xs = stream.count() | pipe.map(random_sleep, task_limit=1)
    ys = xs | chunks.pipe(5, timeout=0.1) | pipe.print()
    await ys


if __name__ == "__main__":
    asyncio.run(main())

It outputs something like:

[0, 1]
[2, 3, 4, 5]
[6, 7, 8, 9]
[10, 11, 12, 13, 14]
[15, 16, 17]
[18, 19, 20, 21]
[22, 23, 24, 25]
[26, 27, 28]
[29, 30]
[31, 32, 33, 34, 35]
[36, 37]
[38, 39, 40, 41]
[42, 43]
[44, 45]
[46, 47, 48, 49, 50]
...

As you can see, I needed two extra helpers in order to write this new chunks implementation:

  • a catch operator, which is simple enough
  • a buffer context manager, which is something much trickier

Also note that this implementation yields a smaller chunk when the production of a single item times out, not when the production of the whole chunk times out.
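For the other semantics, where the timeout bounds the production of the whole chunk, one could track an explicit per-chunk deadline and wait only for the time remaining. A stdlib sketch under that assumption (the name `chunks_full_deadline` is hypothetical):

```python
import asyncio


async def chunks_full_deadline(source, n, timeout):
    """Hypothetical sketch: ``timeout`` bounds the whole chunk,
    measured from the arrival of its first item."""
    queue = asyncio.Queue(maxsize=1)
    done = object()

    async def pump():
        async for item in source:
            await queue.put(item)
        await queue.put(done)

    task = asyncio.create_task(pump())
    loop = asyncio.get_running_loop()
    try:
        chunk, deadline = [], None
        while True:
            # No deadline while the chunk is empty; otherwise wait only
            # for the time left until the chunk expires.
            remaining = None if deadline is None else deadline - loop.time()
            try:
                item = await asyncio.wait_for(queue.get(), remaining)
            except asyncio.TimeoutError:
                yield chunk  # deadline is only armed once chunk is non-empty
                chunk, deadline = [], None
                continue
            if item is done:
                if chunk:
                    yield chunk
                return
            if not chunk:
                deadline = loop.time() + timeout  # clock starts on first item
            chunk.append(item)
            if len(chunk) == n:
                yield chunk
                chunk, deadline = [], None
    finally:
        task.cancel()
```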

I'm not sure how I would go about implementing such a feature. Should we add this logic specifically for chunks or should we take a more generic approach and design new operators to use as building blocks for similar use cases? Sadly I don't really have bandwidth to work on aiostream at the moment, but feel free to add a couple suggestions if you have some.

And thanks for the report :)

vxgmichel avatar Nov 07 '22 13:11 vxgmichel

@vxgmichel After the chunks operator is used, the process does not stop until it is manually killed.

async def main():
    source = stream.range(0, 100)
    even = source | pipe.filter(lambda x: x % 2 == 0)
    xs = even | chunks.pipe(9, 0.1) | pipe.map(lambda x: f"even {x}", task_limit=1)
    await (xs | pipe.print())


if __name__ == "__main__":
    asyncio.run(main())
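The hang is consistent with how the shared buffer works in the snippet above: `buffered` is iterated both by `first_streamer` and by each `tail` pipeline, but `consume()` puts the sentinel on the queue only once, so whichever reader misses it can block on `queue.get()` forever. A minimal stdlib reproduction of the fix pattern, re-putting the sentinel so every reader observes end-of-stream (all names here are mine):

```python
import asyncio

sentinel = object()


async def reader(queue, out):
    while True:
        item = await queue.get()
        if item is sentinel:
            # Put the sentinel back so the *other* reader also sees
            # end-of-stream; consuming it once would leave that reader
            # blocked on ``queue.get()`` forever.
            await queue.put(sentinel)
            return
        out.append(item)


async def main():
    queue = asyncio.Queue(maxsize=1)
    out = []
    # Two readers share one queue, like ``first_streamer`` and the
    # ``tail`` pipeline share ``buffered``.
    readers = [asyncio.create_task(reader(queue, out)) for _ in range(2)]
    for i in range(3):
        await queue.put(i)
    await queue.put(sentinel)
    # Without the re-put above, this gather would never complete.
    await asyncio.wait_for(asyncio.gather(*readers), timeout=1)
    return out


print(sorted(asyncio.run(main())))  # prints [0, 1, 2]
```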

zzl221000 avatar Jan 11 '23 00:01 zzl221000