syncbox
Flesh out the Stream API
This issue is to track the overall API of Stream at a highish level and is a
work in progress. I'll update the original post as the discussion evolves.
impl<T, E> Stream<T, E> {
// Counts the number of elements in the Stream
fn count(self) -> Future<usize, E>;
// Chain this stream with another (could also be named concat)
fn chain(self, next: Stream<T, E>) -> Stream<T, E>;
// Creates a stream that iterates over both this and the specified streams
// simultaneously, yielding the two elements as pairs.
fn zip<U>(self, other: Stream<U, E>) -> Stream<(T, U), E>;
// Create a new stream that represents the application of the specified
// function to each value of the original stream.
fn map<U, F: Fn(T) -> U>(self, f: F) -> Stream<U, E>;
// Creates a new stream that contains the values of the original stream
// that match the specified predicate
fn filter<P: Fn(T) -> bool>(self, p: P) -> Stream<T, E>;
// Creates a new stream that both filters and maps elements
fn filter_map<U, F: Fn(T) -> Option<U>>(self, f: F) -> Stream<U, E>;
// Creates a stream that yields pairs of the current iteration index and
// the original value
fn enumerate(self) -> Stream<(u64, T), E>;
// Limit the number of values to up to n
fn take(self, n: u64) -> Stream<T, E>;
// Take values as long as the supplied predicate returns true
fn take_while<F: Fn(T) -> bool>(self, f: F) -> Stream<T, E>;
// Take values from the original stream until the condition (supplied as an async value)
// completes.
fn take_until<A: Async>(self, a: A) -> Stream<T, E>;
// Skip the first n values of the stream
fn skip(self, n: u64) -> Stream<T, E>;
// Skip values as long as the supplied predicate returns true
fn skip_while<F: Fn(T) -> bool>(self, f: F) -> Stream<T, E>;
// Skip values until the condition (supplied as an async value) completes.
fn skip_until<A: Async>(self, a: A) -> Stream<T, E>;
// Computes up to N values in parallel ahead of the consumer requesting
// them, storing the computed values in the `Stream`.
fn buffer(self, n: usize) -> Stream<T, E>;
// Most of the functions from IteratorExt should also be implemented for Stream
}
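Most of these adaptors mirror their blocking counterparts on std's Iterator, so their per-element semantics can be sanity-checked there. A minimal sketch (Stream itself is still hypothetical; this only illustrates what map/filter/enumerate/skip/take are expected to do to the sequence of values):

```rust
fn main() {
    let values = vec![1, 2, 3, 4, 5, 6];
    let out: Vec<(usize, i32)> = values
        .into_iter()
        .map(|x| x * 10)          // map: transform each element
        .filter(|x| x % 20 == 0)  // filter: keep only matching elements
        .enumerate()              // enumerate: pair each value with its index
        .skip(1)                  // skip: drop the first n values
        .take(2)                  // take: limit output to n values
        .collect();
    assert_eq!(out, vec![(1, 40), (2, 60)]);
}
```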
Still needed
Waiting for a chunk of values
A function batch that waits until the first n values of the stream are
completed, returning them as a Vec. What should happen if the stream doesn't
have n values left?
fn batch(self, n: usize) -> Future<(Vec<T>, Stream<T, E>), E>
Parallelizing computation
A function that allows processing the values of the stream "asynchronously",
returning a new Stream that yields the results as the computations complete.
For example, a stream of URLs:
url_stream.map(|url| http::get(url)) // => Stream<Future<Response>>
Now the goal would be to map that into Stream<Response> such that at most N
http requests are in flight and the resulting stream contains the responses as
they complete, regardless of the original order of the URLs.
@carllerche I think we could gain a higher efficiency if we took the Iterator approach here, and wrote a Stream trait which we then implement for Stream (would have to get renamed) and any adaptors (take, map, etc.).
@reem This won't really work due to the lack of HKT. Having a concrete Stream type is the best that can be done for now, specifically because Stream<T> impls Async<Option<(T, Stream<T>)>>. Once there is HKT (and the ability to return a generic value; see the problem of writing a function that returns an instance of Iterator), I will most likely change Stream to be a trait.
We should also add an adaptor for turning a Stream into a blocking Iterator, something like:
impl Stream<T, E> {
fn into_iter(self) -> IntoIter<T, E>;
}
impl<T, E> Iterator for IntoIter<T, E> {
type Item = Result<T, E>; // sucks that Iterators don't support this pattern
fn next(&mut self) -> Option<Result<T, E>> {
// Await on the stream and get one value out.
}
}
Waiting for n values is also easily doable with the iterator adaptor:
let mut iter = stream.into_iter();
let mut chunk = iter.by_ref().take(n).collect::<Vec<_>>();
let stream = iter.into_stream(); // Inherent method on stream::IntoIter
@reem Yeah, Stream right now has an iter() fn, but it should be updated to the new style.
Also, what I mean by "wait" on N vals is an "async wait". So, Stream::wait_for(n) -> Future<(n-elems, rest)> kind of thing. So it would have to be a fn on Stream vs. going with the iterator.
I wonder if that actually does have to be built in. I think you could translate a Vec<Future<T, E>> to a Stream<T, E> by repeatedly selecting on all the futures in the Vec.
// Creates a stream that iterates over both this and the specified streams
// simultaneously, yielding the two elements as pairs.
fn zip<U>(self, other: Stream<U, E>) -> Stream<(T, U), E>;
When does it end?
Regarding batch(n) and not having enough elements, I think it would be reasonable to just return what it is able to. The client should not necessarily expect n items if the Stream ends. batch() would solve https://github.com/carllerche/syncbox/issues/11 though.
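The "return what it is able to" semantics match how take(n) behaves on a blocking iterator that runs out early, which may be a useful precedent for batch(n):

```rust
fn main() {
    // A "stream" with only 2 values left, batched with n = 5:
    let short = vec![1, 2];
    let batch: Vec<i32> = short.into_iter().take(5).collect();
    // The consumer simply gets fewer than n items, not an error.
    assert_eq!(batch, vec![1, 2]);
}
```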
We also noted that take() would consume the entire stream, unlike an Iterator. It should be part of the docs. It was noted that if we did something like a partitioning we could preserve the remaining stream.
<~carllerche> maybe what you want is a partition
<~carllerche> maybe Stream<T>::collect() -> Future<Vec<T>>
<hoverbear> Something like `chunks()` in Iterator?
<~carllerche> then you can do Stream<T>::partition_at(n) -> (Stream<T>, Stream<T>)
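The partition_at(n) idea, splitting off the first n values while preserving the remainder, has a direct blocking analogy in Iterator::by_ref, which is exactly what take() on the proposed Stream cannot offer:

```rust
fn main() {
    let mut iter = 1..=5;
    // "partition_at(2)": consume the first 2 values...
    let head: Vec<i32> = iter.by_ref().take(2).collect();
    // ...while the rest of the sequence remains usable.
    let rest: Vec<i32> = iter.collect();
    assert_eq!(head, vec![1, 2]);
    assert_eq!(rest, vec![3, 4, 5]);
}
```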
Reopening because this issue wasn't really supposed to be closed yet.
Regarding the parallelizing computation function, I'm thinking something like:
fn process<U: Async, F: Fn(T) -> U>(self, in_flight: usize, f: F) -> Stream<U::Value>;
Returns a new stream representing the processed values of the original stream in completion order. A value is processed by calling the supplied function and waiting for the completion of the returned async value. There can be at most in_flight values from the original stream being processed at a given time.
So basically, process at most in_flight values from the original stream and provide the result as they become realized.
This would be different from .take() because .take() requires the values to be fully processed?
An interesting example use case for the parallel processing API is performing HTTP request pipelining.
Should be closed now that streams are in eventual?