futures-rs
futures-rs copied to clipboard
Buffered stream losing `Send` marker
Hi,
I hit a higher-ranked lifetime error when using buffered streams, where the compiler complains that it cannot prove the future consuming the stream are Send, despite the stream itself being Send.
use futures::stream::{empty, StreamExt};
use std::future::ready;
fn send<T: Send>(_: T) {}
fn main() {
send(async {
empty().map(ready::<&()>).buffered(0).next().await
});
}
Resulting error:
Compiling playground v0.0.1 (/playground)
error: higher-ranked lifetime error
--> src/main.rs:7:5
|
7 | / send(async {
8 | | empty().map(ready::<&()>).buffered(0).next().await
9 | | });
| |______^
|
= note: could not prove for<'r> impl futures::Future<Output = Option<&'r ()>>: std::marker::Send
error: could not compile `playground` due to previous error
Buffered is the key here, as dropping it to use .then() instead of .map() + buffered() compiles fine. This shows that the stream and its future is definitely send, and the buffering makes it weird. I cannot see which bound in the Buffered struct/future makes the resulting stream !Send.
use futures::stream::{empty, StreamExt};
use std::future::ready;
fn send<T: Send>(_: T) {}
fn main() {
send(async {
empty().then(ready::<&()>).next().await
});
}
Obviously, I need buffered for parallelism of the futures, as they do some long-ish network calls; and the Send requirement comes from Tonic's usage of async_trait that requires futures to be Send.
Thanks!
Hmm, if the stream, future, and output are Send, buffered should also be Send.
https://github.com/rust-lang/futures-rs/blob/183f8c61371d4c3891683ca51c72e579891bfa79/futures/tests/auto_traits.rs#L1117-L1120
Yeah, hence why I completely fail to see how the compiler gets confused here, and why some combinations do work while some don't. Do you think it could be a rustc bug?
Can be worked around with:
use futures::stream::{empty, StreamExt};
use std::future::ready;
use std::sync::Arc;
fn send<T: Send>(_: T) {}
fn main() {
// send(async { empty().map(ready::<&()>).buffered(0).next().await });
let sem = Arc::new(tokio::sync::Semaphore::new(5));
send(async {
empty()
.map(ready::<&()>)
.map(|x| {
let sem = sem.clone();
async move {
// This must be `_permit` not just `_` otherwise the object is destroyed immediately.
// https://users.rust-lang.org/t/tokio-semaphore-mystery-acquire-vs-aquire-owned/79646/3
let _permit = sem.acquire().await.unwrap();
x
}
})
.next()
.await
});
println!("done");
}
https://play.rust-lang.org/?version=stable&mode=debug&edition=2021&gist=19ec390a8091772f8378597fa03ee038
I ran into this issue today. I won't post a code example because it's not materially different than the one that @Tuetuopay posted, but I should point out that the same issue exists for buffer_unordered as well as buffered.
It does seem like a rustc bug or at least a limitation of the current compiler, especially given the "could not prove" error message and the fact that it's not giving a concrete reason why the future is not Send.
Another workaround is if you call .boxed() on the stream before calling buffered on it:
use futures::stream::{empty, StreamExt};
use std::future::ready;
fn send<T: Send>(_: T) {}
fn main() {
send(async {
empty().map(ready::<&()>).boxed().buffered(0).next().await
});
}
https://play.rust-lang.org/?version=stable&mode=debug&edition=2021&gist=9c086487d1ff76b309796efa45846f67
@mikeyhew wait what? that actually works, including in my actual code that's much more complex than the example. thank you so much!
With this it's more and more looks like a rustc bug, because the Box<_> version of the stream will have literally written in its type that it's Send.
I just finished debugging a bunch of weird errors that ultimately was caused by this issue again — I must have forgotten to add the .boxed() workaround, and when I added it the errors went away — so I'm going to open an issue on https://github.com/rust-lang/rust referencing this, since it's gotta be a bug in rustc.
Closing in favor of the upstream issue.
Thanks all for the investigation!