rt: high tail latencies with threaded scheduler when under load
Given a TCP echo client that:
- Opens 200~1000 connections.
- Sends 8 KiB of data on each connection.
- Reads the data back.
Running this client on the threaded runtime results in significant tail latencies.
Code
use tokio::io::{AsyncReadExt, AsyncWriteExt};
use tokio::net::TcpStream;
use std::net::SocketAddr;
use std::sync::Arc;
use std::time::Instant;
#[tokio::main(core_threads=2)]
async fn main() {
const N: usize = 200;
const STEP: usize = N/10;
const BUFFER: usize = 8*1024;
let addr: SocketAddr = "127.0.0.1:8080".parse().unwrap();
let data = Arc::new(vec![10_u8; BUFFER]);
let mut handles = Vec::with_capacity(N);
let mut elapsed = Vec::with_capacity(N);
for i in 0..N {
if i % 10 == 9 {
tokio::task::yield_now().await;
}
let data = data.clone();
let now = Instant::now();
handles.push(tokio::spawn(async move {
let tts = now.elapsed();
let mut buf = vec![10_u8; BUFFER];
let mut socket = TcpStream::connect(addr).await.unwrap();
socket.write_all(&data).await.unwrap();
socket.read_exact(&mut buf).await.unwrap();
assert_eq!(buf[..], data[..]);
// now.elapsed()
tts
}));
}
for handle in handles.drain(..) {
elapsed.push(handle.await.unwrap());
}
elapsed.sort();
let mut i = STEP;
while i <= N {
println!("{}th = {:?}; tts", i, elapsed[i-1]);
i += STEP;
}
}
Output
Running this against the echo example results in the following output:
20th = 241.19µs; tts
40th = 476.356µs; tts
60th = 697.187µs; tts
80th = 23.421715ms; tts
100th = 23.651342ms; tts
120th = 23.894434ms; tts
140th = 48.230702ms; tts
160th = 49.305577ms; tts
180th = 50.433882ms; tts
200th = 51.743111ms; tts
Compare this with basic_scheduler:
20th = 112.65µs; tts
40th = 129.878µs; tts
60th = 145.256µs; tts
80th = 159.83µs; tts
100th = 176.598µs; tts
120th = 192.344µs; tts
140th = 209.232µs; tts
160th = 223.36µs; tts
180th = 243.565µs; tts
200th = 278.131µs; tts
This behavior is most likely a combination of several factors. In the above example, the primary issue is spawning many tasks from the main function. When using the threaded scheduler, the main function runs outside of the scheduler, so spawned tasks are sent to the scheduler through the injection queue. This injection queue (MPMC) is slower than the scheduler's primary queue (SPMC).
When the scheduler is under load, it heavily prioritizes its local queue. In the above example, the scheduler is under load, so it keeps polling already-spawned tasks instead of checking the injection queue for new ones. As a result, the time to first poll for newly spawned tasks becomes very high.
This behavior can be verified by wrapping the contents of the main function with a spawn:
#[tokio::main(core_threads=2)]
async fn main() {
tokio::spawn(async {
    // ... the original body of main goes here ...
}).await.unwrap();
}
Doing this changes the output to:
20th = 2.735µs; tts
40th = 41.12µs; tts
60th = 59.978µs; tts
80th = 80.126µs; tts
100th = 100.416µs; tts
120th = 136.696µs; tts
140th = 191.012µs; tts
160th = 244.484µs; tts
180th = 527.24µs; tts
200th = 29.068916ms; tts
And if we increase the number of threads to 8, we get:
20th = 2.007µs; tts
40th = 5.823µs; tts
60th = 18.551µs; tts
80th = 32.356µs; tts
100th = 39.56µs; tts
120th = 51.522µs; tts
140th = 72.05µs; tts
160th = 110.676µs; tts
180th = 160.459µs; tts
200th = 26.270611ms; tts
This is better. Notice how adding threads reduces the latencies compared to basic_scheduler. However, the maximum latency is surprisingly high (26ms) vs. basic_scheduler. I have not yet investigated why that is the case.
Fixing
I believe the fix for the injection queue will require:
- Improving the injection queue
- Tweaking heuristics
The current injection queue is fairly naive: a linked list guarded by a mutex. One option to consider is switching to an MPSC intrusive channel with a mutex guarding the head. On its own, this probably won't help much. Workers probably want to acquire tasks from the injection queue in batches: instead of popping one task at a time, when the local queue is not full, grab a batch of tasks from the injection queue.
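A minimal sketch of the batching idea, assuming a mutex-guarded VecDeque for the injection queue; the Inject, Local, and Task names are illustrative, not tokio's internal types:
use std::collections::VecDeque;
use std::sync::Mutex;

struct Task;

// Shared injection queue: a mutex-guarded list that any thread may push to.
struct Inject {
    queue: Mutex<VecDeque<Task>>,
}

// Per-worker local queue with a fixed capacity.
struct Local {
    queue: VecDeque<Task>,
    capacity: usize,
}

impl Inject {
    // Pop up to `max` tasks under a single lock acquisition, instead of
    // taking the mutex once per task.
    fn pop_batch(&self, max: usize) -> Vec<Task> {
        let mut queue = self.queue.lock().unwrap();
        let n = max.min(queue.len());
        queue.drain(..n).collect()
    }
}

impl Local {
    // When the local queue has spare capacity, refill it from the injection
    // queue in one batch, amortizing the synchronization cost.
    fn refill_from(&mut self, inject: &Inject) {
        let spare = self.capacity.saturating_sub(self.queue.len());
        if spare > 0 {
            self.queue.extend(inject.pop_batch(spare));
        }
    }
}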
Heuristic-wise, when the worker is under load, we may want to check the injection queue more often. This may be less necessary if tasks are acquired in batches. If the injection queue does need to be checked more often, one option would be to check it every ~5 ticks whenever the previous check found a task.
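A minimal sketch of that tick heuristic, with illustrative names and intervals rather than tokio's actual worker loop:
// How often a worker checks the injection queue, in scheduler ticks.
const CHECK_INTERVAL_HOT: u32 = 5;
const CHECK_INTERVAL_COLD: u32 = 61;

struct Worker {
    tick: u32,
    // Whether the injection queue yielded a task the last time it was checked.
    inject_was_hot: bool,
}

impl Worker {
    fn should_check_inject(&self) -> bool {
        // Check the shared queue more frequently while it has recently had
        // work; fall back to a longer interval once it runs dry.
        let interval = if self.inject_was_hot {
            CHECK_INTERVAL_HOT
        } else {
            CHECK_INTERVAL_COLD
        };
        self.tick % interval == 0
    }
}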
The high tail latency after adding the spawn in the main fn is most likely not due to the injection queue. I believe adding the spawn prevents the injection queue from being used, since the total number of tasks is less than the local queue capacity (256). I do not yet know the cause of that behavior.
I spent more time investigating this.
I discovered the high latency spike is due to a call to libc::socket blocking for ~30ms. When creating sockets in a new multi-threaded process, the kernel needs to grow its internal FD table.
In a multi-threaded process, the kernel must synchronize access to the FD table. This synchronization is done with RCU. In practice, the read lock on the FD table is released "some period of time" after use rather than immediately after the read completes. When socket is called and the FD table needs to grow in order to track the new FD, the thread must wait for all readers to complete before the table can grow. This is the source of the hang in socket.
To diagnose, I used the following:
perf trace -e probe:alloc_fdtable,probe:free_fdtable_rcu,socket target/release/tmp-simple-client
[...]
0.965 ( ): tokio-runtime-/69619 probe:alloc_fdtable:(ffffffffa90ff090)
0.957 (28.139 ms): tokio-runtime-/69619 ... [continued]: socket()) = 64
0.978 (28.173 ms): tokio-runtime-/69618 ... [continued]: socket()) = 65
[...]
Additionally, running the test after "warming up" the FD table results in smooth execution:
20th - 2.92µs
40th - 19.048µs
60th - 36.003µs
80th - 49.98µs
100th - 59.227µs
120th - 66.215µs
140th - 81.66µs
160th - 96.405µs
180th - 110.253µs
200th - 291.598µs
"warming up" the FD table is done by dup
ing sockets to create FDs.
FWIW, I think I've been seeing this exact same behaviour when using recvmsg on a Unix domain socket to receive lots of file descriptors (in an unrelated non-Rust project). I was also able to sort it out by using dup2() to force the FD table to be big enough to fit all sockets I'll receive.
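For reference, a minimal sketch of that dup2-based warm-up, assuming the libc crate; the /dev/null source and the target FD number 1023 are illustrative:
use std::fs::File;
use std::os::fd::AsRawFd;

// Force the kernel to size the fd table to at least `max_fd + 1` entries
// with a single dup2 call, so alloc_fdtable never runs during the workload.
fn presize_fd_table(max_fd: libc::c_int) -> std::io::Result<()> {
    let file = File::open("/dev/null")?;
    unsafe {
        // dup2 onto a high FD number grows the table to cover it.
        if libc::dup2(file.as_raw_fd(), max_fd) < 0 {
            return Err(std::io::Error::last_os_error());
        }
        // Closing the duplicate does not shrink the fd table.
        libc::close(max_fd);
    }
    Ok(())
}

// e.g. call presize_fd_table(1023) before opening the sockets.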
I was seeing the same issue where alloc_fdtable slows down the thread.
Following @carllerche's comment, I was able to use dup to force the fd_table to resize to a certain size up front, so as to avoid an alloc_fdtable call in the middle of the run blocking the thread.
The code I'm using:
use std::net::SocketAddr;
use std::os::fd::AsRawFd;
use std::sync::Arc;
use std::time::Instant;
use tokio::io::{AsyncReadExt, AsyncWriteExt};
use tokio::net::TcpSocket;
#[tokio::main(worker_threads = 2)]
// #[tokio::main(flavor = "current_thread")]
async fn main() {
// --- trick begins ---
// create a socket and use it with `dup` in order to pre-size the fdtable
let socket = TcpSocket::new_v4().unwrap();
let socket_fd = socket.as_raw_fd();
let mut dup_fd_list = Vec::new();
unsafe {
// `dup` file descriptors
// NOTE: I noticed the fd table doubles in size when it fills up at 64,
// 128, 256, etc. entries. Calling `dup` 512 times adds 512 entries to the
// fd_table and triggers `alloc_fdtable` 4 times, which is sufficient for
// us since the fd_table will then hold 1024 entries.
for _ in 0..512 {
let dup_fd = libc::dup(socket_fd);
dup_fd_list.push(dup_fd);
}
// close all duplicated file descriptors
for fd in dup_fd_list {
libc::close(fd);
}
drop(socket);
}
// --- trick ends ---
tokio::spawn(async {
const N: usize = 500;
const STEP: usize = N / 10;
const BUFFER: usize = 8 * 1024;
let addr: SocketAddr = "127.0.0.1:8080".parse().unwrap();
let data = Arc::new(vec![10_u8; BUFFER]);
let mut handles = Vec::with_capacity(N);
let mut elapsed = Vec::with_capacity(N);
for i in 0..N {
if i % 10 == 9 {
tokio::task::yield_now().await;
}
let data = data.clone();
let now = Instant::now();
handles.push(tokio::spawn(async move {
let tts = now.elapsed();
let mut buf = vec![10_u8; BUFFER];
let soc = TcpSocket::new_v4().unwrap();
let mut socket = soc.connect(addr).await.unwrap();
socket.write_all(&data).await.unwrap();
socket.read_exact(&mut buf).await.unwrap();
assert_eq!(buf[..], data[..]);
// now.elapsed()
tts
}));
}
for handle in handles.drain(..) {
elapsed.push(handle.await.unwrap());
}
elapsed.sort();
let mut i = STEP;
while i <= N {
println!("{}th = {:?}; tts", i, elapsed[i - 1]);
i += STEP;
}
})
.await
.unwrap();
}
[update] It is better to avoid unsafe; the fs::File API can be used instead:
use std::fs::File;
use std::net::SocketAddr;
use std::sync::Arc;
use std::time::{Duration, Instant};
use tokio::io::{AsyncReadExt, AsyncWriteExt};
use tokio::net::TcpSocket;
#[tokio::main(worker_threads = 2)]
async fn main() {
let file = File::create("foo.txt").unwrap();
let mut dup_file_list = Vec::new();
// clone the file; this calls `dup` underneath, which adds a file
// descriptor to the fd_table.
//
// NOTE: I noticed the fd table doubles in size when it fills up at 64,
// 128, 256, etc. entries. Cloning the file 512 times (each clone calls
// `dup`) adds 512 entries to the fd_table and triggers `alloc_fdtable`
// 4 times, which is sufficient for us since the fd_table will then hold
// 1024 entries.
for _ in 0..512 {
dup_file_list.push(file.try_clone().unwrap());
}
// once the fd_table has grown, drop all clones, which closes each file descriptor
for cloned_file in dup_file_list {
drop(cloned_file);
}
drop(file);
...
}