async-std
async-std copied to clipboard
async combinators for Stream
Our stream combinators operate on asynchronous data, but are themselves synchronous.
This means it's not possible to for example filter or map a stream based on other async operations. Which seems limiting, as it's possible to call sync functions from async functions but not the other way around.
I'd like to propose we update our Stream combinators that take closures to be async functions.
cc/ @montekki ref #129
How would this be represented in the type system currently? Can you write an example for the map combinator to illustrate? I believe this is blocked on traits AsyncFn, AsyncFnOnce, and AsyncFnMut.
Still, I believe combinators with async closures should be representable with the stream! syntax. For example, the equivalent of s.map(async |x| x.await) could be written as:
let s = stream! {
for x in s {
yield x.await;
}
};
So we currently don't have map, but futures::Stream::map is defined as:
// Definition
fn map<T, F>(self, f: F) -> Map<Self, F> where
F: FnMut(Self::Item) -> T;
// Usage
stream.map(|i| i * 2);
However this is problematic when say we want to call an async fn from within the map. For example doing a database lookup for each key in a stream. For that the map combinator would need to be defined as:
// Definition
fn map<T, F, R>(self, f: F) -> Map<Self, F> where
R: Future<Output = T>,
F: FnMut(Self::Item) -> R;
// Usage
stream.map(async |i| db.lookup(i).await);
You're right in pointing out we don't have async closures yet. But we can somewhat pretend we do as this is valid on Rust 1.39:
stream.map(|i| async { db.lookup(i).await });
However ideally we could have an actual AsyncFn trait as part of stdlib, and an automatic mapping going from Fn to AsyncFn so it's treated as a strict superset of the regular closures. But that's a bit further out in the future, and I think we'll be able to incorporate that in a backwards-compatible way. Here's a playground example of a similar trait definition.
So we currently don't have
map, butfutures::Stream::mapis defined as:
Many combinators of futures_util::stream other than Map use FnMut(T) -> impl Future closures.
https://docs.rs/futures-preview/0.3.0-alpha.19/futures/stream/trait.StreamExt.html#method.filter_map
@taiki-e Why do filter and filter_map use async closures but map doesn't? Was that intentional or is that just an oversight?
@stjepang Maybe it ’s an oversight. I will file an issue.
UPDATE: filed https://github.com/rust-lang-nursery/futures-rs/issues/1889.
@yoshuawuyts assign this to me please
@taiki-e is this by design that next() is not usable with those combinators? So this will not work:
let s = stream::iter(1u8..10);
let mut s = s.filter_map(|a| async move { Some(a) });
while let Some(n) = s.next().await {
println!("next {}", n);
}
since next() is bound by Unpin.
@montekki
You need to use pin_utils::pin_mut (or Box::pin)
(Pin<impl Deref> always implements Unpin.)
let s = stream::iter(1u8..10);
let s = s.filter_map(|a| async move { Some(a) });
pin_utils::pin_mut!(s);
while let Some(n) = s.next().await {
println!("next {}", n);
}
next takes &mut self for ergonomics. If this is Pin<&mut Self>, you need to call .as_mut() every time.
(With the current design, even if Self is !Unpin, you pin it once then you can use it in the same way)
It looks like currently there is no way to make this work for combinators like filter that involve a predicate taking parameters by ref, is there?
Yeah, currently, it needs to use future::ready or async function. (Those problems will be fixed if async closure is improved.)
(As it was updated to use async-await in https://github.com/rust-lang-nursery/futures-rs/pull/1843, futures's StreamExt documentation should be somewhat useful. (And I think those implementations will definitely be useful.)
It looks like in any case taking by ref won't work, not only for async closures:
|
12 | let mut s = s.filter(|i: &usize| async move { i % 2 == 0 });
| ----------- ^^^^^^^^^^^^^^ cannot be stored outside of its closure
| |
| ...because it cannot outlive this closure
13 |
14 | pin_utils::pin_mut!(s);
| ----------------------- borrowed data cannot be stored into here...
Yeah, current async closures are equivalent to closures which return async blocks.
Is this no longer planned, given that async-std reached 1.0?