Allow AsyncGenerator or AsyncIterator for "args"

Open Ikatono opened this issue 1 year ago • 8 comments

It would be useful to be able to start some tasks before all of the arguments are available. This appears to be supported in trimeter, although I haven't tested it, since the last commit was 4 years ago.

As an example use case, I'm currently trying to consume a paginated API. I have an asynchronous generator that yields the items from a page while pre-loading the next page. As aiometer is right now, (I believe) I need to either generate the whole list of items at first (which requires downloading each page sequentially until I get an empty page), or yield chunks of results and call aiometer.amap on each chunk. It would be cleaner if I could pass the AsyncGenerator directly to amap and have it await items as needed.
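
For illustration, the kind of generator described above could look roughly like this. It is only a sketch: `fetch_page`, `PAGES`, and `iter_items` are made-up names standing in for a real API client, and the page data is hard-coded.

```python
import asyncio
from typing import AsyncIterator

# Hypothetical stand-in for a paginated API: three pages, then empty pages.
PAGES = {0: ["a", "b"], 1: ["c", "d"], 2: ["e"]}


async def fetch_page(page: int) -> list[str]:
    await asyncio.sleep(0)  # pretend this is a network round trip
    return PAGES.get(page, [])


async def iter_items() -> AsyncIterator[str]:
    """Yield items one by one while the next page loads in the background."""
    page = 0
    next_page = asyncio.create_task(fetch_page(page))
    while True:
        items = await next_page
        if not items:
            return  # an empty page marks the end
        page += 1
        # Start loading the next page before handing out the current items.
        next_page = asyncio.create_task(fetch_page(page))
        for item in items:
            yield item


async def collect() -> list[str]:
    return [item async for item in iter_items()]
```

Running `asyncio.run(collect())` walks all pages in order. The point of the request is that a generator like `iter_items()` could be handed to aiometer directly instead of being drained into a list first.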

Ikatono avatar May 04 '24 07:05 Ikatono

Hi @Ikatono

This sounds interesting, but I'm having a hard time seeing why you'd need aiometer to iterate over that async iterator.

If you could come up with some example code you'd like to be able to write, or show how you would write it with trimeter, that would help clarify things, I think.

florimondmanca avatar May 04 '24 12:05 florimondmanca

Sure. So here's what my current code looks like:

async def async_chunk_generator():
    ...  # body omitted: yields one chunk (list of items) per page

async for chunk in async_chunk_generator():
    await aiometer.run_on_each(action, chunk, max_at_once=mao, max_per_second=mps)

With trimeter I should be able to write this instead:

# yields individual items, not chunks
async def async_generator():
    ...  # body omitted: yields one item at a time

await trimeter.run_on_each(action, async_generator(), max_at_once=mao, max_per_second=mps)

Besides saving me from having to chunk my generator, the second version keeps max_at_once and max_per_second consistent through the entire process. In the first example, I have to wait for every task in a chunk to finish before starting the next chunk, even when fewer than max_at_once tasks are still running.

Looking at trimeter, it converts non-async iterables to async ones, then works through them in an async for loop:

        async with trio.open_nursery() as nursery:
            index = 0
            async for value in iterable:
                for meter_state in meter_states:
                    await meter_state.wait_task_can_start()
                for meter_state in meter_states:
                    meter_state.notify_task_started()
                nursery.start_soon(_worker, async_fn, value, index, config)
                index += 1

The equivalent section in aiometer is:

    async with anyio.create_task_group() as task_group:
        for index, value in enumerate(args):
            for state in meter_states:
                await state.wait_task_can_start()

            for state in meter_states:
                await state.notify_task_started()

            task_group.start_soon(_worker, async_fn, index, value, config)

so maybe something along the lines of:

    async with anyio.create_task_group() as task_group:
        index = 0
        async for value in async_iterable:
            for state in meter_states:
                await state.wait_task_can_start()

            for state in meter_states:
                await state.notify_task_started()

            task_group.start_soon(_worker, async_fn, index, value, config)
            index += 1
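
To make the idea concrete outside of aiometer's internals, here is a standalone sketch of the same loop using only asyncio. It is not aiometer's implementation: `run_on_each_async` is a hypothetical name, a plain `asyncio.Semaphore` stands in for the meter states, and only `max_at_once` is modelled (no `max_per_second`).

```python
import asyncio
from typing import AsyncIterable, AsyncIterator, Awaitable, Callable, TypeVar

T = TypeVar("T")


async def run_on_each_async(
    async_fn: Callable[[T], Awaitable[object]],
    args: AsyncIterable[T],
    *,
    max_at_once: int,
) -> None:
    """Run async_fn on each item of args, at most max_at_once at a time."""
    slots = asyncio.Semaphore(max_at_once)
    tasks: list[asyncio.Task] = []

    async def worker(value: T) -> None:
        try:
            await async_fn(value)
        finally:
            slots.release()  # free the slot even if async_fn raised

    async for value in args:
        await slots.acquire()  # wait for a free slot before starting a task
        tasks.append(asyncio.create_task(worker(value)))

    await asyncio.gather(*tasks)  # wait for the tasks still in flight


async def demo() -> tuple[list[int], int]:
    seen: list[int] = []
    running = 0
    peak = 0

    async def numbers() -> AsyncIterator[int]:
        for n in range(6):
            await asyncio.sleep(0)  # items become available over time
            yield n

    async def action(n: int) -> None:
        nonlocal running, peak
        running += 1
        peak = max(peak, running)
        await asyncio.sleep(0.01)
        seen.append(n)
        running -= 1

    await run_on_each_async(action, numbers(), max_at_once=2)
    return sorted(seen), peak
```

Because items are pulled from the iterator one at a time, the concurrency limit applies across the whole stream and there is no forced drain at chunk boundaries.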

Ikatono avatar May 04 '24 12:05 Ikatono

Oops, closing was a misclick.

Right! Thanks so much for the code samples and investigation.

So now it's clearer what needs to be done: the list argument should be widened to accept async iterables. "Sync" arguments (list, iterable, etc.) should be wrapped in an async iterable so that amap can always be written against an async iterable.

If you're up for giving this a try, I'd be happy to review a PR, even a partial one, towards this.
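
One possible shape for that wrapping step, as a sketch only: `as_async_iterator` and `collect` are hypothetical helper names, not existing aiometer functions.

```python
import asyncio
from typing import AsyncIterable, AsyncIterator, Iterable, TypeVar, Union

T = TypeVar("T")


async def as_async_iterator(
    args: Union[Iterable[T], AsyncIterable[T]],
) -> AsyncIterator[T]:
    """Yield from args whether it is a sync or an async iterable."""
    if hasattr(args, "__aiter__"):
        # Already async: pass the values through unchanged.
        async for value in args:
            yield value
    else:
        # Sync iterable (list, generator, ...): iterate it normally.
        for value in args:
            yield value


async def collect(args: Union[Iterable[T], AsyncIterable[T]]) -> list[T]:
    return [value async for value in as_async_iterator(args)]


async def sample_async_gen() -> AsyncIterator[int]:
    yield 1
    yield 2
```

With a helper like this, the scheduling loop only ever needs an `async for`, and callers can keep passing lists as before.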

florimondmanca avatar May 04 '24 13:05 florimondmanca

Sure! So the change needs to be applied to amap.py, run_all.py, run_any.py, and run_on_each.py, then a new set of test cases to cover async iterators, right? Am I missing anything?

Ikatono avatar May 04 '24 13:05 Ikatono

In short, yes!

Perhaps an update of the API reference as well, or some example of using an async iterator.

florimondmanca avatar May 04 '24 13:05 florimondmanca

Hi @florimondmanca,

Can you confirm whether the tests pass for you on 0.5.0? About half the tests are failing for me on WSL, seems like a dependency issue I'm having a hard time tracking down.

Ikatono avatar May 07 '24 04:05 Ikatono

It's very possible that there's a dependency issue. This happens from time to time, as transitive dependencies can introduce some breakage.

If you open up a PR with the current state of your code, we'll be able to see what that looks like in CI.

florimondmanca avatar May 07 '24 18:05 florimondmanca

I pulled the repo on a Linux machine this time and the tests pass without issue.

Ikatono avatar May 11 '24 04:05 Ikatono