Tokio in current_thread not releasing open file handles once the limit is reached.

nohupped opened this issue 2 years ago • 12 comments

Version

cargo tree | grep tokio

└── tokio v1.19.2
    └── tokio-macros v1.8.0 (proc-macro)

Platform

Linux pdebian 5.10.0-10-amd64 #1 SMP Debian 5.10.84-1 (2021-12-08) x86_64 GNU/Linux

Description

While reading from a Unix socket over a buffered stream, the current_thread runtime doesn't release open file handles once the OS open-file limit is hit, and it stops accepting incoming requests. If the number of open files stays within the OS limit, the file handles are released as expected.

If the executor is switched to the multi-threaded runtime, this issue doesn't happen: even when the open-file limit is hit, the file handles are freed and the program continues to work.

I am not sure whether I am using BufStream incorrectly or have overlooked something else.

I tried this code:

A minimal version of the server code that accepts connections over a Unix socket and prints what it reads:

src/main.rs

use anyhow::{Context, Result};
use clap::{App, Arg};
use env_logger::{Builder, Env};
use log::{error, info};
use tokio::{
    io::{AsyncBufReadExt, AsyncWriteExt, BufStream},
    net::{UnixListener, UnixStream},
    runtime,
    // time::{timeout, Duration},
};

pub static SOCK_PATH: &str = "/var/run/sock.sock";

fn main() -> Result<()> {
    Builder::from_env(Env::default().default_filter_or("info")).init();
    let clap_app = App::new("openfiles")
        .version(env!("CARGO_PKG_VERSION"))
        .author(env!("CARGO_PKG_AUTHORS"))
        .about("checking tokio openfiles")
        .arg(
            Arg::new("worker-threads")
                .long("worker-threads")
                .takes_value(true)
                .help("number of worker threads. 0 = current_thread. >0 = worker_threads")
                .default_value("1")
                .global(true),
        )
        .get_matches();
    let threads = clap_app
        .value_of("worker-threads")
        .unwrap()
        .parse::<usize>()
        .unwrap();
    let rt = match threads {
        0 => {
            info!("running in current thread");
            runtime::Builder::new_current_thread()
                .enable_all()
                .build()
                .context("cannot create runtime")?
        }
        multi => {
            info!("worker_threads: {}", multi);
            runtime::Builder::new_multi_thread()
                .worker_threads(multi)
                .enable_all()
                .thread_name("foobar")
                .build()
                .context("cannot create runtime")?
        }
    };

    let handle = rt.handle();
    let _enter_guard = handle.enter();
    let _ = std::fs::remove_file(SOCK_PATH);
    let listener = UnixListener::bind(SOCK_PATH).unwrap();
    rt.block_on(async move { run_listener(listener).await });
    Ok(())
}

pub async fn run_listener(listener: UnixListener) {
    loop {
        match listener.accept().await {
            Ok((stream, _)) => {
                info!("Received incoming");
                tokio::task::spawn(async move {
                    match handle_client(stream).await {
                        Ok(_) => (),
                        Err(err) => error!("error handling client, error: {}", err),
                    }
                });
            }
            Err(err) => {
                error!("error accepting connection, error: {}", err);
            }
        }
    }
}

async fn handle_client(stream: UnixStream) -> Result<()> {
    let mut buf_stream = BufStream::new(stream);
    let mut line = String::new();
    buf_stream.read_line(&mut line).await?;

    info!("Received request: {}", line);
    buf_stream.write_all(b"END\r\n").await?;
    buf_stream.shutdown().await?;
    drop(buf_stream);
    Ok(())
}

Client code that generates parallel requests

src/client/main.rs

use anyhow::{Context, Result};
use tokio::{
    io::{AsyncWriteExt, BufStream},
    net::UnixStream,
    runtime,
};

pub static SOCK_PATH: &str = "/var/run/sock.sock";

fn main() -> Result<()> {
    let rt = runtime::Builder::new_multi_thread()
        .enable_all()
        .thread_name("resolver-core")
        .build()
        .context("cannot create runtime")?;
    rt.block_on(run_client())?;

    Ok(())
}

async fn run_client() -> Result<()> {
    loop {
        let listener = UnixStream::connect(SOCK_PATH).await?;
        let mut buf_stream = BufStream::new(listener);
        tokio::spawn(async move {
            match buf_stream.write_all(b"foobar\r\n").await {
                Ok(_) => (),
                Err(err) => {
                    println!("write_all error:: {}", err);
                }
            };

            match buf_stream.flush().await {
                Ok(_) => (),
                Err(err) => {
                    println!("flush error:: {}", err);
                }
            };
        });
    }
}

Cargo.toml

[package]
name = "foobar"
version = "0.1.0"
edition = "2021"

# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html

[[bin]]
name = "server"
path = "src/main.rs"

[[bin]]
name = "client"
path = "src/client/main.rs"

[dependencies]
tokio = { version = "1.15.0", features = ["full"] }
clap = "3.0.13"
anyhow = "1.0.54"
log = "0.4.17"
env_logger = "0.9.0"

I expected to see this happen:

When the server is run as

./server --worker-threads=0

and when the client is run as

./client

I expect to see

[2022-06-21T19:55:57Z ERROR server] error accepting connection, error: Too many open files (os error 24)

from the server's stdout once the open-file ulimit is hit, but I also expect it to recover after the client is stopped.

Instead, this happened:

With the above code, the server keeps on printing

[2022-06-21T19:55:57Z ERROR server] error accepting connection, error: Too many open files (os error 24)

even after the client has exited. I watched the output of lsof for a while and it stayed as below:

sudo lsof -p 2704488 | grep -ic sock
1015

The number of open file handles to the socket never comes down because they are never released.

Note:

This happens only when running in current_thread. If the executor is switched to the multi-threaded runtime by running the server as ./server --worker-threads=1, the server recovers even after hitting the open-file limit, and the lsof output shows the number of open file handles to the socket coming down. I could not reproduce this in Docker running on my Mac, but I could reproduce it on bare-metal Linux and on Linux running in VMware Fusion. I have the code in my repo (https://github.com/nohupped/buggy) if anybody wants to try it locally on a Linux machine.

nohupped avatar Jun 21 '22 20:06 nohupped

@nohupped this reproduces more reliably:

bin/server.rs

use std::fs::remove_file;
use tokio::net::{UnixListener, UnixStream};
use ulimit_echo::SOCK;

#[tokio::main(flavor = "current_thread")]
async fn main() {
    let _ = remove_file(SOCK);

    let sock = UnixListener::bind(SOCK).unwrap();

    loop {
        match sock.accept().await {
            Ok((stream, _)) => {
                tokio::spawn(handle_stream(stream));
            },
            Err(e) => eprintln!("Failed to accept: {:?}", e),
        }
    }
}

async fn handle_stream(mut stream: UnixStream) {
    let (mut rx, mut tx) = stream.split();
    tokio::io::copy(&mut rx, &mut tx).await.unwrap();
}

bin/client.rs

use tokio::io::AsyncWriteExt;
use tokio::net::UnixStream;
use ulimit_echo::SOCK;

#[tokio::main(flavor = "current_thread")]
async fn main() {
    loop {
        match UnixStream::connect(SOCK).await {
            Ok(stream) => {
                tokio::spawn(handle_stream(stream));
            }
            Err(e) => eprintln!("Error in creating stream: {:?}", e),
        }
    }
}

async fn handle_stream(mut stream: UnixStream) {
    stream.write_all(b"UNLIMITED FILES!!!").await.unwrap();

    let (mut rx, mut tx) = stream.split();
    tokio::io::copy(&mut rx, &mut tx).await.unwrap();
}

lib.rs

pub const SOCK: &str = "sock";

Noah-Kennedy avatar Jun 21 '22 22:06 Noah-Kennedy

Adding tokio::task::yield_now().await to the error branch in the server fixes it. It appears that we are busy-looping in the listener task, preventing the connections from closing properly.
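
Roughly, that means something like this in the repro server above (a sketch; the literal "sock" path stands in for ulimit_echo::SOCK):

use std::fs::remove_file;
use tokio::net::{UnixListener, UnixStream};

#[tokio::main(flavor = "current_thread")]
async fn main() {
    let _ = remove_file("sock");
    let sock = UnixListener::bind("sock").unwrap();

    loop {
        match sock.accept().await {
            Ok((stream, _)) => {
                tokio::spawn(handle_stream(stream));
            }
            Err(e) => {
                eprintln!("Failed to accept: {:?}", e);
                // Hand control back to the scheduler so the already-spawned
                // connection tasks get polled and can drop their fds.
                tokio::task::yield_now().await;
            }
        }
    }
}

async fn handle_stream(mut stream: UnixStream) {
    let (mut rx, mut tx) = stream.split();
    tokio::io::copy(&mut rx, &mut tx).await.unwrap();
}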

Noah-Kennedy avatar Jun 21 '22 22:06 Noah-Kennedy

Might be a budgeting issue?

Noah-Kennedy avatar Jun 21 '22 22:06 Noah-Kennedy

I'll look at this more later tonight.

Noah-Kennedy avatar Jun 21 '22 23:06 Noah-Kennedy

Using the unstable tokio::task::consume_budget also fixes this.
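
Relative to the yield_now sketch above, only the error arm of the accept loop would change (a fragment, assuming the crate is built with RUSTFLAGS="--cfg tokio_unstable"):

Err(e) => {
    eprintln!("Failed to accept: {:?}", e);
    // Charges the task's cooperative-scheduling budget; the await only
    // actually yields once that budget has been exhausted.
    tokio::task::consume_budget().await;
}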

Noah-Kennedy avatar Jun 22 '22 03:06 Noah-Kennedy

Yes, this does appear to be due to connect not consuming budget when it returns an error. Though in a production application you should probably use a limit on the number of connections (probably a Semaphore) to avoid running out of fds in the first place.
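
A rough sketch of that approach applied to the run_listener from the original report (the cap of 512, the plain eprintln! logging, and the simplified echo handler are placeholders, not part of the original code):

use std::sync::Arc;
use tokio::net::{UnixListener, UnixStream};
use tokio::sync::Semaphore;

// Keep this comfortably below the process fd limit.
const MAX_CONNECTIONS: usize = 512;

pub async fn run_listener(listener: UnixListener) {
    let limit = Arc::new(Semaphore::new(MAX_CONNECTIONS));
    loop {
        // Wait for a free slot before accepting, so no more than
        // MAX_CONNECTIONS sockets are ever held open at once.
        let permit = limit.clone().acquire_owned().await.unwrap();
        match listener.accept().await {
            Ok((stream, _)) => {
                tokio::spawn(async move {
                    // The permit is released when this task finishes.
                    let _permit = permit;
                    if let Err(err) = handle_client(stream).await {
                        eprintln!("error handling client: {}", err);
                    }
                });
            }
            // On error the permit is simply dropped at the end of the
            // iteration, freeing the slot again.
            Err(err) => eprintln!("error accepting connection: {}", err),
        }
    }
}

async fn handle_client(mut stream: UnixStream) -> std::io::Result<()> {
    let (mut rx, mut tx) = stream.split();
    tokio::io::copy(&mut rx, &mut tx).await?;
    Ok(())
}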

Darksonn avatar Jun 22 '22 06:06 Darksonn

@Darksonn I think the issue is in accept, not connect.

Noah-Kennedy avatar Jun 22 '22 06:06 Noah-Kennedy

Right, good point. That was a typo.

Darksonn avatar Jun 22 '22 06:06 Darksonn

@nohupped the workaround for now is to use yield_now when accept returns that error. If you are building with --cfg tokio_unstable, you also have the option of consume_budget.

Noah-Kennedy avatar Jun 22 '22 15:06 Noah-Kennedy

More generally though, you probably don't want to be hot-looping over accept errors. Sleeping for a bit of time after an error comes out of accept would keep the CPU from needlessly getting slammed.
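
For example, something along these lines (a sketch; the 100 ms delay is an arbitrary choice and the per-connection handling is elided):

use std::time::Duration;
use tokio::net::UnixListener;

#[tokio::main(flavor = "current_thread")]
async fn main() {
    let _ = std::fs::remove_file("sock");
    let listener = UnixListener::bind("sock").unwrap();

    loop {
        match listener.accept().await {
            Ok((stream, _)) => {
                // Spawn per-connection handling as in the earlier examples;
                // stubbed out here to keep the sketch focused on the error path.
                drop(stream);
            }
            Err(e) => {
                eprintln!("error accepting connection: {:?}", e);
                // Back off briefly instead of retrying accept() immediately;
                // this also yields so other tasks can run and close their fds.
                tokio::time::sleep(Duration::from_millis(100)).await;
            }
        }
    }
}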

sfackler avatar Jun 22 '22 15:06 sfackler

@sfackler I agree completely, especially with a ulimit issue; however, I do think it's problematic that we just busy-loop here.

Noah-Kennedy avatar Jun 22 '22 15:06 Noah-Kennedy

@nohupped the workaround for now is to use yield_now when accept returns that error. If you are building with --cfg tokio_unstable, you also have the option of consume_budget.

Thank you. I tried the yield_now, and that's working for me.

More generally though, you probably don't want to be hot-looping over accept errors. Sleeping for a bit of time after an error comes out of accept would keep the CPU from needlessly getting slammed.

Thank you. I'm planning to add that. Before switching to a match, I was using if let Ok((stream, _)) = listener.accept().await, which silently discarded the error, so it was busy-looping by default.

nohupped avatar Jun 22 '22 15:06 nohupped