futures-rs
futures-rs copied to clipboard
zip_latest Stream adapter
I would like your opinion on adding a zip_latest
adapter to StreamExt
(a.k.a. combineLatest
in some reactive frameworks, but I think that "combine" does not give any indication of how items are combined).
- Implementation and documentation: stephaneyfx/futures-rs@31886835135ac7e065031a79c4c203b00c723f2d
- Unit tests: stephaneyfx/futures-rs@db577e4bcabb1d7f77493850ce9c79b077701d27
This adapter proves useful when zipping two streams that don't necessarily produce items at the same time and re-using the last produced item is desirable. For example, let's assume a system composed of two components A and B changing over time, but not necessarily simultaneously. The states of A and B can be represented by two streams SA and SB whose items are the new states of each component. The usual zip
adapter produces a new item only when both SA and SB do, so it is not suitable to represent the state of the system as a whole. When SA produces a new item, if SB doesn't, the item freshly produced by SA should be paired with the latest item produced by SB (and vice versa). Such a stream can be used to represent the state of the system. With a diagram, this gives (self and other are Stream):
---a------b------c------> self
------0---1---2---------> other
------a0--b1--b2-c2-----> self.zip_latest(other)
Please let me know if this is something worth integrating to futures-util
, in which case I can submit a PR.
Thank you for such a great crate!
I don't have any particular opposition to this, but I haven't seen it commonly requested. The presence of it in other frameworks makes it seem like others find it useful-- what is it typically used for?
- This would require the items to be
Clone
, right? Or could they be provided downstream as borrows, the combinator forever keeping ownership? - I currently solved my need for such an operator with this, based on
async_stream::stream!
andtokio::select!
. It does look kinda verbose, but it gives you a lot of other opportunities, and doesn't impose any limits on theItem
. I'm not sure that I would prefer a separate adapter that gives me a tuple, even when I'm working of signals of primitive types. (In RxJS I have at first had quite the number of problems with one of the inputs not having a value for a long time, blocking the entire stream. I don't thinkstartWith
on inputs or outputs is an improvement over setting default values oryield
ing something in the post-select!
section when certain values are still not ready.
I don't have any particular opposition to this, but I haven't seen it commonly requested. The presence of it in other frameworks makes it seem like others find it useful-- what is it typically used for?
When I use data streams that arrive at different times, it always need this zip_latest to combine those. i think it's commonly requested. (a.k.a. combineLatest in some reactive frameworks)
@cnzx219 have you considered a declarative macro that expands to something like this:
async_stream::stream! {
let mut it_a = <expression_one>;
let mut it_b = <expression_two>;
let mut a = None;
let mut b = None;
loop {
tokio::select! {
Some(next) = it_a.next() => { a = Some(next); }
Some(next) = it_b.next() => { b = Some(next); }
else => break,
}
if let (Some(a), Some(b)) = (a, b) {
yield (a.clone(), b.clone())
}
}
}
@cnzx219 have you considered a declarative macro that expands to something like this:
async_stream::stream! { let mut a = None; let mut b = None; loop { tokio::select! { Some(next_a) = identifier_one.next() => { a = next_a.into(); } Some(next_b) = identifier_two.next() => { b = next_b.into(); } else => break, } if let (Some(a), Some(b)) = (a, b) { yield (a, b) } } }
Thanks for your advise. But the zip_latest
is totally different from that select!
. select!
focuses on the stream data that is ready first. zip_latest
focuses on combine the latest value into a tuple.
@cnzx219 no no, this is what zip_latest!(identifier_one, identifier_two)
could expand to in your project - a piece of code that contains async_stream::stream!
and tokio::select!
.
The produced impl Stream
has the following behavior:
- until there is a value from all inputs, it doesn't produce items
- once there has been a value from all inputs, it produces a full tuple whenever any of the inputs yields an item
- if all of the input streams close, it closes
which is the behavior of combineLatest
The problem is with lifetimes. Stream
, like Iterator
, doesn't allow Item
to borrow from Self
You could emulate JS behavior when a subset of your items isn't Clone
by using a reference counting pointer type:
use std::rc::Rc;
let zipped = zip_latest!(clone_stream, unclone_stream.map(Rc));
@cnzx219 no no, this is what
zip_latest!(identifier_one, identifier_two)
could expand to in your project - a piece of code that containsasync_stream::stream!
andtokio::select!
.The produced
impl Stream
has the following behavior:
- until there is a value from all inputs, it doesn't produce items
- once there has been a value from all inputs, it produces a full tuple whenever any of the inputs yields an item
- if all of the input streams close, it closes
which is the behavior of
combineLatest
The problem is with lifetimes.
Stream
, likeIterator
, doesn't allowItem
to borrow fromSelf
You could emulate JS behavior when a subset of your items isn'tClone
by using a reference counting pointer type:use std::rc::Rc; let zipped = zip_latest!(clone_stream, unclone_stream.map(Rc));
Thanks for your explanation. And I also know that there are multiple ways to implement this behavior. But I prefer the first one, it can work too. As in my first comments, I just supports that it's commonly requested.
Has anyone implemented this yet? I'd also be keen for a similar combinator which works on Vec<impl Stream<Item = T>>
, as for combineLatest
in Rx frameworks)
@extremeandy zip_latest
I'm working on a zip_latest_all
that should cover the Vec<impl Stream>
case.
@stephaneyfx if you are taking on this undertaking (:v), please consider including zip_with
and zip_all_with
, that take a mapping function from &A, &B
(or &(A, B)
) and &[T]
respectively to the returned Item
, with no Clone
bound.
zip_latest_all
, zip_latest_with_all
and zip_latest_with
were added.