futures-rs
futures-rs copied to clipboard
ForEach, Fold, and similar stream combinators can run saturated without returning from poll
The combinators that poll a stream for exhaustion in a loop have a problem that's already been raised in #869: if the upstream consecutively returns Ready
for a long time, the loop never breaks and the combinator's poll
never returns for that long, starving other pending operations in the task from being polled.
To illustrate how this can be a problem for other code, consider this simple adapter for making futures cancellable:
use futures::channel::oneshot::{self, Canceled};
use futures::prelude::*;
use std::pin::Pin;
use std::task::{Context, Poll};
use futures::ready;
use pin_utils::unsafe_pinned;
struct CancelHandle(oneshot::Sender<()>);
#[derive(Debug)]
struct AlreadyDropped;
impl CancelHandle {
fn cancel(self) -> Result<(), AlreadyDropped> {
self.0.send(()).map_err(|()| AlreadyDropped)
}
}
struct Cancelable<F> {
op: F,
stop_rx: oneshot::Receiver<()>,
}
impl<F> Cancelable<F> {
unsafe_pinned!(op: F);
unsafe_pinned!(stop_rx: oneshot::Receiver<()>);
}
impl<F: Unpin> Unpin for Cancelable<F> {}
impl<F> Future for Cancelable<F>
where
F: Future,
{
type Output = Result<F::Output, Canceled>;
fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
match self.as_mut().stop_rx().poll(cx) {
Poll::Pending => {
let output = ready!(self.as_mut().op().poll(cx));
Ok(output).into()
}
Poll::Ready(_res) => Err(Canceled).into(),
}
}
}
fn make_cancelable<F>(op: F) -> (Cancelable<F>, CancelHandle) {
let (stop_tx, stop_rx) = oneshot::channel();
let fut = Cancelable { op, stop_rx };
let handle = CancelHandle(stop_tx);
(fut, handle)
}
It looks rather useful and intuitive, but this contrived example hangs with a busy-looping thread rather than canceling the task:
fn main() {
let mut a = 0;
let (fut, stop_handle) = make_cancelable(
stream::repeat(1).for_each(move |n| {
a += n;
future::ready(())
})
);
let mut executor = ThreadPool::new().unwrap();
let res_handle = executor.spawn_with_handle(fut).unwrap();
thread::sleep(Duration::from_millis(1));
stop_handle.cancel().unwrap();
let res = executor.run(res_handle);
assert!(res.is_err());
}
In non-contrived usage with real streams, too, a ForEach
with an always-ready processing closure will delay cancellation for as long as the stream yields items.
An easy fix is to add a counter into such polling loops and, upon reaching a certain number of iterations, call an immediate wakeup for the task and return Pending
. To make it tunable, the combinators equipped with loop guards could offer a method:
impl<St, Fut, F> ForEach<St, Fut, F> {
pub fn yield_after_every(mut self, iterations: u32) -> Self {
self.yield_after_every = iterations;
self
}
}
The resolution of https://github.com/rust-lang-nursery/futures-rs/issues/869#issuecomment-548600024 offers making a dedicated stream combinator to force yield after N iterations, but I think it's a poor solution due to human aspect: it's very easy to write complex async code disregarding this, and that will work most of the time until a stream happens to be saturated somewhere that is critical.
For programmers who are sure they don't need any silly busy-looping guards, but rather need that extra performance in polling something that, in principle, is prone to be pending on I/O, some caution-worded alternative combinators can be added: .for_each_uninterrupted(...)
or something like that.
Has this been solved by https://github.com/tokio-rs/tokio/pull/2160?
It's solved only in the context of tokio
I'm afraid, but yes, in that context it should be solved. If we wanted a solution for all executors, we'd need to either wait for the coop
stuff to evolve beyond tokio, or implement something like https://github.com/rust-lang/futures-rs/pull/2049 (which solved https://github.com/rust-lang/futures-rs/issues/2047).
I like the cooperative budget approach outlined in https://github.com/rust-lang/rust/pull/74335#issuecomment-742775411 Is there an RFC for that?