
Add `StreamExt::wait_until` combinator

Open jerry73204 opened this issue 3 years ago • 0 comments

The `stream.wait_until(fut)` combinator makes the stream wait until the future resolves. If the future resolves to `true`, the stream starts yielding elements; if it resolves to `false`, the stream fuses (it ends without yielding any elements).

This is useful for initialization. For example, a stream may have to wait until the user's configuration is ready before it starts producing values, and fuse if the configuration is incorrect. The code below illustrates the idea.

// Suppose a oneshot channel carries the acknowledgement from the user.
let (tx, rx) = oneshot::channel::<bool>();

// The sender is handed to the user.
give_to_user(tx);

stream::iter(0..100)
    .wait_until(async move {
        if let Ok(yes) = rx.await {
            // Forward the user's decision.
            yes
        } else {
            // The sender was dropped, so fuse the stream.
            false
        }
    })
    .for_each(|val| async move { /* do something */ })
    .await;

A practical example is in my par-stream crate. To broadcast the messages from a stream, it hands a builder to the user. The user registers receivers through the builder. The receivers should wait until the builder is finished or dropped, which ensures that every receiver can read the first value from the stream.

jerry73204 · Dec 31 '21 10:12