async-stream
Can't use ? if try_stream! wraps select!
I am joining multiple streams in a select! along with a timeout future, which is how I ran into this. I tried to boil it down to a minimal code sample that triggers the problem. It is as follows:
use std::io;
use std::future::Future;
use socket2::{Socket, Domain, Type, Protocol};
use futures::Stream;
use async_stream::try_stream;
use futures::select;
use tokio::time::sleep;
use std::time::Duration;
use futures::FutureExt;
use futures::StreamExt;

fn await_results(
    mut stream1: impl Stream<Item = io::Result<i32>> + Unpin,
) -> impl Stream<Item = io::Result<i32>> {
    try_stream! {
        let mut stream1_future = stream1.next().fuse();
        select!{
            result = stream1_future => {
                stream1_future = stream1.next().fuse();
                match result {
                    Some(x) => {
                        let y = x?;
                        yield y;
                    }
                    None => {
                    }
                }
            }
        }
    }
}

#[tokio::main]
async fn main() -> io::Result<()> {
    Ok(())
}
This fails with the error:
error[E0277]: the `?` operator can only be used in an async block that returns `Result` or `Option` (or another type that implements `Try`)
  --> src/main.rs:22:33
   |
15 | /     try_stream! {
16 | |         let mut stream1_future = stream1.next().fuse();
17 | |         select!{
18 | |             result = stream1_future => {
...  |
22 | |                         let y = x?;
   | |                                 ^^ cannot use the `?` operator in an async block that returns `()`
...  |
30 | |     }
31 | | }
   | |_____- this function should return `Result` or `Option` to accept `?`
   |
   = help: the trait `Try` is not implemented for `()`
   = note: required by `from_error`
And I confirmed I'm on 0.3.2, so I have the fix for #27.
I'm able to easily work around this by using stream! rather than try_stream!. So far it looks like I can still get my job done, just without the niceness of ?.
This seems to be because visit_token_stream_impl only visits yield but not ?, so a ? inside a nested macro such as select! is left untouched:
https://github.com/tokio-rs/async-stream/blob/8bf33a6d2cf00257a9235a14ae264aa5c74380ba/async-stream-impl/src/lib.rs#L45-L94
Just bumped into this. I'm using latest v0.3.5.