async-std

async combinators for Stream

Open yoshuawuyts opened this issue 6 years ago • 14 comments

Our stream combinators operate on asynchronous data, but are themselves synchronous.

This means it's not possible to, for example, filter or map a stream based on other async operations. That seems limiting, since sync functions can be called from async functions but not the other way around.

I'd like to propose that we update the Stream combinators that take closures so they accept async closures instead.

cc/ @montekki ref #129

yoshuawuyts avatar Sep 27 '19 01:09 yoshuawuyts

How would this currently be represented in the type system? Can you write an example for the map combinator to illustrate? I believe this is blocked on the AsyncFn, AsyncFnMut, and AsyncFnOnce traits.

Still, I believe combinators with async closures should be representable with the stream! syntax. For example, the equivalent of s.map(async |x| x.await) could be written as:

let s = stream! {
    for x in s {
        yield x.await;
    }
};

ghost avatar Sep 27 '19 03:09 ghost

So we currently don't have map, but futures::Stream::map is defined as:

// Definition
fn map<T, F>(self, f: F) -> Map<Self, F> where
    F: FnMut(Self::Item) -> T;

// Usage
stream.map(|i| i * 2);

However, this is problematic when we want to call an async fn from within the map, for example to do a database lookup for each key in a stream. For that, the map combinator would need to be defined as:

// Definition
fn map<T, F, R>(self, f: F) -> Map<Self, F> where
    R: Future<Output = T>,
    F: FnMut(Self::Item) -> R;

// Usage
stream.map(async |i| db.lookup(i).await);

You're right to point out that we don't have async closures yet. But we can more or less pretend we do, as this is valid on Rust 1.39:

stream.map(|i| async { db.lookup(i).await });

Ideally, however, we would have an actual AsyncFn trait in the standard library, with an automatic mapping from Fn to AsyncFn so that it's treated as a strict superset of regular closures. That's a bit further out in the future, and I think we'll be able to incorporate it in a backwards-compatible way. Here's a playground example of a similar trait definition.
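
A minimal, std-only sketch of the async map signature discussed above. The names map_async, lookup, block_on, and demo_map are hypothetical and not part of async-std or futures; a Vec stands in for the stream, and block_on is a toy busy-polling executor that is only suitable for futures that are ready immediately.

```rust
use std::future::Future;
use std::pin::pin;
use std::sync::Arc;
use std::task::{Context, Poll, Wake, Waker};

// Waker that does nothing; enough for futures that never actually suspend.
struct NoopWake;

impl Wake for NoopWake {
    fn wake(self: Arc<Self>) {}
}

// Toy executor (assumption: illustration only): polls in a loop until ready.
fn block_on<F: Future>(fut: F) -> F::Output {
    let mut fut = pin!(fut);
    let waker = Waker::from(Arc::new(NoopWake));
    let mut cx = Context::from_waker(&waker);
    loop {
        if let Poll::Ready(out) = fut.as_mut().poll(&mut cx) {
            return out;
        }
    }
}

// Hypothetical async `map` with the same bounds as the proposed definition:
// `F: FnMut(Item) -> R` where `R: Future<Output = T>`.
async fn map_async<I, T, F, R>(items: Vec<I>, mut f: F) -> Vec<T>
where
    R: Future<Output = T>,
    F: FnMut(I) -> R,
{
    let mut out = Vec::with_capacity(items.len());
    for item in items {
        // Each item's future is awaited before the next one is created.
        out.push(f(item).await);
    }
    out
}

// Stand-in for `db.lookup` (hypothetical).
async fn lookup(key: u32) -> String {
    format!("value-{key}")
}

// A closure returning an async block plays the role of an async closure.
fn demo_map() -> Vec<String> {
    block_on(map_async(vec![1u32, 2, 3], |i| async move { lookup(i).await }))
}
```

Calling demo_map() runs the three lookups one after another. A real combinator would poll lazily as the stream is consumed rather than collecting into a Vec.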

yoshuawuyts avatar Sep 27 '19 10:09 yoshuawuyts

> So we currently don't have map, but futures::Stream::map is defined as:

Many combinators in futures_util::stream other than Map take FnMut(T) -> impl Future closures. See https://docs.rs/futures-preview/0.3.0-alpha.19/futures/stream/trait.StreamExt.html#method.filter_map

taiki-e avatar Sep 27 '19 11:09 taiki-e

@taiki-e Why do filter and filter_map use async closures but map doesn't? Was that intentional or is that just an oversight?

ghost avatar Sep 27 '19 11:09 ghost

@stjepang Maybe it's an oversight. I will file an issue.

UPDATE: filed https://github.com/rust-lang-nursery/futures-rs/issues/1889.

taiki-e avatar Sep 27 '19 13:09 taiki-e

@yoshuawuyts assign this to me please

montekki avatar Sep 27 '19 14:09 montekki

@taiki-e is it by design that next() is not usable with those combinators? So this will not work:

let s = stream::iter(1u8..10);
let mut s = s.filter_map(|a| async move { Some(a) });

while let Some(n) = s.next().await {
    println!("next {}", n);
}

since next() requires Self: Unpin.

montekki avatar Sep 27 '19 16:09 montekki

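@montekki You need to use pin_utils::pin_mut (or Box::pin). Pin<P> implements Unpin whenever the pointer type P does, which is the case for &mut T and Box<T>.

use futures::stream::{self, StreamExt};

let s = stream::iter(1u8..10);
let s = s.filter_map(|a| async move { Some(a) });
// Pin the stream to the stack; `s` is rebound as a Pin<&mut _>, which is Unpin.
pin_utils::pin_mut!(s);

while let Some(n) = s.next().await {
    println!("next {}", n);
}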

taiki-e avatar Sep 27 '19 16:09 taiki-e

next takes &mut self for ergonomics. If it took Pin<&mut Self> instead, you would need to call .as_mut() on every call.

(With the current design, even if Self is !Unpin, you pin it once and can then use it in the same way.)
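
A small std-only sketch of the pinning point above: once a !Unpin value is pinned, the pinned handle itself is Unpin, so a method bounded by Unpin (such as next()) accepts it. The names require_unpin, answer, and pinned_handles_are_unpin are hypothetical; std::pin::pin! is used here because it plays a similar role to pin_utils::pin_mut!.

```rust
use std::pin::pin;

// Compiles only for arguments whose type implements `Unpin`.
fn require_unpin<T: Unpin>(_value: &T) -> bool {
    true
}

// Hypothetical async fn; the future it returns does not implement `Unpin`,
// so `require_unpin(&answer())` would be rejected by the compiler.
async fn answer() -> u8 {
    42
}

fn pinned_handles_are_unpin() -> bool {
    // Pin<Box<_>>: the future lives on the heap and the handle is Unpin.
    let on_heap = Box::pin(answer());
    // Pin<&mut _>: the future stays on the stack and the handle is Unpin.
    let on_stack = pin!(answer());
    require_unpin(&on_heap) && require_unpin(&on_stack)
}
```

Both handles satisfy the Unpin bound even though the future they point to does not, which is why pinning once is enough.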

taiki-e avatar Sep 27 '19 16:09 taiki-e

It looks like currently there is no way to make this work for combinators like filter that involve a predicate taking parameters by ref, is there?

montekki avatar Sep 29 '19 18:09 montekki

Yeah, currently you need to use future::ready or an async function. (Those problems will be fixed once async closures are improved.)

(Since futures's StreamExt documentation was updated to use async-await in https://github.com/rust-lang-nursery/futures-rs/pull/1843, it should be somewhat useful. And I think those implementations will definitely be useful.)

taiki-e avatar Sep 29 '19 20:09 taiki-e

It looks like in any case taking by ref won't work, not only for async closures:

   |
12 | let mut s = s.filter(|i: &usize| async move { i % 2 == 0 });
   |                      -----------            ^^^^^^^^^^^^^^ cannot be stored outside of its closure
   |                      |
   |                      ...because it cannot outlive this closure
13 |
14 | pin_utils::pin_mut!(s);
   | ----------------------- borrowed data cannot be stored into here...

montekki avatar Sep 30 '19 09:09 montekki

Yeah, current async closures are equivalent to closures which return async blocks.
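
A minimal, std-only sketch of two ways around the borrow error above: return std::future::ready, or copy the value out of the reference before building the async block. filter_async, block_on, and demo_filter are hypothetical names, a Vec stands in for the stream, and block_on is a toy busy-polling executor for futures that are ready immediately.

```rust
use std::future::{ready, Future};
use std::pin::pin;
use std::sync::Arc;
use std::task::{Context, Poll, Wake, Waker};

// Waker that does nothing; enough for futures that never actually suspend.
struct NoopWake;

impl Wake for NoopWake {
    fn wake(self: Arc<Self>) {}
}

// Toy executor (assumption: illustration only): polls in a loop until ready.
fn block_on<F: Future>(fut: F) -> F::Output {
    let mut fut = pin!(fut);
    let waker = Waker::from(Arc::new(NoopWake));
    let mut cx = Context::from_waker(&waker);
    loop {
        if let Poll::Ready(out) = fut.as_mut().poll(&mut cx) {
            return out;
        }
    }
}

// Hypothetical async `filter`. `Fut` is a single type that cannot mention the
// lifetime of `&T`, so the returned future must not borrow the argument.
async fn filter_async<T, F, Fut>(items: Vec<T>, mut pred: F) -> Vec<T>
where
    F: FnMut(&T) -> Fut,
    Fut: Future<Output = bool>,
{
    let mut kept = Vec::new();
    for item in items {
        if pred(&item).await {
            kept.push(item);
        }
    }
    kept
}

fn demo_filter() -> (Vec<usize>, Vec<usize>) {
    // Workaround 1: compute the answer synchronously and wrap it in `ready`.
    let with_ready = block_on(filter_async(vec![1usize, 2, 3, 4], |i: &usize| {
        ready(i % 2 == 0)
    }));
    // Workaround 2: copy the value first so the async block owns its data.
    let with_copy = block_on(filter_async(vec![1usize, 2, 3, 4], |i: &usize| {
        let i = *i;
        async move { i % 2 == 0 }
    }));
    (with_ready, with_copy)
}
```

The second form only works when the item is cheap to copy or clone; otherwise the predicate has to finish its work on the reference before returning a future.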

taiki-e avatar Oct 01 '19 09:10 taiki-e

Is this no longer planned, given that async-std reached 1.0?

jhpratt avatar May 09 '20 06:05 jhpratt