
Does pipetools support async functions?

Open nicksspirit opened this issue 5 years ago • 8 comments

(opened by nicksspirit, Dec 11 '19)

Right now it doesn't do anything special for async functions. I suppose you'd want something like:

result = some_input > pipe | async_func1 | async_func2 | regular_func

To produce the equivalent of:

async def f():
    return regular_func(await async_func2(await async_func1(some_input)))

result = f()  # a coroutine object; it still has to be awaited or run in an event loop

Or do you have some other use cases?

The above should probably be doable without breaking any existing functionality.

(0101, Dec 12 '19)

What you have there is a great start. I would love to help you implement it, but I will probably need some guidance.

Do async functions have a dunder attribute that can be checked to tell whether they are async?


(nicksspirit, Dec 12 '19)

I don't really have much experience with Python's asyncio. From what I quickly tried, it seems that it would have to be deduced from the return value, which for an async function is a coroutine object that has an __await__ method.
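A quick way to see what this describes (a sketch; `double` and `increment` are made-up example functions): calling an async function does not run it but returns a coroutine object, which has `__await__`, and the standard-library `inspect.isawaitable` checks for exactly that.

```python
import asyncio
import inspect

async def double(x):
    return x * 2

def increment(x):
    return x + 1

# Calling an async function does not run its body; it returns a coroutine object.
coro = double(3)
print(type(coro).__name__)                # coroutine
print(hasattr(coro, "__await__"))         # True
print(inspect.isawaitable(coro))          # True
print(inspect.isawaitable(increment(3)))  # False: just the int 4

# The coroutine only runs once something awaits it, e.g. the event loop:
print(asyncio.run(coro))                  # 6
```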

My initial guess is that Pipe.bind or .compose would need to be altered to include this detection, and possibly to instantiate a new AsyncPipe type with some overrides.
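To make the idea concrete, here is a minimal sketch of an auto-awaiting pipe, assuming detection by return value as described above. `AsyncPipe`, `async_pipe` and the example functions are hypothetical and not part of pipetools; the `>`/`|` overloading only imitates pipetools' style. Because any step may need awaiting, applying the pipe returns a coroutine that still has to be run.

```python
import asyncio
import inspect

class AsyncPipe:
    """Hypothetical sketch (not pipetools' API): composes steps with |
    and awaits any step result that turns out to be awaitable."""

    def __init__(self, steps=()):
        self.steps = tuple(steps)

    def __or__(self, func):
        # Composition returns a new pipe, so partially built pipes stay reusable.
        return AsyncPipe(self.steps + (func,))

    async def __call__(self, value):
        for func in self.steps:
            value = func(value)
            if inspect.isawaitable(value):  # detection from the return value
                value = await value
        return value

    def __lt__(self, value):
        # Makes `value > pipe | ...` work: Python falls back to this reflected
        # comparison when the left operand's __gt__ returns NotImplemented.
        return self(value)

async_pipe = AsyncPipe()

async def async_func1(x):
    await asyncio.sleep(0)
    return x + 1

async def async_func2(x):
    return x * 10

def regular_func(x):
    return f"result: {x}"

# | binds tighter than >, so the pipe is built first, then applied to 4.
result = 4 > async_pipe | async_func1 | async_func2 | regular_func
print(asyncio.run(result))  # result: 50
```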

Also, it may need a bit more thought whether doing this automatically would prevent legitimate use cases where someone wants to manipulate coroutines without awaiting them. In that case some explicit annotation would be better, e.g. input > wait_for(async_func) | regular_func, or input > async_pipe | async_func | regular_func, or something like that.
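The concern about automatic awaiting can be illustrated with a small sketch (all names are hypothetical): a regular step that expects an un-awaited coroutine, here one that wraps it in `asyncio.wait_for` to add a deadline, stops working once an auto-awaiting pipeline awaits the previous result on its behalf.

```python
import asyncio
import inspect

async def slow_query(x):
    await asyncio.sleep(0.01)
    return x * 2

def with_timeout(coro):
    # Hypothetical regular step: expects an un-awaited coroutine and wraps it
    # with a deadline. Calling asyncio.wait_for here just returns a coroutine.
    return asyncio.wait_for(coro, timeout=1.0)

def auto_await_pipeline(*funcs):
    """Hypothetical pipeline that awaits every awaitable intermediate result."""
    async def run(value):
        for func in funcs:
            value = func(value)
            if inspect.isawaitable(value):
                value = await value
        return value
    return run

async def main():
    # Manual composition: the coroutine reaches with_timeout intact.
    ok = await with_timeout(slow_query(5))
    # Auto-awaiting: slow_query's result (10) is awaited first, so with_timeout
    # receives an int, and awaiting wait_for(10, ...) raises TypeError.
    try:
        await auto_await_pipeline(slow_query, with_timeout)(5)
        broken = None
    except TypeError as exc:
        broken = exc
    return ok, broken

ok, broken = asyncio.run(main())
print(ok)                     # 10
print(type(broken).__name__)  # TypeError
```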

It'd be great if you helped with the implementation, especially if you have some real-world use cases against which the solution could be verified.

(0101, Dec 12 '19)

First, I really love this project. Thank you for making it!

Second, IMO the desired syntax would be something like this (if possible):

result = some_input > await async_func1 | await async_func2 | regular_func 

(leosussan, Feb 12 '20)
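For reference, here is why the `await async_func1` form above cannot work as written (a small sketch with a made-up `async_func1`): a function object is not awaitable; only the coroutine returned by calling it is.

```python
import asyncio

async def async_func1(x):
    return x + 1

async def main():
    try:
        await async_func1  # the function object itself is not awaitable
    except TypeError as exc:
        print("TypeError:", exc)
    coro = async_func1(1)  # calling the function produces a coroutine...
    return await coro      # ...and awaiting that coroutine yields the result

print(asyncio.run(main()))  # 2
```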

@leosussan I don't think that is possible, because you have to call the function before there is anything to await: calling an async function returns a coroutine, and it's the coroutine that gets awaited. The tricky part is this: when a function in the pipeline is awaited, does the entire pipeline pause, or only that one function?

So it would have to look more like this:

result = some_input > await async_func1() | await async_func2() | regular_func 

I would think the former, since piping is a very synchronous operation. Maybe under the hood we would want to take all the functions and pass them to asyncio.gather.
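One point worth checking against this idea (a sketch with made-up functions): `asyncio.gather` runs independent awaitables concurrently, while the steps of a single pipe depend on each other, so within one pipe each step has to wait for the previous result. `gather` seems a better fit for running the same pipe over many inputs.

```python
import asyncio

async def async_func1(x):
    await asyncio.sleep(0)
    return x + 1

async def async_func2(x):
    await asyncio.sleep(0)
    return x * 10

async def chained(x):
    # One pipe is sequential: each step needs the previous step's result,
    # so the whole chain pauses at every await.
    return await async_func2(await async_func1(x))

async def fan_out(xs):
    # gather runs independent awaitables concurrently: it fits
    # "the same pipe over many inputs", not "the steps of one pipe".
    return await asyncio.gather(*(chained(x) for x in xs))

print(asyncio.run(chained(1)))          # 20
print(asyncio.run(fan_out([1, 2, 3])))  # [20, 30, 40]
```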

This is an interesting problem; I have been thinking about it for a while now.

(nicksspirit, Feb 12 '20)

Maybe the paco package can provide some insight. Its docs have this example:

import paco

async def filterer(x):
    return x < 8

async def mapper(x):
    return x * 2

async def drop(x):
    return x < 10

async def reducer(acc, x):
    return acc + x

async def task(numbers):
    return await (numbers
                   | paco.filter(filterer)
                   | paco.map(mapper)
                   | paco.dropwhile(drop)
                   | paco.reduce(reducer, initializer=0))

# Run in event loop
number = paco.run(task((1, 2, 3, 4, 5, 6, 7, 8, 9, 10)))
print('Number:', number) # => 36
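To show what the paco pipeline computes, here is a rough plain-`asyncio` equivalent. It is a sketch, not how paco is implemented, and running the map step with `asyncio.gather` is an assumption about scheduling: filter keeps 1 to 7, map doubles them, dropwhile skips the leading values below 10, and the reduce sums 10 + 12 + 14.

```python
import asyncio

async def filterer(x):
    return x < 8

async def mapper(x):
    return x * 2

async def drop(x):
    return x < 10

async def reducer(acc, x):
    return acc + x

async def task(numbers):
    # filter: await the async predicate for each item
    kept = [x for x in numbers if await filterer(x)]
    # map: run the async mapper over all kept items concurrently
    mapped = await asyncio.gather(*(mapper(x) for x in kept))
    # dropwhile: skip leading items while the async predicate holds
    index = 0
    while index < len(mapped) and await drop(mapped[index]):
        index += 1
    # reduce: fold the remaining items with the async reducer
    acc = 0
    for x in mapped[index:]:
        acc = await reducer(acc, x)
    return acc

print(asyncio.run(task(range(1, 11))))  # 36
```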

(nicksspirit, Feb 12 '20)

Using the await keyword when constructing the pipe won't be possible, because arguments for those functions aren't available at that time.

If it isn't possible or desirable to automatically detect if a given function is async, then it would have to be wrapped in some helper function, so you'd get something like:

result = some_input > await_(async_func1) | await_(async_func2) | regular_func 

(And the result still has to be awaited or passed to some event loop to execute)
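A minimal sketch of the explicit approach, assuming `await_` simply marks a step whose result should be awaited. `await_` and `build` are hypothetical helpers, and plain function composition stands in for pipetools' operators to keep it short. Unmarked steps pass their results through untouched, so a coroutine can still flow to a step that wants it un-awaited.

```python
import asyncio
import inspect

class await_:
    """Hypothetical marker (not part of pipetools): the wrapped step's result is awaited."""
    def __init__(self, func):
        self.func = func

def build(*steps):
    """Hypothetical builder: returns a coroutine function running steps left to right."""
    async def run(value):
        for step in steps:
            if isinstance(step, await_):
                value = await step.func(value)
            else:
                value = step(value)  # results of unmarked steps are passed on as-is
        return value
    return run

async def async_func1(x):
    return x + 1

async def async_func2(x):
    return x * 10

def regular_func(x):
    return x - 3

pipeline = build(await_(async_func1), await_(async_func2), regular_func)
result = pipeline(4)        # still a coroutine: nothing has run yet
print(asyncio.run(result))  # 47

# An unmarked async step is not awaited: its coroutine is handed on as-is.
coro = asyncio.run(build(async_func1)(4))
print(inspect.iscoroutine(coro))  # True
coro.close()  # discard it without running, avoiding a "never awaited" warning
```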

(0101, Feb 13 '20)

Use inspect.iscoroutinefunction(object) to detect if a function is async.

See: https://docs.python.org/3/library/inspect.html#inspect.iscoroutinefunction
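A short check of what `inspect.iscoroutinefunction` reports (the example functions are made up). It looks at how the function was defined, via the `CO_COROUTINE` flag on its code object, which also answers the earlier question about a dunder attribute. It returns `False` for a plain function that merely returns a coroutine, which is the case where detection by return value would still be needed. For `functools.partial` objects it returns `True` on Python 3.8 and later.

```python
import functools
import inspect

async def async_func1(x):
    return x + 1

def regular_func(x):
    return x * 2

def returns_coroutine(x):
    # A plain def that still hands back an awaitable when called.
    return async_func1(x)

print(inspect.iscoroutinefunction(async_func1))        # True
print(inspect.iscoroutinefunction(regular_func))       # False
print(inspect.iscoroutinefunction(returns_coroutine))  # False, despite the awaitable result
print(inspect.iscoroutinefunction(functools.partial(async_func1, 1)))  # True (Python 3.8+)

# The underlying check: a flag on the function's code object.
print(bool(async_func1.__code__.co_flags & inspect.CO_COROUTINE))  # True
```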

(leosussan, Feb 13 '20)