chore: improve aggregator event queue efficiency
- Implement queue drain pressure based on the channel's fill level. The pressure typically lets the system reach equilibrium without events piling up.
- Allow configuring the channel's target fill level before a flush is triggered.
- Lower the event buffer capacity, because the current value lets the buffer grow to gigabytes in size.
Details
This reverts https://github.com/tokio-rs/console/pull/235, because I found that limit to be unreasonably high: adding the following code would lead to tens of gigabytes of events piling up in the event queue:
```rust
// Continuously spawn batches of 100 tasks that busy-yield for one second,
// flooding the instrumentation layer with task events.
let spam = tokio::spawn(async {
    loop {
        let _ = tokio::spawn(async {
            futures::future::join_all((0..100).map(|_| {
                tokio::spawn(async {
                    let now = tokio::time::Instant::now();
                    while now.elapsed() < tokio::time::Duration::from_secs(1) {
                        tokio::task::yield_now().await;
                    }
                })
            }))
            .await;
        })
        .await;
    }
});
```
The problem I found in testing is that the aggregator task would consistently fall behind the rate at which events were created, and eventually the event queue would fill up regardless of its size (increasing the size merely delays the problem). The reason it fills up is rather subtle: `recv().now_or_never()` eventually returns `Poll::Pending` because the task's coop budget is used up. My immediate fix was to wrap the future with `tokio::task::unconstrained`, but I figured there's a reason it's not doing `try_recv()`, and that is to avoid livelocking the task.
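To illustrate, here is a minimal sketch of that drain pattern; the `Event` type and the counting logic are stand-ins, not the aggregator's actual code:

```rust
use futures::FutureExt; // for `now_or_never()`
use tokio::sync::mpsc;

struct Event; // stand-in for the aggregator's event type

async fn drain(rx: &mut mpsc::Receiver<Event>, events_seen: &mut u64) {
    // Each poll of `recv()` spends the task's coop budget. Once the budget
    // is exhausted, `recv()` returns `Poll::Pending`, so `now_or_never()`
    // yields `None` even though the channel still contains events.
    while let Some(Some(_event)) = rx.recv().now_or_never() {
        *events_seen += 1;
    }
    // `tokio::task::unconstrained(rx.recv()).now_or_never()` would disable
    // the budget, but a fast enough producer could then livelock this task.
}
```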
What this PR does is expose the queue's "fullness" level via the shared state. This allows the aggregator task to run a secondary timer, alongside its regular ticking, that adds pressure to clear out the event queue at the same rate it's filling up.
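As a rough sketch of the mechanism, and not the PR's actual code (names, timer periods, and structure here are all illustrative), the shared state carries the queue's fill level and the aggregator selects over both timers:

```rust
use std::sync::atomic::{AtomicUsize, Ordering};
use std::sync::Arc;
use tokio::time::{interval, Duration};

struct Shared {
    // Incremented by the subscriber on send, decremented by the
    // aggregator on receive, so both sides see the queue's fill level.
    queued_events: AtomicUsize,
}

async fn aggregator_loop(shared: Arc<Shared>, flush_threshold: usize) {
    let mut tick = interval(Duration::from_millis(10)); // regular publish tick
    let mut flush = interval(Duration::from_millis(1)); // secondary pressure timer
    loop {
        tokio::select! {
            _ = tick.tick() => {
                // Regular work: drain the queue and publish updates.
            }
            _ = flush.tick() => {
                // Pressure path: if the queue has reached its target fill
                // level, drain right away instead of waiting for the tick.
                if shared.queued_events.load(Ordering::Acquire) >= flush_threshold {
                    // drain the event queue now
                }
            }
        }
    }
}
```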
Another important thing to note is that the event queue filling up faster than it's being processed leads to time dilation: the rate of events shown in the console appears slower than it actually is. For example, if events arrive at twice the drain rate, a one-second burst of activity takes two seconds to play out in the console. Adding pressure eventually leads to an equilibrium where events are processed at roughly the rate they arrive.
One key change was adding a customizable variable, `event_buffer_flush_fraction`: the higher the fraction, the sooner this equilibrium is reached. I set it to 10 for now (the effective value was previously 2), which, combined with the reduced event buffer capacity, should let the aggregator task react 50x+ faster while keeping 9x extra space for random spikes. Note that this variable may be renamed to something slightly easier to understand, like `event_buffer_flush_percentage`, but that's a user-facing detail I have no strong opinion about. The arithmetic is sketched below.
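A worked example of the threshold arithmetic, using a purely hypothetical capacity (only the fraction values come from this PR):

```rust
fn main() {
    // Hypothetical numbers; only the fraction of 10 comes from this PR.
    let capacity = 10_000; // event buffer capacity (illustrative)
    let event_buffer_flush_fraction = 10;

    // The aggregator starts draining once capacity / fraction events queue up.
    let flush_threshold = capacity / event_buffer_flush_fraction;
    assert_eq!(flush_threshold, 1_000);

    // The old effective fraction was 2 (drain at half-full). A fraction of 10
    // reacts 5x sooner at equal capacity; together with the lowered buffer
    // capacity this works out to a 50x+ faster reaction, while 9/10 of the
    // buffer remains free as headroom for spikes.
}
```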
I believe this should fix https://github.com/tokio-rs/console/issues/184, but I haven't done an actual long-term test in production; I'll see in a couple of days. EDIT: it doesn't seem to fix it.
Thank you for working on this! The current queue sizes and other parameters were chosen pretty arbitrarily in the interest of "just getting something working", so I had been hoping to improve the queue performance for some time. I'm excited to give this PR a review!
@hawkw I have pushed the requested changes (I can squash later, but I think it's easier to see the changes during review).
@h33p Just letting you know that we haven't forgotten this PR!
I've got a test case that I'd like to run your changes against to compare to the current behaviour. It pretty easily produces a situation where the channel between the subscriber and the aggregator grows forever (at least on my local computer using a single thread).