futures-rs
futures-rs copied to clipboard
Replace `Extend` in `Stream::collect` with custom `FromStream` (a la `FromIterator`)
I'm puzzled on this one. I could understand Rust not being able to infer the type with how generic Stream::collect is but Rust can somehow infer it from the declaration if the type is not Copy.
use futures::executor::block_on;
use futures::future::ready;
use futures::stream::once;
use futures::Stream;
use futures::StreamExt;
fn defaults_stream<T: Default>() -> impl Stream<Item = T> {
once(ready(T::default()))
}
fn defaults_iter<T: Default>() -> impl Iterator<Item = T> {
vec![T::default()].into_iter()
}
fn main() {
// iter / String : ok
let _: Vec<String> = defaults_iter().collect();
// iter / u32 : ok
let _: Vec<u32> = defaults_iter().collect();
// stream / String : ok
let _: Vec<String> = block_on(defaults_stream().collect());
// stream / u32 : err (?)
let _: Vec<u32> = block_on(defaults_stream().collect());
// stream / (u32, String) : ok (?)
let _: Vec<(u32, String)> = block_on(defaults_stream().collect());
// stream / (u32, u32) : err (?)
let _: Vec<(u32, u32)> = block_on(defaults_stream().collect());
}
Any idea what's going on here?
I should note that I've tried and reproduced this in 1.36.0 and nightly 2019-08-28.
Update: After talking this through with a colleague we figured out that the issue here is that .collect() in Stream depends on Extend which has two possible implementations for Copy types on Vec.
Possible solution is to make a FromStream trait like Iterator has FromIterator.
I think that this is a high priority issue, primarily because collect does not take the size-hint into account, meaning that it allocates for each item in the stream, which is very inefficient.
Ideally FromStream would look like this:
trait FromStream<A>: Sized {
async fn from_stream<S: Stream<Item = A>>(stream: S) -> Self;
}
But of course, we don't have async fn in traits, so this wouldn't work. Instead, we have a couple of different options.
- Boxed Future
trait FromStream<A>: Sized {
fn from_stream<'a, S: Stream<Item = A>>(stream: S) -> BoxFuture<'a, Self>;
}
This is what async-std does, and is problematic for obvious reasons.
- Poll based
trait FromStream<A>: Sized {
type Buf;
fn initialize(size_hint: (usize, Option<usize>)) -> Self::Buf;
fn poll_from_stream<S: Stream<Output = A>>(
buf: &mut Self::Buf,
stream: Pin<&mut S>,
cx: &mut Context<'_>,
) -> Poll<Self>;
}
The problem with this approach is that it is very limited. Control flow is hard because you can't store the stream anywhere. For example, when implementing it for Result, I had hack around this by creating once streams to push to the underlying buffer, or and empty streams to construct the final collection.
- Extend based
This is what tokio-stream does (as private API):
pub trait FromStream<A>: Sized {
type Buf;
fn initialize(size_hint: (usize, Option<usize>)) -> Self::Buf;
fn extend(buf: &mut Self::Buf, val: A) -> ControlFlow;
fn finish(buf: Self::Buf) -> Self;
}
This is very similar to the current Default + Extend approach. Again, it is limited, but would probably work for most implementations. It doesn't completely suffer from the same problem as the poll-based version because extend and finish are separate methods that wrapper implementations can use.
All of these APIs feel subpar in comparison to async fn. The closest that we can get to what is desired is with a futures-based API:
- Future Based
trait FromStream<A>: Sized {
type Output: Future<Output = Self>;
fn from_stream<S: Stream<Item = A>>(stream: S) -> Self::Output;
}
The problem with this approach is that Output does not have access to the generic S, which would require GATs:
impl FromStream<Foo> for Bar {
type Output<A> = BarFromStreamFut<A>;
fn from_stream<S: Stream<Item = A>>(stream: S) -> Self::Output<S>;
}
Instead, the future would have to box the stream, which suffers from the same issues as #1.
- Future Based + Generic over stream
A solution to the above problem is to make FromStream generic over the actual stream, rather than the item:
trait FromStream<S: Stream> {
type Output: Future<Output = Self>;
fn from_stream(stream: S) -> Self::Output;
}
However, generics with different associated types are still considered conflicting:
// ERROR: conflicting implementations
impl<S: Stream<Item = char>> FromStream<S> for String { ... }
impl<S: Stream<Item = String>> FromStream<S> for String { ... }
This means that FromStream would have to be generic over both the stream, and the item:
- Future Based + Generic over stream and item
trait FromStream<A, S>
where
S: Stream<Output = A>,
{
type Output: Future<Output = Self>;
fn from_stream(stream: S) -> Self::Output;
}
This solution works, and is forward compatible with type_alias_impl_trait, which will make implementation easy in the future. The main problem is the same one that the original FromIterator<A, S> suffered from:
If they are on the trait then it is extremely annoying to use them as generic parameters to a function, e.g. with the iterator param on the trait itself, if one was to pass an Extendable to a function that filled it either from a Range or a Map, one needs to write something like:
fn foo<E: Extendable<int, Range<int>> + Extendable<int, Map<&'self int, int, VecIterator<int>>> (e: &mut E, ...) { ... }since using a generic, i.e. foo<E: Extendable<int, I>, I: Iterator
> means that foo takes 2 type parameters, and the caller has to specify them (which doesn't work anyway, as they'll mismatch with the iterators used in foo itself).
I still think that this approach has merit, and is least-breaking to change in the future when we get GATs or async fn in traits. Those however are quite far from getting stabilized, so it makes sense to add this definition to futures now. Users will not really interact directly with FromStream - it's main purpose is for use with StreamExt::collect, so slightly more complex bounds are probably fine.
It's also worth noting that Extend::extend_reserve is a nightly feature that could be used to solve the issue of reserving space based on the size-hint, but this does not allow for things like collecting into a result.
I'm working on a PR for the 6th method I mentioned, and it should be opened soon. Any thoughts?
cc @Nemo157 @taiki-e
Hm.. so the problem with adding a second generic parameter means that you can't implement it for Result<V, E> because you have to introduce a second stream parameter for V: FromStream<A, VS>, and VS is not constrained because there could potentially be multipe impls...
@taiki-e which API do you prefer?
Hmm... My thought when I looked into this before was "there is no best way at this time". https://github.com/rust-lang/wg-async-foundations/pull/15#discussion_r453283496
Regarding option 4 outlined above, GATs have been stabilized as of Rust 1.65. Not sure if there is any appetite in moving forward with this approach in a next major/minor release of the futures crate, or if alternative approaches have been discussed in the meantime, but I thought this would be worth bubbling to the top.
async fn in traits is now stable as well, but this feature arrived much more recently in Rust 1.75 and has some notable caveats. It may not be the best candidate to use for the time being until those are ironed out.