futures-rs icon indicating copy to clipboard operation
futures-rs copied to clipboard

zip_latest Stream adapter

Open stephaneyfx opened this issue 6 years ago • 11 comments

I would like your opinion on adding a zip_latest adapter to StreamExt (a.k.a. combineLatest in some reactive frameworks, but I think that "combine" does not give any indication of how items are combined).

  • Implementation and documentation: stephaneyfx/futures-rs@31886835135ac7e065031a79c4c203b00c723f2d
  • Unit tests: stephaneyfx/futures-rs@db577e4bcabb1d7f77493850ce9c79b077701d27

This adapter proves useful when zipping two streams that don't necessarily produce items at the same time and re-using the last produced item is desirable. For example, let's assume a system composed of two components A and B changing over time, but not necessarily simultaneously. The states of A and B can be represented by two streams SA and SB whose items are the new states of each component. The usual zip adapter produces a new item only when both SA and SB do, so it is not suitable to represent the state of the system as a whole. When SA produces a new item, if SB doesn't, the item freshly produced by SA should be paired with the latest item produced by SB (and vice versa). Such a stream can be used to represent the state of the system. With a diagram, this gives (self and other are Stream):

---a------b------c------> self
------0---1---2---------> other
------a0--b1--b2-c2-----> self.zip_latest(other)

Please let me know if this is something worth integrating to futures-util, in which case I can submit a PR. Thank you for such a great crate!

stephaneyfx avatar Mar 21 '18 05:03 stephaneyfx

I don't have any particular opposition to this, but I haven't seen it commonly requested. The presence of it in other frameworks makes it seem like others find it useful-- what is it typically used for?

cramertj avatar Oct 31 '19 22:10 cramertj

  1. This would require the items to be Clone, right? Or could they be provided downstream as borrows, the combinator forever keeping ownership?
  2. I currently solved my need for such an operator with this, based on async_stream::stream! and tokio::select!. It does look kinda verbose, but it gives you a lot of other opportunities, and doesn't impose any limits on the Item. I'm not sure that I would prefer a separate adapter that gives me a tuple, even when I'm working of signals of primitive types. (In RxJS I have at first had quite the number of problems with one of the inputs not having a value for a long time, blocking the entire stream. I don't think startWith on inputs or outputs is an improvement over setting default values or yielding something in the post-select! section when certain values are still not ready.

qm3ster avatar Mar 25 '20 01:03 qm3ster

I don't have any particular opposition to this, but I haven't seen it commonly requested. The presence of it in other frameworks makes it seem like others find it useful-- what is it typically used for?

When I use data streams that arrive at different times, it always need this zip_latest to combine those. i think it's commonly requested. (a.k.a. combineLatest in some reactive frameworks)

CyberTianzun avatar Sep 15 '21 03:09 CyberTianzun

@cnzx219 have you considered a declarative macro that expands to something like this:

async_stream::stream! {
    let mut it_a = <expression_one>;
    let mut it_b = <expression_two>;
    let mut a = None;
    let mut b = None;
    loop {
        tokio::select! {
            Some(next) = it_a.next() => { a = Some(next); }
            Some(next) = it_b.next() => { b = Some(next); }
            else => break,
        }
        if let (Some(a), Some(b)) = (a, b) {
            yield (a.clone(), b.clone())
        }
    }
}

qm3ster avatar Sep 15 '21 18:09 qm3ster

@cnzx219 have you considered a declarative macro that expands to something like this:

async_stream::stream! {
    let mut a = None;
    let mut b = None;
    loop {
        tokio::select! {
            Some(next_a) = identifier_one.next() => { a = next_a.into(); }
            Some(next_b) = identifier_two.next() => { b = next_b.into(); }
            else => break,
        }
        if let (Some(a), Some(b)) = (a, b) {
            yield (a, b)
        }
    }
}

Thanks for your advise. But the zip_latest is totally different from that select!. select! focuses on the stream data that is ready first. zip_latest focuses on combine the latest value into a tuple.

CyberTianzun avatar Sep 22 '21 08:09 CyberTianzun

@cnzx219 no no, this is what zip_latest!(identifier_one, identifier_two) could expand to in your project - a piece of code that contains async_stream::stream! and tokio::select!.

The produced impl Stream has the following behavior:

  1. until there is a value from all inputs, it doesn't produce items
  2. once there has been a value from all inputs, it produces a full tuple whenever any of the inputs yields an item
  3. if all of the input streams close, it closes

which is the behavior of combineLatest

The problem is with lifetimes. Stream, like Iterator, doesn't allow Item to borrow from Self You could emulate JS behavior when a subset of your items isn't Clone by using a reference counting pointer type:

use std::rc::Rc;
let zipped = zip_latest!(clone_stream, unclone_stream.map(Rc));

qm3ster avatar Sep 22 '21 09:09 qm3ster

@cnzx219 no no, this is what zip_latest!(identifier_one, identifier_two) could expand to in your project - a piece of code that contains async_stream::stream! and tokio::select!.

The produced impl Stream has the following behavior:

  1. until there is a value from all inputs, it doesn't produce items
  2. once there has been a value from all inputs, it produces a full tuple whenever any of the inputs yields an item
  3. if all of the input streams close, it closes

which is the behavior of combineLatest

The problem is with lifetimes. Stream, like Iterator, doesn't allow Item to borrow from Self You could emulate JS behavior when a subset of your items isn't Clone by using a reference counting pointer type:

use std::rc::Rc;
let zipped = zip_latest!(clone_stream, unclone_stream.map(Rc));

Thanks for your explanation. And I also know that there are multiple ways to implement this behavior. But I prefer the first one, it can work too. As in my first comments, I just supports that it's commonly requested.

CyberTianzun avatar Sep 22 '21 16:09 CyberTianzun

Has anyone implemented this yet? I'd also be keen for a similar combinator which works on Vec<impl Stream<Item = T>>, as for combineLatest in Rx frameworks)

extremeandy avatar Oct 08 '22 02:10 extremeandy

@extremeandy zip_latest I'm working on a zip_latest_all that should cover the Vec<impl Stream> case.

stephaneyfx avatar Oct 08 '22 14:10 stephaneyfx

@stephaneyfx if you are taking on this undertaking (:v), please consider including zip_with and zip_all_with, that take a mapping function from &A, &B(or &(A, B)) and &[T] respectively to the returned Item, with no Clone bound.

qm3ster avatar Oct 12 '22 00:10 qm3ster