ForEach, Fold, and similar stream combinators can run saturated without returning from poll

Open mzabaluev opened this issue 5 years ago • 6 comments

The combinators that poll a stream to exhaustion in a loop have a problem that has already been raised in #869: if the upstream returns Ready many times in a row, the loop does not break and the combinator's poll does not return for that whole time, so other pending operations in the same task are starved of polls.

To illustrate how this can be a problem for other code, consider this simple adapter for making futures cancellable:

use futures::channel::oneshot::{self, Canceled};
use futures::prelude::*;
use std::pin::Pin;
use std::task::{Context, Poll};

use futures::ready;
use pin_utils::unsafe_pinned;

struct CancelHandle(oneshot::Sender<()>);

#[derive(Debug)]
struct AlreadyDropped;

impl CancelHandle {
    fn cancel(self) -> Result<(), AlreadyDropped> {
        self.0.send(()).map_err(|()| AlreadyDropped)
    }
}

struct Cancelable<F> {
    op: F,
    stop_rx: oneshot::Receiver<()>,
}

impl<F> Cancelable<F> {
    unsafe_pinned!(op: F);
    unsafe_pinned!(stop_rx: oneshot::Receiver<()>);
}

impl<F: Unpin> Unpin for Cancelable<F> {}

impl<F> Future for Cancelable<F>
where
    F: Future,
{
    type Output = Result<F::Output, Canceled>;

    fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
        match self.as_mut().stop_rx().poll(cx) {
            Poll::Pending => {
                let output = ready!(self.as_mut().op().poll(cx));
                Ok(output).into()
            }
            Poll::Ready(_res) => Err(Canceled).into(),
        }
    }
}

fn make_cancelable<F>(op: F) -> (Cancelable<F>, CancelHandle) {
    let (stop_tx, stop_rx) = oneshot::channel();
    let fut = Cancelable { op, stop_rx };
    let handle = CancelHandle(stop_tx);
    (fut, handle)
}

It looks rather useful and intuitive, but this contrived example hangs with a busy-looping thread rather than canceling the task:

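use futures::executor::{block_on, ThreadPool};
use futures::task::SpawnExt;
use std::thread;
use std::time::Duration;

fn main() {
    let mut a = 0;
    // An endless, always-ready stream: for_each never sees Pending.
    let (fut, stop_handle) = make_cancelable(
        stream::repeat(1).for_each(move |n| {
            a += n;
            future::ready(())
        })
    );
    let executor = ThreadPool::new().unwrap();
    let res_handle = executor.spawn_with_handle(fut).unwrap();
    thread::sleep(Duration::from_millis(1));
    stop_handle.cancel().unwrap();
    // Wait for the result (futures 0.3 has no ThreadPool::run, so block_on is used).
    // This never returns: the first poll of the spawned task never yields.
    let res = block_on(res_handle);
    assert!(res.is_err());
}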

In non-contrived usage with real streams, too, a ForEach with an always-ready processing closure will delay cancellation for as long as the stream yields items.

mzabaluev avatar Nov 06 '19 18:11 mzabaluev

An easy fix is to add a counter to such polling loops and, once it reaches a certain number of iterations, wake the task immediately and return Pending, so that the executor polls the task again and its other futures get a turn. To make it tunable, the combinators equipped with loop guards could offer a method:

impl<St, Fut, F> ForEach<St, Fut, F> {
    pub fn yield_after_every(mut self, iterations: u32) -> Self {
        self.yield_after_every = iterations;
        self
    }
}
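
The guarded loop itself could look roughly like the sketch below. It uses only std so that it is self-contained: an iterator stands in for a stream that is always ready, and `BudgetedForEach`, `CountingWaker` and `poll_to_completion` are hypothetical names made up for this illustration, not part of futures-rs.

```rust
use std::future::Future;
use std::pin::Pin;
use std::sync::atomic::{AtomicUsize, Ordering};
use std::sync::Arc;
use std::task::{Context, Poll, Wake, Waker};

/// Hypothetical stand-in for `ForEach`: `items` plays the role of a
/// stream that never returns `Pending`.
struct BudgetedForEach<I, F> {
    items: I,
    f: F,
    yield_after_every: u32,
}

impl<I, F> Future for BudgetedForEach<I, F>
where
    I: Iterator + Unpin,
    F: FnMut(I::Item) + Unpin,
{
    type Output = ();

    fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<()> {
        let this = &mut *self;
        // Process at most `yield_after_every` items per poll
        // (clamped to 1 so that every poll makes progress).
        for _ in 0..this.yield_after_every.max(1) {
            match this.items.next() {
                Some(item) => (this.f)(item),
                None => return Poll::Ready(()),
            }
        }
        // Budget used up: request an immediate re-poll and yield.
        cx.waker().wake_by_ref();
        Poll::Pending
    }
}

/// Waker that only counts how often it was woken.
struct CountingWaker(AtomicUsize);

impl Wake for CountingWaker {
    fn wake(self: Arc<Self>) {
        self.0.fetch_add(1, Ordering::SeqCst);
    }
}

/// Toy driver: re-polls in a loop until the future is ready and
/// returns (number of polls, number of wakeups).
fn poll_to_completion<Fut>(mut fut: Fut) -> (usize, usize)
where
    Fut: Future<Output = ()> + Unpin,
{
    let counter = Arc::new(CountingWaker(AtomicUsize::new(0)));
    let waker = Waker::from(counter.clone());
    let mut cx = Context::from_waker(&waker);
    let mut polls = 0;
    loop {
        polls += 1;
        if Pin::new(&mut fut).poll(&mut cx).is_ready() {
            return (polls, counter.0.load(Ordering::SeqCst));
        }
    }
}
```

With ten items and `yield_after_every: 4`, this future returns Pending twice (waking itself each time) and completes on the third poll. Between those polls a real executor would be free to poll whatever else shares the task, such as the stop_rx receiver in Cancelable.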

mzabaluev avatar Nov 06 '19 18:11 mzabaluev

The resolution of https://github.com/rust-lang-nursery/futures-rs/issues/869#issuecomment-548600024 proposes a dedicated stream combinator that forces a yield after N iterations, but I think that is a poor solution because of the human factor: it's very easy to write complex async code without thinking about this, and such code will work most of the time until a stream happens to be saturated somewhere critical.

mzabaluev avatar Nov 06 '19 18:11 mzabaluev

For programmers who are sure they don't need any silly busy-looping guards and would rather have the extra performance when polling something that, in principle, tends to be pending on I/O, alternative combinators with cautionary names can be added: .for_each_uninterrupted(...) or something like that.

mzabaluev avatar Nov 06 '19 18:11 mzabaluev

Has this been solved by https://github.com/tokio-rs/tokio/pull/2160?

mzabaluev avatar Apr 02 '20 14:04 mzabaluev

It's solved only in the context of tokio I'm afraid, but yes, in that context it should be solved. If we wanted a solution for all executors, we'd need to either wait for the coop stuff to evolve beyond tokio, or implement something like https://github.com/rust-lang/futures-rs/pull/2049 (which solved https://github.com/rust-lang/futures-rs/issues/2047).
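
As a rough illustration of the budget idea (not tokio's actual API; `BUDGET`, `poll_proceed`, `Counter`, `Drain` and `run_with_budget` are all hypothetical names), the sketch below assumes the executor resets a thread-local budget before every task poll and that leaf sources spend one unit per item. The draining loop stays unmodified and still yields once the budget is gone.

```rust
use std::cell::Cell;
use std::future::Future;
use std::pin::Pin;
use std::sync::Arc;
use std::task::{Context, Poll, Wake, Waker};

thread_local! {
    // Hypothetical budget, reset by the executor before each task poll.
    static BUDGET: Cell<u32> = Cell::new(0);
}

/// Hypothetical helper that leaf sources call before yielding an item.
fn poll_proceed(cx: &mut Context<'_>) -> Poll<()> {
    BUDGET.with(|b| match b.get() {
        0 => {
            // Out of budget: ask to be polled again, then yield.
            cx.waker().wake_by_ref();
            Poll::Pending
        }
        n => {
            b.set(n - 1);
            Poll::Ready(())
        }
    })
}

/// Always-ready source that cooperates with the budget.
struct Counter {
    next: u32,
    end: u32,
}

impl Counter {
    fn poll_next(&mut self, cx: &mut Context<'_>) -> Poll<Option<u32>> {
        if self.next == self.end {
            return Poll::Ready(None);
        }
        if poll_proceed(cx).is_pending() {
            return Poll::Pending;
        }
        self.next += 1;
        Poll::Ready(Some(self.next - 1))
    }
}

/// Plain exhaustion loop with no guard of its own, like `Fold`.
struct Drain {
    src: Counter,
    sum: u32,
}

impl Future for Drain {
    type Output = u32;

    fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<u32> {
        let this = &mut *self;
        loop {
            match this.src.poll_next(cx) {
                Poll::Ready(Some(n)) => this.sum += n,
                Poll::Ready(None) => return Poll::Ready(this.sum),
                Poll::Pending => return Poll::Pending,
            }
        }
    }
}

struct NoopWaker;

impl Wake for NoopWaker {
    fn wake(self: Arc<Self>) {}
}

/// Toy single-task executor: returns the output and the number of polls.
fn run_with_budget<F: Future + Unpin>(mut fut: F, budget: u32) -> (F::Output, usize) {
    let waker = Waker::from(Arc::new(NoopWaker));
    let mut cx = Context::from_waker(&waker);
    let mut polls = 0;
    loop {
        BUDGET.with(|b| b.set(budget)); // fresh budget for every poll
        polls += 1;
        if let Poll::Ready(out) = Pin::new(&mut fut).poll(&mut cx) {
            return (out, polls);
        }
    }
}
```

The design difference from a per-combinator counter is that the limit lives with the executor and the leaf sources, so every loop in the task shares one budget and none of the combinators needs to change.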

jonhoo avatar Apr 02 '20 14:04 jonhoo

I like the cooperative budget approach outlined in https://github.com/rust-lang/rust/pull/74335#issuecomment-742775411. Is there an RFC for that?

mzabaluev avatar Jul 22 '21 06:07 mzabaluev