Track task-prefix duration variance
Part 1 of https://github.com/dask/distributed/issues/4023: collect the statistics necessary to visualize.
This doesn't actually give us the stats needed for a box plot (25th, 50th, and 75th percentiles), but it's probably good enough for a start.
Neat!
So. How intense do we want to get here? We currently have a couple of ways in which we record timing data:
- We record it on the `TaskGroup`s, and then aggregate on demand on the `TaskPrefix`.
- We record it on the `TaskPrefix` itself, particularly for durations (see the `all_durations` field), which captures compute time as well as communication and deserialization time. This isn't dependent on the `TaskGroup`s because they are ephemeral, while we need to keep track of duration information long-term for scheduling purposes.

Now we're adding a third way: variance-enabled data for compute times. Do we want to roll this into approach 2 so that we compute variances of transfer times and such? Do we also want to track things like the number of bytes processed over time? Alternatively, do we want to keep this information on the `TaskGroup`s and make them somewhat less ephemeral (weakdeque? :) )
The other thing we could do, if you really do want quantiles, would be to bring in crick (cc @jcrist), but that seems like overkill for this (and I'm not sure what the performance implications would be).
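For the record, a rough sketch of what that might look like, assuming crick's `TDigest` with `add`/`quantile` (my recollection of its API; the durations below are made up, and none of this is in the PR):

```python
# Sketch only: streaming quantiles per task prefix with crick's TDigest.
# Assumes TDigest exposes add() and quantile(); durations are illustrative.
from crick import TDigest

digest = TDigest()
for duration in [0.8, 1.2, 1.5, 2.3, 0.9]:  # illustrative task durations (seconds)
    digest.add(duration)

# The 25th/50th/75th percentiles a box plot would need:
q25, q50, q75 = (digest.quantile(q) for q in (0.25, 0.50, 0.75))
print(q25, q50, q75)
```

A t-digest keeps a small fixed-size sketch per prefix, so memory would stay bounded even for long-running prefixes, but the per-update cost is the open question.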
Speaking of which, I'm assuming that this is pretty cheap, but can you provide a sense of how much cost this adds? We're comparing against an ideal budget of around 200 µs per task. If this is sub-microsecond then great! If not, then we probably need to do some thinking about it (at least until there is some cythonization happening).
This is really cool to see! @TomAugspurger if you want me to handle the viz part once the measuring is in I'd be happy to take that on
> If this is sub-microsecond then great!
Barely
```python
In [1]: from distributed.scheduler import TaskPrefix
   ...: prefix = TaskPrefix("foo")
   ...: duration = 1.5

In [2]: %timeit prefix._update_duration_variance(duration)
952 ns ± 72.4 ns per loop (mean ± std. dev. of 7 runs, 1000000 loops each)
```
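(Against the ~200 µs per-task budget mentioned above, 952 ns works out to roughly 0.5% overhead, if my arithmetic is right.)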
For reference, the other things we do here are setting attributes, which is about 1/10th the time.
```python
In [3]: %timeit prefix.duration_average = duration
85.3 ns ± 8.94 ns per loop (mean ± std. dev. of 7 runs, 10000000 loops each)
```
And in-place addition, which is about 1/7th the time:
```python
In [4]: from distributed.scheduler import TaskGroup

In [5]: group = TaskGroup("foo")

In [7]: %timeit group.duration += duration
138 ns ± 10.5 ns per loop (mean ± std. dev. of 7 runs, 10000000 loops each)
```
Having help on the vis part would be most welcome.
@TomAugspurger Do you have any thoughts about how this should be visualized? For reference, we are currently tracking/visualizing time per key here: https://github.com/dask/distributed/pull/3933#issuecomment-651272161
IMO, just a vertical line at the top of each bar from the mean to +/- the variance would be fine. (Dunno how hard that would be to do in bokeh)
I think we can do that with Whiskers. Do you want me to do that after this PR?
That'd be great!
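For reference, a minimal sketch of the kind of Whisker annotation I have in mind (all prefixes, means, and deviations below are illustrative, not pulled from the scheduler):

```python
# Sketch: mean duration per task prefix with +/- one-std-dev whiskers in Bokeh.
# The data here is made up; a real dashboard would pull it from the scheduler.
from bokeh.models import ColumnDataSource, Whisker
from bokeh.plotting import figure, show

prefixes = ["inc", "double", "sum"]
means = [1.2, 0.8, 2.5]
stds = [0.3, 0.1, 0.9]

source = ColumnDataSource(
    data=dict(
        prefix=prefixes,
        mean=means,
        upper=[m + s for m, s in zip(means, stds)],
        lower=[max(m - s, 0) for m, s in zip(means, stds)],
    )
)

p = figure(x_range=prefixes, title="Task duration by prefix")
p.vbar(x="prefix", top="mean", width=0.8, source=source)
p.add_layout(Whisker(source=source, base="prefix", upper="upper", lower="lower"))
show(p)
```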
Two comments:
- Maybe a Gaussian isn't the right distribution here. Someone off-list mentioned that something like a power-law distribution might be a better model, since durations tend to be always positive and skewed.
- Should we wait for cython before doing this?
Yeah, measuring a couple percentiles is going to be much more informative here.
FWIW I ran the benchmarks on my machine (M1 OSX) and am clocking in at ~320 ns. This can be reduced by another 10% if we reduce a few attribute accesses. Cythonization is also on its way, so performance is probably no longer a big deal for this?
```diff
diff --git a/distributed/scheduler.py b/distributed/scheduler.py
index cc9b3550..15775183 100644
--- a/distributed/scheduler.py
+++ b/distributed/scheduler.py
@@ -880,13 +880,14 @@ class TaskPrefix:
     def _update_duration_variance(self, x):
         # Welford rolling variance algorithm
         # https://www.johndcook.com/blog/standard_deviation/ for background.
-        self._count += 1
-        if self._count == 1:
+        self._count = count = self._count + 1
+        if count == 1:
             m = x
             s = 0.0
         else:
-            m = self._var_m + (x - self._var_m) / self._count
-            s = self._var_s + (x - self._var_m) * (x - m)
+            var_m = self._var_m
+            m = var_m + (x - var_m) / count
+            s = self._var_s + (x - var_m) * (x - m)
         self._var_m = m
         self._var_s = s
```
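For anyone following along, here's a standalone sketch of the same Welford update plus how the mean and variance come back out of `_var_m` / `_var_s` (this mirrors the diff above but is not the scheduler class itself):

```python
# Standalone Welford rolling mean/variance, mirroring the update above.
# https://www.johndcook.com/blog/standard_deviation/ for background.
class RollingVariance:
    def __init__(self):
        self._count = 0
        self._var_m = 0.0  # running mean
        self._var_s = 0.0  # running sum of squared deviations

    def update(self, x):
        self._count = count = self._count + 1
        if count == 1:
            m, s = x, 0.0
        else:
            var_m = self._var_m
            m = var_m + (x - var_m) / count
            s = self._var_s + (x - var_m) * (x - m)
        self._var_m = m
        self._var_s = s

    @property
    def mean(self):
        return self._var_m

    @property
    def variance(self):
        # Sample variance; needs at least two observations.
        return self._var_s / (self._count - 1) if self._count > 1 else 0.0


rv = RollingVariance()
for duration in (1.0, 1.5, 2.0):
    rv.update(duration)
print(rv.mean, rv.variance)  # 1.5 0.25
```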
I could imagine many places where uncertainties could improve our decisions (bandwidth and byte size measurements, adaptive targets, worker objectives, steal ratios, etc.)