
Flesh out the Stream API

Open carllerche opened this issue 10 years ago • 12 comments

This issue is to track the overall API of Stream at a highish level and is a work in progress. I'll update the original post as the discussion evolves.

impl Stream<T, E> {

    // Counts the number of elements in the Stream
    fn count(self) -> Future<usize, E>;

    // Chain this stream with another (could also be named concat)
    fn chain(self, next: Stream<T, E>) -> Stream<T, E>;

    // Creates a stream that iterates over both this and the specified streams
    // simultaneously, yielding the two elements as pairs.
    fn zip<U>(self, other: Stream<U, E>) -> Stream<(T, U), E>;

    // Create a new stream that represents the application of the specified
    // function to each value of the original stream.
    fn map<U, F: Fn(T) -> U>(self, f: F) -> Stream<U, E>;

    // Creates a new stream that contains the values of the original stream
    // that match the specified predicate
    fn filter<P: Fn(T) -> bool>(self, f: P) -> Stream<T, E>;

    // Creates a new stream that both filters and maps elements
    fn filter_map<U, F: Fn(T) -> Option<U>>(self, f: F) -> Stream<U, E>;

    // Creates a stream that yields a pair of the original value and the
    // current iteration index
    fn enumerate(self) -> Stream<(u64, T), E>;

    // Limit the number of values to up to n
    fn take(self, n: u64) -> Stream<T, E>;

    // Take values as long as the supplied predicate returns true
    fn take_while<F: Fn(T) -> bool>(self, f: F) -> Stream<T, E>;

    // Take values from the original stream until the condition (supplied as an async value)
    // completes.
    fn take_until<A: Async>(self, a: A) -> Stream<T, E>;

    // Skip the first n values of the stream
    fn skip(self, n: u64) -> Stream<T, E>;

    // Skip values as long as the supplied predicate returns true
    fn skip_while<F: Fn(T) -> bool>(self, f: F) -> Stream<T, E>;

    // Skip values until the condition (supplied as an async value) completes.
    fn skip_until<A: Async>(self, a: A) -> Stream<T, E>;

    // Computes up to N values in parallel ahead of the consumer requesting
    // them, storing the computed values in the `Stream`.
    fn buffer(self, n: usize) -> Stream<T, E>;

    // Most of the functions from IteratorExt should be implemented here for Stream
}

Still needed

Waiting for a chunk of values

A function batch that waits until the first n values of the stream are completed, returning them as a Vec. What should happen if the stream doesn't have n values left?

fn batch(self, n: usize) -> Future<(Vec<T>, Stream<T, E>), E>

Parallelizing computation

A function that allows processing the values of the stream "asynchronously" returning a new Stream that represents the results as the computations complete.

For example, take a stream of URLs:

url_stream.map(|url| http::get(url)) // => Stream<Future<Response>>

Now the goal would be to map that into Stream<Response> such that at most N HTTP requests are in flight and the resulting stream contains the responses as they complete, regardless of the original order of the URLs.

carllerche avatar Mar 12 '15 22:03 carllerche

@carllerche I think we could gain a higher efficiency if we took the Iterator approach here, and wrote a Stream trait which we then implement for Stream (would have to get renamed) and any adaptors (take, map, etc.).

reem avatar Mar 12 '15 22:03 reem

@reem This won't really work due to the lack of HKT. Having a concrete Stream type is the best that can be done for now, specifically because Stream<T> impls Async<Option<(T, Stream<T>)>>. Once there is HKT (and the ability to return a generic value, see the problem of writing a function that returns an instance of Iterator), I will most likely change Stream to be a trait.

carllerche avatar Mar 12 '15 22:03 carllerche

We should also add an adaptor for turning a Stream into a blocking Iterator, something like:

impl Stream<T, E> {
     fn into_iter(self) -> IntoIter<T, E>;
}

impl<T, E> Iterator for IntoIter<T, E> {
     type Item = Result<T, E>; // sucks that Iterators don't support this pattern

     fn next(&mut self) -> Option<Result<T, E>> {
          // Await on the stream and get one value out.
     }
}
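
A minimal sketch of how such an adaptor could behave, assuming a `std::sync::mpsc` channel as a stand-in for the stream (syncbox's own types are not used here, and the `new` constructor is hypothetical). Blocking on `recv()` plays the role of awaiting the next value, and a closed channel marks the end of the stream:

```rust
use std::sync::mpsc::Receiver;

/// Blocking iterator over a channel of results. The channel is an assumed
/// stand-in for a `Stream<T, E>`; it is not the syncbox type.
struct IntoIter<T, E> {
    rx: Receiver<Result<T, E>>,
}

impl<T, E> IntoIter<T, E> {
    /// Hypothetical constructor for this sketch.
    fn new(rx: Receiver<Result<T, E>>) -> Self {
        IntoIter { rx }
    }
}

impl<T, E> Iterator for IntoIter<T, E> {
    type Item = Result<T, E>;

    fn next(&mut self) -> Option<Result<T, E>> {
        // Block until the producer sends a value. `recv` fails only when
        // every sender is gone, which is treated as the end of the stream.
        self.rx.recv().ok()
    }
}
```

Because the items are `Result`s, a caller that wants to stop at the first error can collect into `Result<Vec<T>, E>` using the standard library's `FromIterator` impl for `Result`.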

reem avatar Mar 12 '15 23:03 reem

Waiting for n values is also easily doable with the iterator adaptor:

let mut iter = stream.into_iter();
let chunk = iter.by_ref().take(n).collect::<Vec<_>>();
let stream = iter.into_stream(); // Inherent method on stream::IntoIter

reem avatar Mar 12 '15 23:03 reem

@reem Yeah, Stream right now has an iter() fn, but it should be updated to the new style.

Also, what I mean by "wait" on N vals is an "async wait". So, Stream::wait_for(n) -> Future<(n-elems, rest)> kind of thing. So it would have to be a fn on Stream vs. going with the iterator.

carllerche avatar Mar 13 '15 00:03 carllerche

I wonder if that actually does have to be built in. I think you could translate a Vec<Future<T, E>> to a Stream<T, E> by repeatedly selecting on all the futures in the Vec.

reem avatar Mar 13 '15 00:03 reem

    // Creates a stream that iterates over both this and the specified streams
    // simultaneously, yielding the two elements as pairs.
    fn zip<U>(self, other: Stream<U, E>) -> Stream<(T, U), E>;

When does it end?
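
For comparison, the standard library's `Iterator::zip` stops as soon as either side runs out. A `Stream::zip` could plausibly follow the same rule, though nothing in this thread settles it. A minimal sketch of the iterator behaviour (`zip_pairs` is a hypothetical helper, not part of syncbox):

```rust
/// Pairs up two sequences using `Iterator::zip`, which ends when the
/// shorter input is exhausted. Extra elements on the longer side are dropped.
fn zip_pairs(left: Vec<u32>, right: Vec<char>) -> Vec<(u32, char)> {
    left.into_iter().zip(right).collect()
}
```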

Regarding batch(n) and not having enough elements, I think it would be reasonable to just return what it is able to. The client should not necessarily expect n items if the Stream ends. batch() would solve https://github.com/carllerche/syncbox/issues/11 though.
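
The "return what it is able to" rule can be sketched synchronously, assuming an iterator of `Result<T, E>` as a stand-in for `Stream<T, E>`. The name `batch_sync` and the choice to fail the whole batch on the first error are assumptions of this sketch, not something the thread decides:

```rust
/// Pulls up to `n` values from the front of `source` and returns them with
/// the untouched remainder. A source that ends early yields a shorter batch
/// instead of an error; an `Err` item aborts the batch (an assumption).
fn batch_sync<I, T, E>(mut source: I, n: usize) -> Result<(Vec<T>, I), E>
where
    I: Iterator<Item = Result<T, E>>,
{
    let mut head = Vec::new();
    while head.len() < n {
        match source.next() {
            Some(Ok(value)) => head.push(value),
            Some(Err(err)) => return Err(err),
            None => break, // source ended: return the short batch
        }
    }
    Ok((head, source))
}
```

Returning the remainder alongside the batch mirrors the `Future<(Vec<T>, Stream<T, E>), E>` shape proposed in the original post, so the caller can keep consuming after the first chunk.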

We also noted that take() would consume the entire stream, unlike an Iterator. That should be part of the docs. It was noted that if we did something like partitioning we could preserve the remaining stream.

<~carllerche> maybe what you want is a partition
<~carllerche> maybe Stream<T>::collect() -> Future<Vec<T>>
<hoverbear> Something like `chunks()` in Iterator?
<~carllerche> then you can do Stream<T>::partition_at(n) -> (Stream<T>, Stream<T>)

Hoverbear avatar Mar 13 '15 03:03 Hoverbear

Reopening because this issue wasn't really supposed to be closed yet.

carllerche avatar Mar 17 '15 06:03 carllerche

Regarding the parallelizing computation function, I'm thinking something like:

fn process<U: Async, F: Fn(T) -> U>(self, in_flight: usize, f: F) -> Stream<U::Value>;

Returns a new stream representing the processed values of the original stream in completion order. A value is processed by calling the supplied function and waiting for the completion of the returned async value. There can be at most in_flight values from the original stream being processed at a given time.

So basically, process at most in_flight values from the original stream and provide the results as they become realized.
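
The bounded, completion-ordered behaviour can be sketched with plain threads, assuming OS threads stand in for async values and an `mpsc::Receiver` stands in for the output stream. The name `process_bounded` and the `Vec` input are assumptions made for illustration. This is not how syncbox or eventual implement it:

```rust
use std::sync::{mpsc, Arc, Mutex};
use std::thread;

/// Applies `f` to every input with at most `in_flight` calls running at
/// once. Results are sent in completion order, not input order.
fn process_bounded<T, U, F>(inputs: Vec<T>, in_flight: usize, f: F) -> mpsc::Receiver<U>
where
    T: Send + 'static,
    U: Send + 'static,
    F: Fn(T) -> U + Send + Sync + 'static,
{
    let queue = Arc::new(Mutex::new(inputs.into_iter()));
    let f = Arc::new(f);
    let (tx, rx) = mpsc::channel();

    // One worker per in-flight slot; each pulls the next input when idle.
    for _ in 0..in_flight {
        let queue = Arc::clone(&queue);
        let f = Arc::clone(&f);
        let tx = tx.clone();
        thread::spawn(move || loop {
            // The lock is held only while taking the next input.
            let next = queue.lock().unwrap().next();
            match next {
                Some(item) => {
                    if tx.send((*f)(item)).is_err() {
                        break; // the consumer dropped the receiver
                    }
                }
                None => break, // no inputs left
            }
        });
    }
    // The original sender is dropped here, so the receiver closes once
    // every worker has finished.
    rx
}
```

Iterating the returned receiver with `rx.iter()` blocks for each result and ends once all inputs are processed. Note that `in_flight == 0` spawns no workers, so the receiver closes immediately without yielding anything.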

carllerche avatar Mar 17 '15 07:03 carllerche

This would be different than .take() because .take() requires the values to be fully processed?

Hoverbear avatar Mar 22 '15 18:03 Hoverbear

An interesting example use case for the parallel processing API is performing HTTP request pipelining.

reem avatar Mar 25 '15 10:03 reem

Should this be closed now that streams are in eventual?

kamalmarhubi avatar Oct 28 '15 18:10 kamalmarhubi