ngo
[RFC] Timeout support
Motivation
Event-waiting logic is commonly bounded by a timeout. Currently, however, all NGO code waits forever if no interesting event arrives. Since many system calls can wait with a timeout, we need a general mechanism to support timeouts.
Background
Currently, all event wait/wakeup code in NGO eventually relies on Waiter/WaiterQueue provided by the async-rt crate. A convenient macro named waiter_loop makes the most common Waiter/WaiterQueue pattern even easier: 1) try to make progress with a task, and 2) if it is not ready, wait for a notification before trying again.
Here are some examples.
Exit/Wait4
The exit and wait4 system calls use Waiter and WaiterQueue.
// File: libos/process/do_wait4.rs
pub async fn do_wait4(child_filter: &ProcessFilter) -> Result<(pid_t, i32)> {
    let thread = current!();
    let process = thread.process();
    waiter_loop!(process.exit_waiters(), {
        // Lock order: always lock parent then child to avoid deadlock
        let mut process_inner = process.inner();
        let unwaited_children = /* get all possible children */;
        if unwaited_children.len() == 0 {
            return_errno!(ECHILD, "Cannot find any unwaited children");
        }
        // Return immediately if a child that we wait for has already exited
        let zombie_child = unwaited_children
            .iter()
            .find(|child| child.status() == ProcessStatus::Zombie);
        if let Some(zombie_child) = zombie_child {
            let zombie_pid = zombie_child.pid();
            let exit_status = free_zombie_child(&process, process_inner, zombie_pid);
            return Ok((zombie_pid, exit_status));
        }
    });
}
// File: libos/process/do_exit.rs
fn exit_process(thread: &ThreadRef, term_status: TermStatus) {
    // ...
    // Notify the parent that this child process's status has changed
    parent.exit_waiters().wake_all();
    // ...
}
Sigtimedwait
The sigtimedwait system call also uses Waiter and WaiterQueue to wait for signals.
pub async fn do_sigtimedwait(interest: SigSet, timeout: Option<&Duration>) -> Result<siginfo_t> {
    if let Some(timeout) = timeout {
        warn!("do not support timeout yet");
    }
    let thread = current!();
    let process = thread.process().clone();
    // Interesting, blocked signals
    let interest = {
        let blocked = thread.sig_mask().read().unwrap();
        *blocked & interest
    };
    // Loop until we find a pending signal or reach timeout
    waiter_loop!(process.sig_waiters(), {
        if let Some(signal) = dequeue_pending_signal(&interest, &thread, &process) {
            let siginfo = signal.to_info();
            return Ok(siginfo);
        }
    });
}
Futex
Futexes are also backed by Waiter and WaiterQueue.
pub async fn futex_wait_bitset(
    futex_addr: *const i32,
    futex_val: i32,
    timeout: &Option<Duration>,
    bitset: u32,
) -> Result<()> {
    let waiter = Waiter::new();
    let futex_item = FutexItem::new(futex_key, bitset, waiter.waker());
    futex_bucket.enqueue_item(futex_item.clone());
    // Must make sure that no locks are held by this thread before waiting
    drop(futex_bucket);
    waiter.wait(/*timeout.as_ref()*/).await;
    Ok(())
}
I/O notifications
I/O notifications, due to their importance and ubiquity, are implemented with three new abstractions: Poller, Pollee, and Events.
Here is a simplified code snippet from StreamSocket.
impl<A: Addr + 'static, R: Runtime> ConnectedStream<A, R> {
    pub async fn readv(self: &Arc<Self>, bufs: &mut [&mut [u8]]) -> Result<usize> {
        let total_len: usize = bufs.iter().map(|buf| buf.len()).sum();
        if total_len == 0 {
            return Ok(0);
        }
        // Initialize the poller only when needed
        let mut poller = None;
        loop {
            // Attempt to read
            let res = self.try_readv(bufs);
            if !res.has_errno(EAGAIN) {
                return res;
            }
            // Wait for interesting events by polling
            if poller.is_none() {
                poller = Some(Poller::new());
            }
            let mask = Events::IN;
            let events = self.common.pollee().poll_by(mask, poller.as_mut());
            if events.is_empty() {
                poller.as_ref().unwrap().wait().await;
            }
        }
    }
}
But behind the scenes, Poller and Pollee are also implemented with Waiter and WaiterQueue.
Proposed solution
The new interface
From the examples above, we can see that to support timeouts, all we have to do is add timeout support to the Waiter::wait and Poller::wait methods. And since the latter is based on Waiter, the problem boils down to supporting timeouts in the Waiter::wait method.
// File: crates/async-rt/wait/Waiter.rs
impl Waiter {
    /// The new version of the wait method, with a timeout argument added.
    ///
    /// Awaiting the returned future will block until the waiter is woken up or
    /// the timeout expires (if the timeout is `Some(_)`).
    ///
    /// The output type of the Future is `Result<()>`. If the waiter is woken up,
    /// then the result is `Ok`. Otherwise, the result is `Err(ETIMEDOUT)`.
    ///
    /// If the timeout is `Some(_)`, then the contained `Duration` will be updated
    /// to reflect the remaining time when the method returns.
    pub fn wait(&self, timeout: Option<&mut Duration>) -> WaitFuture<'_> {
        // Delegate to the inner waiter (assumed here to live in a field named
        // `inner`); calling `self.wait` would recurse forever.
        self.inner.wait(timeout)
    }
}
impl WaiterInner {
    pub fn wait(&self, timeout: Option<&mut Duration>) -> WaitFuture<'_> {
        WaitFuture::new(self, timeout)
    }
}
pub struct WaitFuture<'a> {
    waiter: &'a WaiterInner,
    timeout: Option<&'a mut Duration>,
}
impl<'a> Future for WaitFuture<'a> {
    type Output = Result<()>;
    fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
        todo!("let's discuss the implementation detail later")
    }
}
Since the waiter_loop macro does not take a timeout argument, we can either extend the macro with an extra argument or create a new macro named waiter_loop_with_timeout (any better name?). Either way, the timeout-supporting version of the macro should return a Result<()> for the loop, so that the user can capture and handle errors due to timeout.
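To make the intended semantics concrete, here is a minimal blocking sketch, not the async-rt implementation. It assumes a hypothetical `SyncWaiter` built on `std::sync::Condvar`, a hypothetical `TimedOut` error standing in for `Err(ETIMEDOUT)`, and one possible shape for `waiter_loop_with_timeout`. It shows the two properties described above: the remaining time is written back into the caller's `Duration`, and the loop evaluates to a `Result`.

```rust
use std::sync::{Condvar, Mutex};
use std::time::{Duration, Instant};

/// Hypothetical error type standing in for `Err(ETIMEDOUT)`.
#[derive(Debug, PartialEq)]
pub struct TimedOut;

/// Blocking stand-in for the async `Waiter` (an assumption for illustration).
pub struct SyncWaiter {
    woken: Mutex<bool>,
    cond: Condvar,
}

impl SyncWaiter {
    pub fn new() -> Self {
        Self { woken: Mutex::new(false), cond: Condvar::new() }
    }

    pub fn wake(&self) {
        *self.woken.lock().unwrap() = true;
        self.cond.notify_all();
    }

    /// Waits until woken or until `timeout` elapses. On return, `timeout`
    /// holds the time that is still left.
    pub fn wait(&self, timeout: Option<&mut Duration>) -> Result<(), TimedOut> {
        let mut woken = self.woken.lock().unwrap();
        match timeout {
            None => {
                while !*woken {
                    woken = self.cond.wait(woken).unwrap();
                }
            }
            Some(remaining) => {
                let deadline = Instant::now() + *remaining;
                while !*woken {
                    let left = deadline.saturating_duration_since(Instant::now());
                    *remaining = left;
                    if left.is_zero() {
                        return Err(TimedOut);
                    }
                    woken = self.cond.wait_timeout(woken, left).unwrap().0;
                }
                *remaining = deadline.saturating_duration_since(Instant::now());
            }
        }
        *woken = false; // consume the wakeup
        Ok(())
    }
}

/// Hypothetical macro: run `$body`, then wait; the loop ends with `Err`
/// only when the wait times out. `$body` may `return` from the caller.
macro_rules! waiter_loop_with_timeout {
    ($waiter:expr, $timeout:expr, $body:block) => {{
        let mut timeout: Option<Duration> = $timeout;
        loop {
            $body
            if let Err(e) = $waiter.wait(timeout.as_mut()) {
                break Err::<(), TimedOut>(e);
            }
        }
    }};
}
```

A caller would typically propagate the `Err` out of the system call, for example by mapping it to the errno that the call documents for an expired timeout.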
The refined interface
It is quite common to invoke Waiter::wait in a loop. But due to Rust's ownership and move semantics, the following code is not valid.
fn imagined_blocking_syscall(timeout: Option<&mut Duration>) {
    let waiter = Waiter::new();
    loop {
        waiter.wait(timeout);
    }
}
The Rust compiler will report a "use of moved value" error.
To work around this problem, one solution is to redefine the signature of the wait method.
impl Waiter {
    pub fn wait<T>(&self, timeout: Option<&mut T>) -> WaitFuture<'_>
    where
        T: BorrowMut<Duration>,
    {
        // Delegate to the inner waiter (assumed to live in a field named `inner`)
        self.inner.wait(timeout)
    }
}
This way, the wait method can accept both Option<&mut Duration> and Option<&mut &mut Duration> as its argument. Thus, we can rewrite the loop as follows.
// `timeout` must be a `mut` binding so that `as_mut()` can borrow it mutably
fn imagined_blocking_syscall(mut timeout: Option<&mut Duration>) {
    let waiter = Waiter::new();
    loop {
        waiter.wait(timeout.as_mut());
    }
}
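As an alternative sketch that is not part of the proposal, the standard library's `Option::as_deref_mut` reborrows an `Option<&mut Duration>` on each iteration, so the original non-generic signature can also be called in a loop. The `wait_once` function below is a hypothetical stand-in for `Waiter::wait` that simply uses up 10 ms of the budget per call.

```rust
use std::time::Duration;

/// Hypothetical stand-in for `Waiter::wait`: consumes 10 ms of the budget.
fn wait_once(timeout: Option<&mut Duration>) {
    if let Some(remaining) = timeout {
        *remaining = remaining.saturating_sub(Duration::from_millis(10));
    }
}

/// Calls `wait_once` repeatedly without moving `timeout` out of the binding.
fn wait_three_times(mut timeout: Option<&mut Duration>) {
    for _ in 0..3 {
        // `as_deref_mut` turns `&mut Option<&mut Duration>` into a fresh
        // `Option<&mut Duration>` whose borrow ends after this call.
        wait_once(timeout.as_deref_mut());
    }
}
```

The trade-off is that every call site has to remember the reborrow, whereas the `BorrowMut`-based signature accepts either form.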
Changes to the wait method
Now let's figure out the implementation of the wait method. Assume that we have a decorator future, Timeout, that wraps any future to make a new future that completes when either the inner future completes or the timeout expires.
With this convenient new primitive, Timeout, we can now rewrite the Waiter::wait method as follows.
impl Waiter {
    pub fn wait<T>(&self, timeout: Option<&mut T>) -> Timeout<WaitFuture<'_>>
    where
        T: BorrowMut<Duration>,
    {
        // The inner wait (assumed to live in a field named `inner`) takes no timeout
        Timeout::new(self.inner.wait(), timeout)
    }
}
Make WaitFuture cancel-safe
Cancelling Rust futures is still an open problem. Simply dropping a future before its completion may even introduce memory safety issues. Currently, there is no good-enough, language-level solution. See this article for more info.
Luckily, in this proposal, we only need to cancel one very specific future: WaitFuture. The original version of WaitFuture is written on the assumption that the future always runs to completion.
impl<'a> Future for WaitFuture<'a> {
    type Output = ();
    fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
        let mut raw_waker = self.waiter.raw_waker.lock();
        match self.waiter.state() {
            WaiterState::Idle => {
                self.waiter.set_state(WaiterState::Waiting);
                *raw_waker = Some(cx.waker().clone());
                Poll::Pending
            }
            WaiterState::Waiting => {
                *raw_waker = Some(cx.waker().clone());
                Poll::Pending
            }
            WaiterState::Woken => {
                debug_assert!(raw_waker.is_none());
                Poll::Ready(())
            }
        }
    }
}
Simply dropping a WaitFuture may leave a Waiter/WaiterInner in an unexpected state. To fix this issue, we can implement the Drop trait for WaitFuture to ensure that the state of Waiter/WaiterInner stays valid even when WaitFuture is cancelled.
impl<'a> Drop for WaitFuture<'a> {
    fn drop(&mut self) {
        let mut raw_waker = self.waiter.raw_waker.lock();
        // A future dropped while waiting must not leave a stale waker behind
        if let WaiterState::Waiting = self.waiter.state() {
            *raw_waker = None;
            self.waiter.set_state(WaiterState::Idle);
        }
    }
}
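The effect of this Drop implementation can be checked with a small self-contained model. The sketch below is an assumption-laden simplification, not the async-rt code: it keeps the state and the waker together under one `std::sync::Mutex`, and `NoopWake` and `poll_once` are hypothetical test helpers for polling by hand.

```rust
use std::future::Future;
use std::pin::Pin;
use std::sync::{Arc, Mutex};
use std::task::{Context, Poll, Wake, Waker};

#[derive(Clone, Copy, Debug, PartialEq)]
pub enum WaiterState { Idle, Waiting, Woken }

/// Simplified model: state and waker share a single lock.
pub struct WaiterInner {
    inner: Mutex<(WaiterState, Option<Waker>)>,
}

impl WaiterInner {
    pub fn new() -> Self {
        Self { inner: Mutex::new((WaiterState::Idle, None)) }
    }
    pub fn state(&self) -> WaiterState {
        self.inner.lock().unwrap().0
    }
    pub fn wake(&self) {
        let waker = {
            let mut guard = self.inner.lock().unwrap();
            guard.0 = WaiterState::Woken;
            guard.1.take()
        };
        // Invoke the waker only after the lock has been released
        if let Some(waker) = waker {
            waker.wake();
        }
    }
    pub fn wait(&self) -> WaitFuture<'_> {
        WaitFuture { waiter: self }
    }
}

pub struct WaitFuture<'a> {
    waiter: &'a WaiterInner,
}

impl<'a> Future for WaitFuture<'a> {
    type Output = ();
    fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<()> {
        let mut guard = self.waiter.inner.lock().unwrap();
        match guard.0 {
            WaiterState::Woken => Poll::Ready(()),
            _ => {
                guard.0 = WaiterState::Waiting;
                guard.1 = Some(cx.waker().clone());
                Poll::Pending
            }
        }
    }
}

impl<'a> Drop for WaitFuture<'a> {
    fn drop(&mut self) {
        let mut guard = self.waiter.inner.lock().unwrap();
        // Cancelled while waiting: forget the waker and return to Idle
        if guard.0 == WaiterState::Waiting {
            guard.1 = None;
            guard.0 = WaiterState::Idle;
        }
    }
}

/// Hypothetical helper: a waker that does nothing, for manual polling.
pub struct NoopWake;
impl Wake for NoopWake {
    fn wake(self: Arc<Self>) {}
}

/// Hypothetical helper: polls a future exactly once.
pub fn poll_once<F: Future + Unpin>(fut: &mut F) -> Poll<F::Output> {
    let waker = Waker::from(Arc::new(NoopWake));
    let mut cx = Context::from_waker(&waker);
    Pin::new(fut).poll(&mut cx)
}
```

Dropping a pending `WaitFuture` returns the waiter to `Idle`, so a later `wait` starts from a clean state, while a future dropped after a wakeup leaves the `Woken` state untouched.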
Implement Timeout<F: Future>
Needs to be added :)
Implement Timeout<F: Future>
Timeout is a decorator future that wraps any future to make a new future that completes when either the inner future completes or the timeout expires.
Timeout has two fields:
- future: the future that is wrapped.
- timer: the timer, which expires after a certain duration.
Note that Timeout should be pinned. We can use the pin_project crate to implement it easily.
pin_project! {
    pub struct Timeout<F> {
        #[pin]
        future: F,
        #[pin]
        timer: Timer,
    }
}
impl<F> Timeout<F> {
    pub fn new(future: F, timer: Timer) -> Timeout<F> {
        Timeout { future, timer }
    }
}
Since Timeout is a Future, we need to implement the Future trait:
impl<F: Future> Future for Timeout<F> {
    type Output = Result<F::Output>;
    fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
        let this = self.project();
        // Poll the wrapped future first so that a ready result wins over expiry
        if let Poll::Ready(output) = this.future.poll(cx) {
            return Poll::Ready(Ok(output));
        }
        match this.timer.poll(cx) {
            Poll::Ready(_) => Poll::Ready(Err("The timer expired")),
            Poll::Pending => Poll::Pending,
        }
    }
}
If the timer expires before its future completes, Timeout will be woken up to poll() again. In this poll(), if the future is still not ready, Timeout will return Ready(Err(..)) to report the timeout.
Integrate timeout support for async_rt crate
To fully support timeouts in the async_rt crate, we need:
- Time Expression: no_std. e.g., Instant, Duration. Instant::now() is supported through the interface of Time Acquisition.
- Time Acquisition: Get time through Linux vDSO. If that fails, fall back to OCALL.
- Timer: no_std. See below for details.
- Timeout: no_std. See above for details.
- Waiter: no_std. We already have it.
Users use the waiter interface. The waiter is implemented by Timeout, and the Timeout is implemented by Timer.
Time Expression + Time Acquisition + Timer = The underlying timeout implementation.
Time Expression
- Solution 1: We can use the time crate. Although this crate supports no_std, its Instant struct needs std. We would need to make some modifications to adapt it to our vdso-time crate.
- Solution 2: We implement it ourselves, since we don't need much functionality.
Time Acquisition
We already have the vdso-time crate; we just need to integrate the fallback into the vdso-time crate.
Timer: The key to implementing Timeout
Each timer has a duration or an instant. When the timer expires, we need to be notified and, optionally, trigger some action.
Timer model
According to the Hashed and Hierarchical Timing Wheels paper, the timer model is composed of two user-facing operations, start and stop, and two internal operations, per-tick bookkeeping and expiry processing.
- start: start a timer.
- stop: stop and remove a timer.
- per-tick bookkeeping: happens on every tick of the timer clock and maintains the state of all timers.
- expiry processing: runs when a timer expires and triggers the associated action.
To implement timers efficiently, we consider the following schemes:
scheme 1: io-uring
When starting a timer, we send a timeout request to the io-uring submission queue. When the request completes, we are notified and can trigger the associated action.
Drawback: Too much pressure on io-uring. The latency can be high.
scheme 2: unordered Timer List
Keep an unordered list of timers and track the remaining time for each. Per-tick bookkeeping will maintain all timers' remaining time.
start is O(1), stop is O(1), per-tick bookkeeping is O(n).
scheme 3: ordered Timer List
Keep a list of timers as in scheme 2, but record the absolute expiry time (not remaining time) and keep the timer list ordered by this expiry time (with the timers closest to expiry at the head of the list).
start is O(n), stop is O(1), per-tick bookkeeping is O(1).
scheme 4: Timer tree
similar to scheme 3, but use priority queue or ordered tree.
start is O(log(n)), stop is O(log(n)), per-tick bookkeeping is O(1).
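A minimal sketch of scheme 4, assuming an ordered set from the standard library (`BTreeSet`) keyed by `(expiry tick, id)`. The type and method names are hypothetical. Start and stop are logarithmic, and per-tick bookkeeping only inspects the earliest entry unless timers actually expire.

```rust
use std::collections::BTreeSet;

/// Handle returned by `start`: (absolute expiry tick, timer id).
pub type TimerHandle = (u64, u64);

/// Hypothetical timer tree ordered by absolute expiry time.
pub struct TimerTree {
    now: u64,
    next_id: u64,
    timers: BTreeSet<TimerHandle>,
}

impl TimerTree {
    pub fn new() -> Self {
        Self { now: 0, next_id: 0, timers: BTreeSet::new() }
    }

    /// Starts a timer that expires `delay` ticks from now: O(log n).
    pub fn start(&mut self, delay: u64) -> TimerHandle {
        let handle = (self.now + delay, self.next_id);
        self.next_id += 1;
        self.timers.insert(handle);
        handle
    }

    /// Stops a timer; returns false if it already expired or was stopped: O(log n).
    pub fn stop(&mut self, handle: TimerHandle) -> bool {
        self.timers.remove(&handle)
    }

    /// Advances the clock by one tick and returns the ids that expired.
    pub fn tick(&mut self) -> Vec<u64> {
        self.now += 1;
        let mut expired = Vec::new();
        loop {
            // Only the earliest timer needs to be inspected
            let head = self.timers.iter().next().copied();
            match head {
                Some((expiry, id)) if expiry <= self.now => {
                    self.timers.remove(&(expiry, id));
                    expired.push(id);
                }
                _ => break,
            }
        }
        expired
    }
}
```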
scheme 5: Simple Timing Wheels
When all timers have a maximum period of no more than MaxInterval, we can construct a circular buffer with MaxInterval slots (each representing one tick). The current time is represented by an index into the buffer. To insert a timer that expires j time units in the future, we move j slots around the ring and add the timer to a list of timers held in that slot. Every tick, the current time index moves one slot around the ring and does expiry processing on all timers held in the new slot.
Start, stop, and per-tick bookkeeping are all O(1).
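The simple timing wheel can be sketched in a few lines. This is an illustration under assumptions: the names are hypothetical, each slot is a plain `Vec`, and stop is omitted because O(1) removal would need an intrusive or doubly linked list per slot.

```rust
/// Hypothetical simple timing wheel: one slot per tick, up to `max_interval`.
pub struct SimpleWheel {
    slots: Vec<Vec<u64>>, // each slot holds the ids of timers expiring there
    current: usize,       // index of the slot that represents "now"
}

impl SimpleWheel {
    pub fn new(max_interval: usize) -> Self {
        Self { slots: vec![Vec::new(); max_interval], current: 0 }
    }

    /// Starts timer `id` to expire `delay` ticks from now.
    /// Returns false if `delay` is outside `1..=max_interval`.
    pub fn start(&mut self, id: u64, delay: usize) -> bool {
        if delay == 0 || delay > self.slots.len() {
            return false;
        }
        // Move `delay` slots around the ring from the current position
        let slot = (self.current + delay) % self.slots.len();
        self.slots[slot].push(id);
        true
    }

    /// Advances one slot and returns every timer held in the new slot.
    pub fn tick(&mut self) -> Vec<u64> {
        self.current = (self.current + 1) % self.slots.len();
        std::mem::take(&mut self.slots[self.current])
    }
}
```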
scheme 6: Hashing Wheel with Ordered Timer Lists
If MaxInterval is comparatively large, simple timing wheels can use a lot of memory. Instead of using one slot per time unit, we could use a form of hashing instead. Construct a circular buffer with a fixed number of slots and have the current time index advance one position in the ring on a tick as before. To insert a timer that expires j time units in the future, compute a slot delta s = j % num-buckets . Insert the timer s slots around the ring with its time of expiry. Since there may be many timers in any given slot, we maintain an ordered list of timers for each slot.
start / stop: average O(1), worst O(n). Per-tick bookkeeping is O(1).
scheme 7: Hashing Wheel with Unordered Timer Lists
This is a variant on scheme 6 where instead of storing absolute time of expiry we store a count of how many times around the ring each timer is in the future. To insert a timer that expires j time units in the future, compute a counter value c = j / num-buckets and a slot delta s = j % num-buckets. Insert the timer s slots around the ring with its counter value c. Keep the timers in an unordered list in each slot.
start / stop: O(1). Per-tick bookkeeping: average O(1), worst O(n).
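Scheme 7 can be sketched as follows, with hypothetical names. One detail differs from the formula in the text and is an assumption of this sketch: the round counter is computed as `(j - 1) / num_buckets` rather than `j / num_buckets`, so that a delay that is an exact multiple of the wheel size fires on time when the slot is next visited.

```rust
/// Hypothetical hashed wheel with unordered per-slot lists.
pub struct HashedWheel {
    slots: Vec<Vec<(u64, u64)>>, // (timer id, remaining rounds)
    current: usize,
}

impl HashedWheel {
    pub fn new(num_buckets: usize) -> Self {
        Self { slots: vec![Vec::new(); num_buckets], current: 0 }
    }

    /// Starts timer `id` to expire `delay` ticks from now (`delay >= 1`).
    pub fn start(&mut self, id: u64, delay: u64) -> bool {
        if delay == 0 {
            return false;
        }
        let n = self.slots.len() as u64;
        // Full revolutions to skip before the timer is due (sketch's choice)
        let rounds = (delay - 1) / n;
        let slot = ((self.current as u64 + delay) % n) as usize;
        self.slots[slot].push((id, rounds));
        true
    }

    /// Advances one slot; expires timers with no rounds left and
    /// decrements the counter of all others in that slot.
    pub fn tick(&mut self) -> Vec<u64> {
        self.current = (self.current + 1) % self.slots.len();
        let entries = std::mem::take(&mut self.slots[self.current]);
        let mut expired = Vec::new();
        for (id, rounds) in entries {
            if rounds == 0 {
                expired.push(id);
            } else {
                self.slots[self.current].push((id, rounds - 1));
            }
        }
        expired
    }
}
```

The worst case for per-tick bookkeeping appears when many long-lived timers hash to the same slot, since each visit has to touch all of them.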
scheme 8: Hierarchical Timing Wheels
Another way to deal with the memory issues caused by the simple timing wheel approach is to use multiple timing wheels in a hierarchy. Suppose we want to store timers with second granularity, that can be set for up to 100 days in the future. We might construct four wheels:
- A days wheel with 100 slots
- An hours wheel with 24 slots
- A minutes wheel with 60 slots
- A seconds wheel with 60 slots
This is a total of 244 slots to address a total of 8.64 million possible timer values. Every time we make one complete revolution in a wheel, we advance the next biggest wheel by one slot (the paper describes a slight variation with minute, hour, and day ticking clocks, but the effect is the same). For example, when the seconds wheel has rotated back to index ‘0’ we move the index pointer in the minutes wheel round by one position. We then take all the timers in that slot on the minutes wheel (which are now due to expire within the next 60 seconds) and insert them into their correct positions in the seconds wheel. Expiry time processing in the seconds wheel works exactly as described in scheme 5 (it’s just a simple timing wheel that happens to get replenished on every revolution).
To insert a timer, find the first wheel (from largest units to smallest) for which the timer should expire 1 or more wheel-units into the future. For example, a timer due to expire 11 hours, 15 minutes and 15 seconds into the future would be inserted at slot ‘current-index + 11’ in the hours wheel, storing the remainder of 15 minutes and 15 seconds with the timer. After the hours wheel has advanced by 11 positions, this timer will be removed from that wheel and inserted at ‘current index + 15’ slots round in the minutes wheel, storing the remainder of 15 seconds. When the minutes wheel has subsequently advanced by 15 positions, this timer will be removed from the wheel and placed in the seconds wheel at ‘current index + 15’ slots round. 15 seconds later, the timer will expire!
Preferred scheme: Hierarchical Timing Wheels
Timer Wheels:
multi-level, e.g. six levels.
- 1ms wheel, 64 slots (0 - 63ms)
- 64ms wheel, 64 slots (64ms - 64*64ms)
- 64*64ms wheel, 64 slots.
- 64*64*64ms wheel, 64 slots.
- 64*64*64*64ms wheel, 64 slots.
- 64*64*64*64*64ms wheel, 64 slots.
Each level has a current index. All levels share a latest absolute time (The time of latest per-tick bookkeeping).
start a timer
Store the expiry duration in the timer, and insert the timer into the corresponding slot. Since we know the time of the latest per-tick bookkeeping, the current index, and the expiry duration, we can find the slot without getting the real time.
per-tick bookkeeping
Get the real time, then update the current index and the latest absolute time. If we meet a slot with timers while moving the index, move these timers to a lower level. If the slot is in the lowest level, stop these timers and do expiry processing.
stop and expiry processing
Wake up the timer's waker and remove the timer.
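The start and per-tick operations above can be sketched as follows. This is a simplified illustration, not the planned implementation: names are hypothetical, the clock advances exactly 1 ms per `tick` call instead of reading the real time, timers store an absolute expiry rather than a duration, and the level is chosen from the highest bit in which the expiry differs from the current time (the approach this sketch assumes; other placements are possible).

```rust
const SLOT_BITS: u32 = 6;
const SLOTS: usize = 1 << SLOT_BITS; // 64 slots per level
const LEVELS: usize = 6;
const SLOT_MASK: u64 = SLOTS as u64 - 1;

/// Hypothetical six-level wheel with 1 ms resolution at the lowest level.
pub struct HierarchicalWheel {
    now: u64,                          // time of the latest tick, in ms
    levels: Vec<Vec<Vec<(u64, u64)>>>, // [level][slot] -> (id, expiry in ms)
}

impl HierarchicalWheel {
    pub fn new() -> Self {
        Self { now: 0, levels: vec![vec![Vec::new(); SLOTS]; LEVELS] }
    }

    fn insert(&mut self, id: u64, expiry: u64) {
        // The highest bit where `expiry` differs from `now` selects the level
        let diff = (expiry ^ self.now) | SLOT_MASK;
        let level = (((63 - diff.leading_zeros()) / SLOT_BITS) as usize).min(LEVELS - 1);
        let slot = ((expiry >> (SLOT_BITS * level as u32)) & SLOT_MASK) as usize;
        self.levels[level][slot].push((id, expiry));
    }

    /// Starts timer `id` to expire `delay_ms` from now; rejects 0 and
    /// delays beyond the range covered by the wheels.
    pub fn start(&mut self, id: u64, delay_ms: u64) -> bool {
        if delay_ms == 0 || delay_ms >= 1u64 << (SLOT_BITS * LEVELS as u32) {
            return false;
        }
        self.insert(id, self.now + delay_ms);
        true
    }

    /// Advances the clock by 1 ms and returns the ids that expired.
    pub fn tick(&mut self) -> Vec<u64> {
        self.now += 1;
        // When a lower wheel completes a revolution, move the timers in the
        // matching slot of the next wheel down to where they now belong.
        let mut level = 1;
        while level < LEVELS
            && (self.now & ((1u64 << (SLOT_BITS * level as u32)) - 1)) == 0
        {
            let slot = ((self.now >> (SLOT_BITS * level as u32)) & SLOT_MASK) as usize;
            for (id, expiry) in std::mem::take(&mut self.levels[level][slot]) {
                self.insert(id, expiry);
            }
            level += 1;
        }
        // Everything in the current lowest-level slot is due now
        let slot = (self.now & SLOT_MASK) as usize;
        std::mem::take(&mut self.levels[0][slot])
            .into_iter()
            .map(|(id, _)| id)
            .collect()
    }
}
```

A real driver would advance `now` by the measured elapsed time and wake each expired timer's waker instead of returning ids.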
When to tick?
The wheels should be guarded by a lock. Before each scheduling round, we try to get the lock. If we get the lock, we tick once; otherwise, we just do scheduling.
Optimization to tick?
There may be no timers in the wheels that are close to expiry. In this case, ticking is inefficient, since each tick needs to get the real time through vDSO or OCALL.
We can have a flag named has_near_timer. If the flag is true, we need to tick before scheduling. If the flag is false, we don't tick.
By default, the flag is false.
When starting a timer, check the expiry duration.
- If the timer belongs to the lowest level, set has_near_timer to true.
- If has_near_timer is false and the timer doesn't belong to the lowest level, use io-uring to send a timeout request. When this request completes, set has_near_timer to true.
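The tick policy can be sketched as a small wrapper. This is only an illustration with hypothetical names (`TimerDriver`, `maybe_tick`, `note_near_timer`): the wheel type is left generic, and when the flag is cleared again is not covered by the text above, so the sketch leaves that to the caller.

```rust
use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::Mutex;

/// Hypothetical driver that decides whether the scheduler should tick.
pub struct TimerDriver<W> {
    pub wheel: Mutex<W>,
    has_near_timer: AtomicBool,
}

impl<W> TimerDriver<W> {
    pub fn new(wheel: W) -> Self {
        // By default there is no near timer, so no tick is needed
        Self { wheel: Mutex::new(wheel), has_near_timer: AtomicBool::new(false) }
    }

    /// Called when a timer lands in the lowest level, or when the
    /// io-uring timeout request for a far timer completes.
    pub fn note_near_timer(&self) {
        self.has_near_timer.store(true, Ordering::Release);
    }

    /// Called before each scheduling round. Returns true if a tick ran.
    pub fn maybe_tick(&self, tick: impl FnOnce(&mut W)) -> bool {
        if !self.has_near_timer.load(Ordering::Acquire) {
            return false; // skip the cost of reading the clock
        }
        match self.wheel.try_lock() {
            Ok(mut wheel) => {
                tick(&mut wheel);
                true
            }
            // Another scheduler thread is already ticking; just schedule
            Err(_) => false,
        }
    }
}
```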
Reference:
- paper reading blog: Hashed and Hierarchical Timing Wheels
- paper: Hashed and Hierarchical Timing Wheels
- kafka's hierarchical timing wheels
- tokio's timer
The timeout design looks good. I didn't dive into the Hierarchical Timing Wheels scheme. But having a design backed by a paper looks promising to me. And I am hoping that there is already a Rust implementation of the algorithm. If not, I think we can implement a simplified, good-enough version for now and leave the complete implementation for the future.
Could you break down the design into actionable tasks?
actionable tasks
- [x] time expression. Although there are no_std time crates, e.g., time, the Instant struct depends on std. If we want to integrate our vdso-time crate, we need to modify these crates. I think the better way is to implement it ourselves. We only need to implement the Instant struct and integrate it with the vdso-time crate. It's almost done.
- [x] time acquisition. We already have the vdso-time crate; all we need to do is support fallback in the vdso-time crate.
- [x] timer wheels. Related crates:
  - wheel_timer: Simple hashed wheel timer with bounded interval. Hashed timer only, too simple.
  - pendulum-rs: Data structures and runtimes for timer management. Hashed timer only, more mature.
  - ferris: A hierarchical timer wheel in Rust. Too simple.
  - rust-hash-wheel-timer: a low-level event timer implementation based on hierarchical hash wheels. More mature.
  - tokio/time/driver/wheel: the timer wheel part of the tokio crate. A hierarchical timer wheel, but complex; it cannot be used directly.
  We can implement a simplified version for now.
- [x] integrate timer wheels into the scheduler.
- [x] timeout.
- [x] new interface of waiter.