Tokio in current_thread not releasing open file handles once the limit is reached.
Version
cargo tree | grep tokio
└── tokio v1.19.2
└── tokio-macros v1.8.0 (proc-macro)
Platform
The output of uname -a
Linux pdebian 5.10.0-10-amd64 #1 SMP Debian 5.10.84-1 (2021-12-08) x86_64 GNU/Linux
Description
While reading from a Unix socket over a buffered stream, the tokio executor in current_thread does not release open file handles once the OS open-file limit is hit, and it stops accepting any more incoming requests. If the number of open files stays within the OS limit, the file handles are released as expected.
If the executor is switched to a multi_threaded one, this issue doesn't happen: although the open-file limit is hit, it frees the open files and the program continues to work.
I am not sure if I am using BufStream wrong or if I have overlooked something.
I tried this code:
A minimal version of the server code that accepts connections over a Unix socket and prints the request:
src/main.rs
use anyhow::{Context, Result};
use clap::{App, Arg};
use env_logger::{Builder, Env};
use log::{error, info};
use tokio::{
io::{AsyncBufReadExt, AsyncWriteExt, BufStream},
net::{UnixListener, UnixStream},
runtime,
// time::{timeout, Duration},
};
pub static SOCK_PATH: &str = "/var/run/sock.sock";
fn main() -> Result<()> {
Builder::from_env(Env::default().default_filter_or("info")).init();
let clap_app = App::new("openfiles")
.version(env!("CARGO_PKG_VERSION"))
.author(env!("CARGO_PKG_AUTHORS"))
.about("checking tokio openfiles")
.arg(
Arg::new("worker-threads")
.long("worker-threads")
.takes_value(true)
.help("number of worker threads. 0 = current_thread. >0 = worker_threads")
.default_value("1")
.global(true),
)
.get_matches();
let threads = clap_app
.value_of("worker-threads")
.unwrap()
.parse::<usize>()
.unwrap();
let rt = match threads {
0 => {
info!("running in current thread");
runtime::Builder::new_current_thread()
.enable_all()
.build()
.context("cannot create runtime")?
}
multi => {
info!("worker_threads: {}", multi);
runtime::Builder::new_multi_thread()
.worker_threads(multi)
.enable_all()
.thread_name("foobar")
.build()
.context("cannot create runtime")?
}
};
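    // Enter the runtime context so that UnixListener::bind below can register with the reactor.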
let handle = rt.handle();
let _enter_guard = handle.enter();
let _ = std::fs::remove_file(SOCK_PATH);
let listener = UnixListener::bind(SOCK_PATH).unwrap();
rt.block_on(async move { run_listener(listener).await });
Ok(())
}
pub async fn run_listener(listener: UnixListener) {
loop {
match listener.accept().await {
Ok((stream, _)) => {
info!("Received incoming");
tokio::task::spawn(async move {
match handle_client(stream).await {
Ok(_) => (),
Err(err) => error!("error handling client, error: {}", err),
}
});
}
Err(err) => {
error!("error accepting connection, error: {}", err);
}
}
}
}
async fn handle_client(stream: UnixStream) -> Result<()> {
let mut buf_stream = BufStream::new(stream);
let mut line = String::new();
buf_stream.read_line(&mut line).await?;
info!("Received request: {}", line);
buf_stream.write_all(b"END\r\n").await?;
buf_stream.shutdown().await?;
drop(buf_stream);
Ok(())
}
Client code that generates parallel requests
src/client/main.rs
use anyhow::{Context, Result};
use tokio::{
io::{AsyncWriteExt, BufStream},
net::UnixStream,
runtime,
};
pub static SOCK_PATH: &str = "/var/run/sock.sock";
fn main() -> Result<()> {
let rt = runtime::Builder::new_multi_thread()
.enable_all()
.thread_name("resolver-core")
.build()
.context("cannot create runtime")?;
rt.block_on(run_client())?;
Ok(())
}
async fn run_client() -> Result<()> {
loop {
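        // Open new connections as fast as possible so the server eventually hits its open-file limit.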
let listener = UnixStream::connect(SOCK_PATH).await?;
let mut buf_stream = BufStream::new(listener);
tokio::spawn(async move {
match buf_stream.write_all(b"foobar\r\n").await {
Ok(_) => (),
Err(err) => {
println!("write_all error:: {}", err);
}
};
match buf_stream.flush().await {
Ok(_) => (),
Err(err) => {
println!("flush error:: {}", err);
}
};
});
}
}
Cargo.toml
[package]
name = "foobar"
version = "0.1.0"
edition = "2021"
# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html
[[bin]]
name = "server"
path = "src/main.rs"
[[bin]]
name = "client"
path = "src/client/main.rs"
[dependencies]
tokio = { version = "1.15.0", features = ["full"] }
clap = "3.0.13"
anyhow = "1.0.54"
log = "0.4.17"
env_logger = "0.9.0"
I expected to see this happen:
When the server is run as
./server --worker-threads=0
and when the client is run as
./client
I expect to see
[2022-06-21T19:55:57Z ERROR server] error accepting connection, error: Too many open files (os error 24)
from the server's stdout once the ulimit on open files is hit, but I also expect it to recover after the client is stopped.
Instead, this happened:
With the above code, the server keeps on printing
[2022-06-21T19:55:57Z ERROR server] error accepting connection, error: Too many open files (os error 24)
even after the client has exited. I watched the output of lsof for some time, and it stayed as below:
sudo lsof -p 2704488 | grep -ic sock
1015
The number of open file handles to the socket never comes down because they are never released.
Note:
This happens only when running in current_thread. If the executor is switched to multi_threaded by running the server as
./server --worker-threads=1
even if the server hits the open-file limit, it recovers, and the lsof output shows the number of open file handles to the socket coming down.
I tried to reproduce this in Docker running on my Mac, but it didn't occur. I was able to reproduce it on bare-metal Linux and on Linux running in VMware Fusion.
I have added this code to my repo if anybody wants to try it locally on a Linux machine: https://github.com/nohupped/buggy
@nohupped this reproduces more reliably:
bin/server.rs
use std::fs::remove_file;
use tokio::net::{UnixListener, UnixStream};
use ulimit_echo::SOCK;
#[tokio::main(flavor = "current_thread")]
async fn main() {
let _ = remove_file(SOCK);
let sock = UnixListener::bind(SOCK).unwrap();
loop {
match sock.accept().await {
Ok((stream, _)) => {
tokio::spawn(handle_stream(stream));
},
Err(e) => eprintln!("Failed to accept: {:?}", e),
}
}
}
async fn handle_stream(mut stream: UnixStream) {
let (mut rx, mut tx) = stream.split();
tokio::io::copy(&mut rx, &mut tx).await.unwrap();
}
bin/client.rs
use tokio::io::AsyncWriteExt;
use tokio::net::UnixStream;
use ulimit_echo::SOCK;
#[tokio::main(flavor = "current_thread")]
async fn main() {
loop {
match UnixStream::connect(SOCK).await {
Ok(stream) => {
tokio::spawn(handle_stream(stream));
}
Err(e) => eprintln!("Error in creating stream: {:?}", e),
}
}
}
async fn handle_stream(mut stream: UnixStream) {
stream.write_all(b"UNLIMITED FILES!!!").await.unwrap();
let (mut rx, mut tx) = stream.split();
tokio::io::copy(&mut rx, &mut tx).await.unwrap();
}
lib.rs
pub const SOCK: &str = "sock";
Adding tokio::task::yield_now().await to the error branch in the server fixes it. It appears that we are busy-looping in the listener task, preventing the connections from closing properly.
Might be a budgeting issue?
I'll look at this more later tonight.
Using the unstable tokio::task::consume_budget also fixes this.
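A minimal sketch of what that looks like in the repro server above; only the error arm changes, and the consume_budget variant requires --cfg tokio-unstable:
use std::fs::remove_file;
use tokio::net::{UnixListener, UnixStream};
use ulimit_echo::SOCK;
#[tokio::main(flavor = "current_thread")]
async fn main() {
    let _ = remove_file(SOCK);
    let sock = UnixListener::bind(SOCK).unwrap();
    loop {
        match sock.accept().await {
            Ok((stream, _)) => {
                tokio::spawn(handle_stream(stream));
            }
            Err(e) => {
                eprintln!("Failed to accept: {:?}", e);
                // Yield back to the scheduler so already-spawned tasks can run
                // and drop their sockets before the next accept attempt.
                tokio::task::yield_now().await;
                // With --cfg tokio-unstable, tokio::task::consume_budget().await
                // can be used here instead.
            }
        }
    }
}
async fn handle_stream(mut stream: UnixStream) {
    let (mut rx, mut tx) = stream.split();
    tokio::io::copy(&mut rx, &mut tx).await.unwrap();
}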
Yes, this does appear to be due to connect not consuming budget when it returns an error. Though in a production application you should probably use a limit on the number of connections (probably a Semaphore) to avoid running out of fds in the first place.
@Darksonn I think the issue is in accept, not connect.
Right, good point. That was a typo.
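A rough sketch of that Semaphore-based limit, assuming a standalone echo server, an illustrative socket path, and an arbitrary cap of 512 concurrent connections:
use std::sync::Arc;
use tokio::net::{UnixListener, UnixStream};
use tokio::sync::Semaphore;
#[tokio::main(flavor = "current_thread")]
async fn main() {
    let _ = std::fs::remove_file("limited.sock");
    let listener = UnixListener::bind("limited.sock").unwrap();
    // Keep the cap comfortably below the process open-file limit.
    let limit = Arc::new(Semaphore::new(512));
    loop {
        // Wait for a free slot before accepting, so at most 512 client
        // sockets are held open at any time.
        let permit = limit.clone().acquire_owned().await.unwrap();
        match listener.accept().await {
            Ok((stream, _)) => {
                tokio::spawn(async move {
                    let _permit = permit; // slot is released when this task ends
                    handle_stream(stream).await;
                });
            }
            // On error the permit is dropped at the end of this iteration,
            // freeing the slot again.
            Err(e) => eprintln!("Failed to accept: {:?}", e),
        }
    }
}
async fn handle_stream(mut stream: UnixStream) {
    let (mut rx, mut tx) = stream.split();
    let _ = tokio::io::copy(&mut rx, &mut tx).await;
}
This only bounds the sockets held by the server itself; accept errors still need to be handled as discussed above.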
@nohupped workaround for now is to use yield_now when accept returns that error. If you are using --cfg tokio-unstable, you also have the option of consume_budget.
More generally though, you probably don't want to be hot-looping over accept errors. Sleeping for a bit of time after an error comes out of accept would keep the CPU from needlessly getting slammed.
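For illustration, only the error arm of the repro server's accept loop would change; the one-second delay here is an arbitrary choice:
Err(e) => {
    eprintln!("Failed to accept: {:?}", e);
    // Back off instead of retrying immediately; awaiting the sleep also
    // yields to the scheduler so spawned tasks can run and close their sockets.
    tokio::time::sleep(std::time::Duration::from_secs(1)).await;
}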
@sfackler I agree completely, especially with a ulimit issue; however, I do think it is problematic that we just busy-loop here.
@nohupped workaround for now is to use yield_now when accept returns that error. If you are using --cfg tokio-unstable, you also have the option of consume_budget.
Thank you. I tried the yield_now, and that's working for me.
More generally though, you probably don't want to be hot-looping over accept errors. Sleeping for a bit of time after an error comes out of accept would keep the CPU from needlessly getting slammed.
Thank you. I'm planning to add that. Before using a match, I was using an if let Ok((stream, _)) = listener.accept().await, so with that, it was busy-looping by default.
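For comparison, a sketch of that earlier pattern (a hypothetical reconstruction, not code from this issue): an Err from accept simply fails the if let, so the loop calls accept again immediately with no log line and no yield.
loop {
    // Accept errors are silently discarded here, so the loop hot-spins on them.
    if let Ok((stream, _)) = listener.accept().await {
        tokio::task::spawn(async move {
            let _ = handle_client(stream).await;
        });
    }
}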