Tokio timeout doesn't wake up when streaming while another tokio thread is running

Open SeseMueller opened this issue 1 year ago • 2 comments

Expected Behavior

The tokio timeout and sleep should work and trigger after the supplied time.

Current Behavior

They are "blocked" while another tokio task is executing and do not trigger until that task is done.

Steps to Reproduce (for bugs)

I created two examples; one where the problem occurs when using actix_web, and one where it doesn't occur when using tokio on very similar code. See Context for explanation.

Tokio example: working code

use futures_util::{Stream, StreamExt};
use std::convert::Infallible;
use std::{process::Command, time::Duration};
use tokio::pin;
use tokio::sync::mpsc::Receiver;

use futures_util::stream::unfold;
use tokio::sync::mpsc::Sender;
use tokio::time::timeout;

#[tokio::main]
async fn main() {
    let stream = tokio::spawn(serve()).await;
    let stream = stream.unwrap();
    pin!(stream);
    loop {
        let item = stream.next().await;
        dbg!(item);
    }
}
// Whether this is async or not doesn't make a difference; just to make them more similar
async fn serve() -> impl Stream<Item = Result<String, Infallible>> {
    let c = |state: Option<Receiver<String>>| async move {
        let mut rx = if state.is_none() {
            let (tx, rx) = tokio::sync::mpsc::channel(100); // Construct a new receiver if it's not
                                                            // initialized yet
            println!("created channel");
            tokio::spawn(do_work(tx));
            println!("spawned work");
            rx
        } else {
            state.unwrap()
        };
        // vvv Problem line: timeout doesn't happen; the command is instead being waited on
        let t = timeout(Duration::from_secs(1), rx.recv()).await;
        // Tokio select! and Tokio sleep also don't work here. 
        dbg!(&t);
        if t == Ok(None) {
            // Stop the stream if the command ended
            return None;
        }
        Some((
            // Send a message on timeout
            Ok::<String, Infallible>("Stream fragment\n".to_owned()),
            Some(rx),
        ))
    };
    let stream = unfold(None, c); // Construct stream from closure
    stream
}

async fn do_work(tx: Sender<String>) {
    println!("Running Command");
    let _ = Command::new("sh").arg("-c").arg("sleep 10").output();
    println!("Command finished!");
    if let Err(e) = tx.send("".to_owned()).await {
        eprintln!("Sending errored! {:?}", e);
    }
}
// Expected output: nine or ten lines of "Stream fragment", one sent every second, followed by a panic because the stream was polled after it ended
// Actual output: as expected

Actix_web example: Has the problem

use actix_web::body::MessageBody;
use actix_web::web::Bytes;
use actix_web::{services, web, HttpResponse, Responder};
use actix_web::{App, HttpServer};
use std::convert::Infallible;
use std::{process::Command, time::Duration};
use tokio::sync::mpsc::Receiver;

use futures_util::stream::unfold;
use tokio::sync::mpsc::Sender;
use tokio::time::timeout;

#[tokio::main]
async fn main() -> std::io::Result<()> {
    HttpServer::new(|| {
        let services = services![web::scope("/a").route("/b", web::get().to(serve))]; // Available
                                                                                      // at localhost:8080/a/b
        App::new().service(services)
    })
    .bind(("localhost", 8080))
    .unwrap()
    .run()
    .await
}
async fn serve() -> impl Responder {
    let c = |state: Option<Receiver<String>>| async move {
        let mut rx = if state.is_none() {
            let (tx, rx) = tokio::sync::mpsc::channel(100); // Construct a new receiver if it's not
                                                            // initialized yet
            println!("created channel");
            //tokio::spawn(do_work(tx));
            actix_web::rt::spawn(do_work(tx)); // Both have the same problem
            println!("spawned work");
            rx
        } else {
            state.unwrap()
        };
        // vvv Problem line: timeout doesn't happen; the command is instead being waited on
        let t = timeout(Duration::from_secs(1), rx.recv()).await;
        // Tokio select! and Tokio sleep also don't work here. 
        dbg!(&t);
        if t == Ok(None) {
            // Stop the stream if the command ended
            return None;
        }
        Some((
            // Send a message on timeout
            Ok::<Bytes, Infallible>("Stream fragment\n".to_owned().try_into_bytes().unwrap()),
            Some(rx),
        ))
    };
    let stream = unfold(None, c); // Construct stream from closure
    HttpResponse::Ok().streaming(stream) // Stream out the answer
}
async fn do_work(tx: Sender<String>) {
    println!("Running Command");
    let _ = Command::new("sh").arg("-c").arg("sleep 10").output();
    println!("Command finished!");
    if let Err(e) = tx.send("".to_owned()).await {
        eprintln!("Sending errored! {:?}", e);
    }
}
// Expected output: nine or ten lines of "Stream fragment", one sent every second
// Actual output: a single line of "Stream fragment" is output at the very end.

Edit: Using async_process::Command and then awaiting instead of using std::process::Command does solve this particular case.

Context

I am writing a REST API in actix_web that relies heavily on streaming responses to the client.

While adding a heartbeat, so that the connection to the client wouldn't time out and the client could be sure that the server was still alive, I found that some tokio functionality, such as sleep, select! (with sleep) and timeout, doesn't work properly.

I managed to reduce it to the two examples above.

In both cases, a channel is created inside an async closure, and its sender is given to a new tokio task that takes a long time to finish. If the Receiver is then awaited using a tokio timeout (or a tokio sleep is called), the timeout (or sleep) doesn't trigger after the given time, but only once the Receiver has something to receive.

In the context where I used it, the async closure was then unfolded into a stream, which is served to a client. This causes the stream to become "stuck" and not send a heartbeat until the Receiver can receive.

In the tokio example, the stream is instead given to the main function, pinned, and iterated through until completion. Here, the stream does not become "stuck" and instead sends a heartbeat once every second, as expected.

(I put this issue on actix-web because it seems to be caused by the way streaming is handled and because the tokio example works.)

Your Environment

The given examples work on a fresh install.

  • Rust Version (i.e., output of rustc -V): rustc 1.81.0 (eeb90cda1 2024-09-04)
  • Actix Web Version: 4.9.0

SeseMueller, Oct 24 '24 12:10