async-stream icon indicating copy to clipboard operation
async-stream copied to clipboard

`try_stream!` / `stream!` with error close stream

Open BratSinot opened this issue 3 years ago • 4 comments

Greetings!

Why Err close stream? I can't use .filter(..) in that case.

use async_stream::{stream, try_stream};
use futures_util::{pin_mut, Stream, StreamExt};
use std::future::ready;

fn error_or_ok(rcx: usize) -> Result<usize, String> {
    if rcx % 2 == 0 {
        Ok(rcx)
    } else {
        Err(format!("Error{rcx}"))
    }
}

fn return_stream() -> impl Stream<Item = Result<usize, String>> {
    //stream! {
    try_stream! {
        let mut rcx = 0usize;
        loop {
            //yield Ok(error_or_ok(rcx)?); // with stream! closes
            //yield {move || { Ok(error_or_ok(rcx)?) }}(); // with stream! ok
            yield error_or_ok(rcx)?; // with try_stream! closes
            rcx += 1;
        }
    }
    .filter(|data| {
        let ret = if let Err(error) = data {
            eprintln!("Got error: `{error}`.");
            false
        } else {
            true
        };

        ready(ret)
    })
}

#[tokio::main]
async fn main() {
    let stream = return_stream();
    pin_mut!(stream);

    println!("Some: `{:?}`", stream.next().await);
    println!("None: `{:?}`", stream.next().await);
    println!("Some: `{:?}`", stream.next().await);
    println!("None: `{:?}`", stream.next().await);
}

BratSinot avatar Oct 24 '22 10:10 BratSinot