async-stream icon indicating copy to clipboard operation
async-stream copied to clipboard

Add `stream` and `try_stream` functions

Open SabrinaJewson opened this issue 3 years ago • 16 comments
trafficstars

Example usage:

let s = stream(|stream| async move {
    for i in 0..3 {
        stream.yield_item(i).await;
    }
});

Advantages of using the new API:

  1. Type annotations with stream::<u32, _, _> or try_stream::<u32, io::Error, _, _> or |stream: async_stream::Stream<u32>|
  2. Better IDE support
  3. Works better with rustfmt
  4. Reduces dependencies by avoiding depending on syn
  5. Better documentation

Disadvatanges of using the new API:

  1. It’s more verbose (stream.yield_item(i).await versus yield i;)
  2. It’s easy to misuse if the stream variable is moved outside the stream, used inside join! or select! or FuturesUnordered etc

Open questions:

  • What should the parameter be named? Right now I’m calling it stream and Stream which I was inspired by thread::scope for, but there may be a better name.

closes #71, closes #56, closes #63, closes #64 (latter two are via #56) Might close, if new API is considered sufficient: #33, #68

SabrinaJewson avatar May 25 '22 20:05 SabrinaJewson

Disadvatanges of using the new API:

I think the lack of the for await syntax would also be included in this list.

taiki-e avatar Jul 07 '22 16:07 taiki-e

cc @carllerche @Darksonn @Kestrer: any thoughts on this?

taiki-e avatar Jul 07 '22 16:07 taiki-e

It’s easy to misuse if the stream variable is moved outside the stream, used inside join! or select! or FuturesUnordered etc

I would want a better understanding of how it fails if used in these.

Darksonn avatar Jul 09 '22 11:07 Darksonn

The following code is incorrect:

join!(
    async { stream.yield_item(1).await; },
    async { stream.yield_item(2).await; },
);

because it will result in two items being yielded within the same poll, which results in a panic.

For the select!-like case, code is incorrect:

try_join!(
    async { stream.yield_item(1).await; },
    async { something_that_fails()?; Ok(()) },
)?;

Because an item will be yielded but then the try_join! will exit before the Pending gets propagated, so the outer task will exit and that will panic.

Of course, making these operations panic is a choice. Alternatives to the first case include:

  • Silently dropping the extra values
  • Storing a Vec of all the values each poll
  • Having yield_item take &mut self

For the second case, we could also just obediently yield the item.

SabrinaJewson avatar Jul 09 '22 11:07 SabrinaJewson

Having this fail seems pretty confusing. I would certainly prefer a version where this works. Perhaps yield_item could do a yield_now if it is unable to yield it immediately?

join!(
    async { stream.yield_item(1).await; },
    async { stream.yield_item(2).await; },
);

Darksonn avatar Jul 09 '22 11:07 Darksonn

Oh wait, I was mistaken — yield_item already does take &mut self, meaning that the join! example woudn’t even compile. In that case there’s much less of a problem, the only issue can arise when cloning the Stream variable:

let stream_2 = stream.clone();
join!(
    async { stream.yield_item(1).await; },
    async { stream_2.yield_item(2).await; },
);

and this code would panic. So, we could remove the Clone implementation — I kept it in there just because I could, I don’t really know how useful it’d be.

Another option is, as you said, to have yield_item take &self and yield if it can’t give the item immediately. It boils down to whether we want to support code like that (yielding in multiple join branches). With the current macro it’s not possible, so maybe it’s fine to keep it that way.

SabrinaJewson avatar Jul 09 '22 12:07 SabrinaJewson

This is so much more user friendly. I would love to see this get merged! I don't know what the next steps should be but maybe you could resolve the conflicts please?

mohe2015 avatar Sep 11 '22 20:09 mohe2015

Would love to see this added.

ghost avatar Oct 14 '22 03:10 ghost

I don't know if this would be possible but it seems like it's not Send:

error: future cannot be sent between threads safely
   --> backend-rust/src/bin/server/s_setup.rs:180:24
    |
180 |     Ok(StreamBody::new(stream).into_response())
    |                        ^^^^^^ future created by async block is not `Send`
    |
    = help: within `impl FusedStream + futures_util::Stream<Item = Result<axum::body::Bytes, anyhow::Error>>`, the trait `std::marker::Send` is not implemented for `NonNull<()>`
note: future is not `Send` as this value is used across an await
   --> /home/moritz/.cargo/git/checkouts/async-stream.rs-d36ba58420dc21e4/d5e01e5/async-stream/src/functions.rs:313:28
    |
300 |         let mut option_stream = CURRENT_CONTAINING_STREAM.with(Cell::get);
    |             ----------------- has type `std::option::Option<async_stream::functions::ContainingStream>` which is not `Send`
...
313 |         PendOnce::default().await;
    |                            ^^^^^^ await occurs here, with `mut option_stream` maybe used later
314 |     }
    |     - `mut option_stream` is later dropped here
note: required by a bound in `StreamBody::<S>::new`
   --> /home/moritz/.cargo/registry/src/github.com-1ecc6299db9ec823/axum-0.6.0/src/body/stream_body.rs:78:24
    |
78  |         S: TryStream + Send + 'static,
    |                        ^^^^ required by this bound in `StreamBody::<S>::new`


mohe2015 avatar Nov 27 '22 19:11 mohe2015

You’re right — I have now pushed a fix and tests for that.

SabrinaJewson avatar Nov 27 '22 20:11 SabrinaJewson

Is there interest at all by the maintainers to integrate this? If so what would the steps be? Maybe check/reduce breakage of existing code and document an upgrade path? (Also merge conflicts again)

I looked over the code and it didn't look too bad. There are a few things I'm not familiar with though like manual future implementations.

mohe2015 avatar Dec 27 '22 18:12 mohe2015

Maybe the macro feature should be enabled by default to aid in compatiblity?

mohe2015 avatar Dec 27 '22 18:12 mohe2015

Another upside of this approach:

Unlike yield, allows defining of a custom Ext trait. E.g. yield Message::new("content").boxed() can be stream.message("content").

jakajancar avatar Jan 07 '23 06:01 jakajancar

I suggest a shorter name stream.yield_(expr), since we already know the expression is an item of the stream.

Nugine avatar Feb 02 '23 10:02 Nugine

Note that this following code is incorrect:

Case I

tokio::task(async move {
    stream.yield_item(1).await;
});

Case II

let _ = || async move {
    stream.yield_item(1).await;
};

Case III

let mut _stream = None;
let s = stream(|stream| {
   _stream = Some(stream);
    async {}
});

And much more...

Solution

Instead of move ing s into async block, Just return it.

stream|mut s| async {
    s.yield_(42).await;
    return (s, "42");
});

I create an alternative to this crate called async-gen, which supports these feature.

nurmohammed840 avatar Jun 21 '23 08:06 nurmohammed840