futures-rs icon indicating copy to clipboard operation
futures-rs copied to clipboard

Buffered stream losing `Send` marker

Open Tuetuopay opened this issue 3 years ago • 5 comments

Hi,

I hit a higher-ranked lifetime error when using buffered streams, where the compiler complains that it cannot prove the future consuming the stream are Send, despite the stream itself being Send.

use futures::stream::{empty, StreamExt};
use std::future::ready;

fn send<T: Send>(_: T) {}

fn main() {
    send(async {
        empty().map(ready::<&()>).buffered(0).next().await
    });
}

Resulting error:

   Compiling playground v0.0.1 (/playground)
error: higher-ranked lifetime error
 --> src/main.rs:7:5
  |
7 | /     send(async {
8 | |         empty().map(ready::<&()>).buffered(0).next().await
9 | |     });
  | |______^
  |
  = note: could not prove for<'r> impl futures::Future<Output = Option<&'r ()>>: std::marker::Send

error: could not compile `playground` due to previous error

Buffered is the key here, as dropping it to use .then() instead of .map() + buffered() compiles fine. This shows that the stream and its future is definitely send, and the buffering makes it weird. I cannot see which bound in the Buffered struct/future makes the resulting stream !Send.

use futures::stream::{empty, StreamExt};
use std::future::ready;

fn send<T: Send>(_: T) {}

fn main() {
    send(async {
        empty().then(ready::<&()>).next().await
    });
}

Obviously, I need buffered for parallelism of the futures, as they do some long-ish network calls; and the Send requirement comes from Tonic's usage of async_trait that requires futures to be Send.

Thanks!

Tuetuopay avatar Aug 16 '22 10:08 Tuetuopay

Hmm, if the stream, future, and output are Send, buffered should also be Send.

https://github.com/rust-lang/futures-rs/blob/183f8c61371d4c3891683ca51c72e579891bfa79/futures/tests/auto_traits.rs#L1117-L1120

taiki-e avatar Aug 16 '22 11:08 taiki-e

Yeah, hence why I completely fail to see how the compiler gets confused here, and why some combinations do work while some don't. Do you think it could be a rustc bug?

Tuetuopay avatar Aug 16 '22 16:08 Tuetuopay

Can be worked around with:

use futures::stream::{empty, StreamExt};
use std::future::ready;
use std::sync::Arc;

fn send<T: Send>(_: T) {}

fn main() {
    // send(async { empty().map(ready::<&()>).buffered(0).next().await });
    let sem = Arc::new(tokio::sync::Semaphore::new(5));
    send(async {
        empty()
            .map(ready::<&()>)
            .map(|x| {
                let sem = sem.clone();
                async move {
                    // This must be `_permit` not just `_` otherwise the object is destroyed immediately.
                    // https://users.rust-lang.org/t/tokio-semaphore-mystery-acquire-vs-aquire-owned/79646/3
                    let _permit = sem.acquire().await.unwrap();
                    x
                }
            })
            .next()
            .await
    });
    println!("done");
}

https://play.rust-lang.org/?version=stable&mode=debug&edition=2021&gist=19ec390a8091772f8378597fa03ee038

aleb avatar Aug 31 '22 09:08 aleb

I ran into this issue today. I won't post a code example because it's not materially different than the one that @Tuetuopay posted, but I should point out that the same issue exists for buffer_unordered as well as buffered.

It does seem like a rustc bug or at least a limitation of the current compiler, especially given the "could not prove" error message and the fact that it's not giving a concrete reason why the future is not Send.

Another workaround is if you call .boxed() on the stream before calling buffered on it:

use futures::stream::{empty, StreamExt};
use std::future::ready;

fn send<T: Send>(_: T) {}

fn main() {
    send(async {
        empty().map(ready::<&()>).boxed().buffered(0).next().await
    });
}

https://play.rust-lang.org/?version=stable&mode=debug&edition=2021&gist=9c086487d1ff76b309796efa45846f67

mikeyhew avatar Sep 11 '22 09:09 mikeyhew

@mikeyhew wait what? that actually works, including in my actual code that's much more complex than the example. thank you so much!

With this it's more and more looks like a rustc bug, because the Box<_> version of the stream will have literally written in its type that it's Send.

Tuetuopay avatar Sep 12 '22 12:09 Tuetuopay

I just finished debugging a bunch of weird errors that ultimately was caused by this issue again — I must have forgotten to add the .boxed() workaround, and when I added it the errors went away — so I'm going to open an issue on https://github.com/rust-lang/rust referencing this, since it's gotta be a bug in rustc.

mikeyhew avatar Nov 13 '22 22:11 mikeyhew

Closing in favor of the upstream issue.

Thanks all for the investigation!

taiki-e avatar Nov 14 '22 01:11 taiki-e