async-stream
Add `stream` and `try_stream` functions
Example usage:
let s = stream(|stream| async move {
    for i in 0..3 {
        stream.yield_item(i).await;
    }
});
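To make the mechanics of the example above concrete, here is a minimal, std-only sketch of how a closure-based API of this shape could be driven. It is not the implementation in this PR: `Yielder`, `collect_items`, and `NoopWake` are hypothetical names, the shared slot is a plain `Rc<RefCell<Option<T>>>`, and the driver collects into a `Vec` instead of implementing a real `Stream`. The driver also assumes the generator only suspends through `yield_item`.

```rust
use std::cell::RefCell;
use std::future::{poll_fn, Future};
use std::rc::Rc;
use std::sync::Arc;
use std::task::{Context, Poll, Wake, Waker};

/// Hypothetical handle passed to the closure (stand-in for the PR's `Stream`).
pub struct Yielder<T> {
    slot: Rc<RefCell<Option<T>>>,
}

impl<T> Yielder<T> {
    /// Stores `item` in the shared slot, then suspends exactly once so the
    /// driver can take the item out before the generator continues.
    pub fn yield_item(&mut self, item: T) -> impl Future<Output = ()> + '_ {
        let mut pending = Some(item);
        poll_fn(move |_cx| match pending.take() {
            Some(item) => {
                let previous = self.slot.borrow_mut().replace(item);
                assert!(previous.is_none(), "two items yielded within one poll");
                Poll::Pending
            }
            None => Poll::Ready(()),
        })
    }
}

/// Waker that does nothing; enough for a driver that polls in a loop.
struct NoopWake;

impl Wake for NoopWake {
    fn wake(self: Arc<Self>) {}
}

/// Runs the generator to completion and collects every yielded item.
pub fn collect_items<T, F, Fut>(f: F) -> Vec<T>
where
    F: FnOnce(Yielder<T>) -> Fut,
    Fut: Future<Output = ()>,
{
    let slot = Rc::new(RefCell::new(None));
    let mut fut = Box::pin(f(Yielder { slot: slot.clone() }));
    let waker = Waker::from(Arc::new(NoopWake));
    let mut cx = Context::from_waker(&waker);
    let mut items = Vec::new();
    loop {
        let done = fut.as_mut().poll(&mut cx).is_ready();
        // At most one item can be waiting in the slot after each poll.
        if let Some(item) = slot.borrow_mut().take() {
            items.push(item);
        }
        if done {
            return items;
        }
    }
}
```

In this sketch, `collect_items(|mut stream| async move { for i in 0..3 { stream.yield_item(i).await; } })` gathers the three items in order.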
Advantages of using the new API:
- Type annotations with `stream::<u32, _, _>` or `try_stream::<u32, io::Error, _, _>` or `|stream: async_stream::Stream<u32>|`
- Better IDE support
- Works better with `rustfmt`
- Reduces dependencies by avoiding depending on `syn`
- Better documentation
Disadvantages of using the new API:
- It’s more verbose (`stream.yield_item(i).await` versus `yield i;`)
- It’s easy to misuse if the `stream` variable is moved outside the stream, used inside `join!` or `select!` or `FuturesUnordered` etc
Open questions:
- What should the parameter be named? Right now I’m calling it `stream` and `Stream`, inspired by `thread::scope`, but there may be a better name.
closes #71, closes #56, closes #63, closes #64 (latter two are via #56)
Might close, if new API is considered sufficient: #33, #68
> Disadvantages of using the new API:
I think the lack of the `for await` syntax would also be included in this list.
cc @carllerche @Darksonn @Kestrer: any thoughts on this?
> It’s easy to misuse if the `stream` variable is moved outside the stream, used inside `join!` or `select!` or `FuturesUnordered` etc
I would want a better understanding of how it fails if used in these.
The following code is incorrect:
join!(
async { stream.yield_item(1).await; },
async { stream.yield_item(2).await; },
);
because it will result in two items being yielded within the same poll, which results in a panic.
For the `select!`-like case, this code is incorrect:
try_join!(
async { stream.yield_item(1).await; },
async { something_that_fails()?; Ok(()) },
)?;
This is because an item will be yielded, but the `try_join!` will then exit before the `Pending` gets propagated, so the outer task will exit and that will panic.
Of course, making these operations panic is a choice. Alternatives to the first case include:
- Silently dropping the extra values
- Storing a `Vec` of all the values each poll
- Having `yield_item` take `&mut self`
For the second case, we could also just obediently yield the item.
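As a rough illustration of the "store all the values each poll" alternative, the following std-only sketch queues items instead of panicking when two branches yield within the same poll. Everything here is hypothetical and not taken from the PR: `BufferedYielder`, `join2` (a tiny stand-in for `join!`), and `collect_batches`, which returns the items grouped by the poll in which they were produced.

```rust
use std::cell::RefCell;
use std::collections::VecDeque;
use std::future::{poll_fn, Future};
use std::rc::Rc;
use std::sync::Arc;
use std::task::{Context, Poll, Wake, Waker};

/// Hypothetical handle whose `yield_item` takes `&self` and queues items.
pub struct BufferedYielder<T> {
    queue: Rc<RefCell<VecDeque<T>>>,
}

impl<T> BufferedYielder<T> {
    pub fn yield_item(&self, item: T) -> impl Future<Output = ()> + '_ {
        let mut pending = Some(item);
        poll_fn(move |_cx| match pending.take() {
            // First poll: queue the item and suspend so the driver can drain it.
            Some(item) => {
                self.queue.borrow_mut().push_back(item);
                Poll::Pending
            }
            // Second poll: the item has been handed over, so resume.
            None => Poll::Ready(()),
        })
    }
}

/// Minimal stand-in for `join!`: polls both branches inside a single poll.
pub fn join2<'a>(
    a: impl Future<Output = ()> + 'a,
    b: impl Future<Output = ()> + 'a,
) -> impl Future<Output = ()> + 'a {
    let mut a = Some(Box::pin(a));
    let mut b = Some(Box::pin(b));
    poll_fn(move |cx| {
        if a.as_mut().map_or(true, |f| f.as_mut().poll(cx).is_ready()) {
            a = None;
        }
        if b.as_mut().map_or(true, |f| f.as_mut().poll(cx).is_ready()) {
            b = None;
        }
        if a.is_none() && b.is_none() {
            Poll::Ready(())
        } else {
            Poll::Pending
        }
    })
}

struct NoopWake;

impl Wake for NoopWake {
    fn wake(self: Arc<Self>) {}
}

/// Drives the generator and returns the items grouped by poll.
pub fn collect_batches<T, F, Fut>(f: F) -> Vec<Vec<T>>
where
    F: FnOnce(BufferedYielder<T>) -> Fut,
    Fut: Future<Output = ()>,
{
    let queue = Rc::new(RefCell::new(VecDeque::new()));
    let mut fut = Box::pin(f(BufferedYielder { queue: queue.clone() }));
    let waker = Waker::from(Arc::new(NoopWake));
    let mut cx = Context::from_waker(&waker);
    let mut batches = Vec::new();
    loop {
        let done = fut.as_mut().poll(&mut cx).is_ready();
        let batch: Vec<T> = queue.borrow_mut().drain(..).collect();
        if !batch.is_empty() {
            batches.push(batch);
        }
        if done {
            return batches;
        }
    }
}
```

With this design, two `join2` branches that each call `yield_item` produce one batch containing both items, at the cost of an allocation-backed queue and of items no longer mapping one-to-one onto polls.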
Having this fail seems pretty confusing. I would certainly prefer a version where this works. Perhaps yield_item could do a yield_now if it is unable to yield it immediately?
join!(
async { stream.yield_item(1).await; },
async { stream.yield_item(2).await; },
);
Oh wait, I was mistaken: `yield_item` already takes `&mut self`, meaning that the `join!` example wouldn’t even compile. In that case there’s much less of a problem; the only issue arises when cloning the `Stream` variable:
let stream_2 = stream.clone();
join!(
async { stream.yield_item(1).await; },
async { stream_2.yield_item(2).await; },
);
and this code would panic. So, we could remove the Clone implementation — I kept it in there just because I could, I don’t really know how useful it’d be.
Another option is, as you said, to have yield_item take &self and yield if it can’t give the item immediately. It boils down to whether we want to support code like that (yielding in multiple join branches). With the current macro it’s not possible, so maybe it’s fine to keep it that way.
This is so much more user friendly. I would love to see this get merged! I don't know what the next steps should be but maybe you could resolve the conflicts please?
Would love to see this added.
I don't know if this would be possible but it seems like it's not Send:
error: future cannot be sent between threads safely
--> backend-rust/src/bin/server/s_setup.rs:180:24
|
180 | Ok(StreamBody::new(stream).into_response())
| ^^^^^^ future created by async block is not `Send`
|
= help: within `impl FusedStream + futures_util::Stream<Item = Result<axum::body::Bytes, anyhow::Error>>`, the trait `std::marker::Send` is not implemented for `NonNull<()>`
note: future is not `Send` as this value is used across an await
--> /home/moritz/.cargo/git/checkouts/async-stream.rs-d36ba58420dc21e4/d5e01e5/async-stream/src/functions.rs:313:28
|
300 | let mut option_stream = CURRENT_CONTAINING_STREAM.with(Cell::get);
| ----------------- has type `std::option::Option<async_stream::functions::ContainingStream>` which is not `Send`
...
313 | PendOnce::default().await;
| ^^^^^^ await occurs here, with `mut option_stream` maybe used later
314 | }
| - `mut option_stream` is later dropped here
note: required by a bound in `StreamBody::<S>::new`
--> /home/moritz/.cargo/registry/src/github.com-1ecc6299db9ec823/axum-0.6.0/src/body/stream_body.rs:78:24
|
78 | S: TryStream + Send + 'static,
| ^^^^ required by this bound in `StreamBody::<S>::new`
You’re right — I have now pushed a fix and tests for that.
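For readers hitting a similar error: the log shows a non-`Send` value (`option_stream`, which holds a `NonNull<()>`) staying alive across `PendOnce::default().await`. One common way to keep such a future `Send` is to confine the non-`Send` value to an inner block that ends before the await point. The sketch below illustrates that general technique only and is not necessarily the fix that was pushed; `CURRENT`, `yield_like`, `assert_send`, and `block_on` are hypothetical names.

```rust
use std::cell::Cell;
use std::future::Future;
use std::ptr::NonNull;
use std::sync::Arc;
use std::task::{Context, Poll, Wake, Waker};

thread_local! {
    // Hypothetical stand-in for a thread-local holding a raw pointer.
    // `NonNull<()>` is not `Send`.
    static CURRENT: Cell<Option<NonNull<()>>> = Cell::new(None);
}

/// Compiles only if the future is `Send`.
pub fn assert_send<F: Future + Send>(fut: F) -> F {
    fut
}

/// The `+ Send` bound makes the compiler verify that nothing non-`Send`
/// is stored in the future across an await point.
pub fn yield_like() -> impl Future<Output = bool> + Send {
    async {
        // Read the thread-local inside a block so the non-`Send` value goes
        // out of scope before the await and is never stored in the future.
        let had_stream = {
            let current = CURRENT.with(Cell::get);
            current.is_some()
        };
        std::future::ready(()).await;
        had_stream
    }
}

struct NoopWake;

impl Wake for NoopWake {
    fn wake(self: Arc<Self>) {}
}

/// Tiny busy-polling executor, sufficient for futures that never truly wait.
pub fn block_on<F: Future>(fut: F) -> F::Output {
    let mut fut = Box::pin(fut);
    let waker = Waker::from(Arc::new(NoopWake));
    let mut cx = Context::from_waker(&waker);
    loop {
        if let Poll::Ready(out) = fut.as_mut().poll(&mut cx) {
            return out;
        }
    }
}
```

If `current` were instead declared at the top level of the async block and used after the await, the `+ Send` bound on `yield_like` would fail to compile with an error like the one quoted above.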
Is there interest at all by the maintainers to integrate this? If so what would the steps be? Maybe check/reduce breakage of existing code and document an upgrade path? (Also merge conflicts again)
I looked over the code and it didn't look too bad. There are a few things I'm not familiar with though like manual future implementations.
Maybe the macro feature should be enabled by default to aid in compatibility?
Another upside of this approach:
Unlike `yield`, this approach allows defining a custom `Ext` trait. E.g. `yield Message::new("content").boxed()` can become `stream.message("content")`.
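A sketch of what such an extension trait might look like, assuming a handle with a `yield_item(&mut self, item)` method that returns a nameable future. `Yielder`, `YieldItem`, `Message`, `MessageExt`, and `collect` are all hypothetical and only mimic the shape of the API discussed here.

```rust
use std::cell::RefCell;
use std::future::Future;
use std::pin::Pin;
use std::rc::Rc;
use std::sync::Arc;
use std::task::{Context, Poll, Wake, Waker};

#[derive(Debug, PartialEq)]
pub struct Message(pub String);

/// Hypothetical handle; stands in for the PR's `Stream` type.
pub struct Yielder<T> {
    slot: Rc<RefCell<Option<T>>>,
}

/// Future returned by `yield_item`: stores the item, then suspends once.
pub struct YieldItem<'a, T> {
    slot: &'a RefCell<Option<T>>,
    item: Option<T>,
}

// Nothing in `YieldItem` relies on a stable address, so it can be `Unpin`.
impl<T> Unpin for YieldItem<'_, T> {}

impl<T> Future for YieldItem<'_, T> {
    type Output = ();
    fn poll(mut self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<()> {
        match self.item.take() {
            Some(item) => {
                *self.slot.borrow_mut() = Some(item);
                Poll::Pending
            }
            None => Poll::Ready(()),
        }
    }
}

impl<T> Yielder<T> {
    pub fn yield_item(&mut self, item: T) -> YieldItem<'_, T> {
        YieldItem { slot: &self.slot, item: Some(item) }
    }
}

/// Extension trait adding a domain-specific shorthand on top of `yield_item`.
pub trait MessageExt {
    fn message(&mut self, content: &str) -> YieldItem<'_, Message>;
}

impl MessageExt for Yielder<Message> {
    fn message(&mut self, content: &str) -> YieldItem<'_, Message> {
        self.yield_item(Message(content.to_owned()))
    }
}

struct NoopWake;

impl Wake for NoopWake {
    fn wake(self: Arc<Self>) {}
}

/// Drives the generator to completion and collects the yielded items.
pub fn collect<T, F, Fut>(f: F) -> Vec<T>
where
    F: FnOnce(Yielder<T>) -> Fut,
    Fut: Future<Output = ()>,
{
    let slot = Rc::new(RefCell::new(None));
    let mut fut = Box::pin(f(Yielder { slot: slot.clone() }));
    let waker = Waker::from(Arc::new(NoopWake));
    let mut cx = Context::from_waker(&waker);
    let mut items = Vec::new();
    loop {
        let done = fut.as_mut().poll(&mut cx).is_ready();
        if let Some(item) = slot.borrow_mut().take() {
            items.push(item);
        }
        if done {
            return items;
        }
    }
}
```

Because the handle is an ordinary value, the shorthand is just a method call; nothing comparable can be attached to the `yield` keyword inside a macro.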
I suggest a shorter name stream.yield_(expr), since we already know the expression is an item of the stream.
Note that the following code is incorrect:
Case I
tokio::spawn(async move {
    stream.yield_item(1).await;
});
Case II
let _ = || async move {
stream.yield_item(1).await;
};
Case III
let mut _stream = None;
let s = stream(|stream| {
_stream = Some(stream);
async {}
});
And much more...
Solution
Instead of moving `s` into the async block, just return it.
stream(|mut s| async {
    s.yield_(42).await;
    return (s, "42");
});
I created an alternative to this crate called async-gen, which supports these features.