dask-awkward icon indicating copy to clipboard operation
dask-awkward copied to clipboard

Task graph building becomes uncomfortably slow for large physics analyses

Open lgray opened this issue 1 year ago • 27 comments
trafficstars

When the unoptimized task graph size starts to approach a few thousand layers (which is typical in end-stage physics analysis), generating the task graph can take a few seconds. Typical analyses have a O(100) datasets, so this means that a user can easily incur 10 minutes of overhead before their jobs even start (and if the compute time of the full analysis itself is ~20 minutes this is a large slowdown).

I suppose one thing that could be done if we can't squeeze the building time down any further is to parallelize the building with dask.Delayed and collect the graphs back at the client before submitting them, but we should see if we can make the building process more efficient first.

Profile of the task-graph building of two datasets attached using awkward@main, coffea@master. (macOS/arm64) apply_to_fileset.prof.zip

@agoose77 @douglasdavis

lgray avatar Jan 11 '24 15:01 lgray

FYI The code that produced the profile above is here: https://github.com/cmstas/ewkcoffea/blob/coffea2023/analysis/wwz/wwz4l.py#L33

and you can run it yourself by doing: install:

pip install git+https://github.com/scikit-hep/awkward.git@main
pip install git+https://github.com/CoffeaTeam/coffea.git@master
pip install xgboost
pip install mt2
git clone https://github.com/TopEFT/topcoffea.git -b coffea2023
pushd topcoffea && pip install -e . && popd
git clone https://github.com/cmstas/ewkcoffea.git -b coffea2023
pushd ewkcoffea && pip install -e . && popd

running:

cd ewkcoffea/analysis/wwz
source run_wrapper.sh

lgray avatar Jan 11 '24 15:01 lgray

I slack conversations, a few different factors were mentioned, so I would like to be clear here which of them we are talking about.

  • creation of a number of dask-awkward specific layers using our API, for any number of dak arrays
  • conversion of dak arrays to dask.array , which forces optimisation during initial graph creation
  • time to optimise the graph, particularly for the purposes of column selection
  • bundling/serialising the graph to send to the scheduler

I would say that creating multiple objects or converting to dask.array in parallel is possible. Running the graph for the purposes of optimisation might parallelise, but amending the graph probably does not.

martindurant avatar Jan 11 '24 15:01 martindurant

conversion of dak array to dask.array has been removed - it was an artifact of simpler times :-)

lgray avatar Jan 11 '24 15:01 lgray

and in particular the profile above tends to only your first bullet point - optimizing the graph for columns is quite fast (dak.necessary_columns is 10x slower for big graphs but that's another, less important issue).

lgray avatar Jan 11 '24 15:01 lgray

Though if we can get building of the task graph itself faster then the next item on the list will be the speed of optimization.

There's also some related issues of task graphs with ML algorithms in them or large corrections being multiple MB in size and that seems to make dask unhappy.

We could go to using scatter and futures but that's a big leap in programming paradigm for some folks. For sure it is something we can roll out, but it's not what we taught people to do for five years so far :D.

lgray avatar Jan 11 '24 15:01 lgray

large corrections being multiple MB in size and that seems to make dask unhappy

Some computes are just hard, it's ok is dask complains so long as it - eventually - works. We need to set reasonable expectations.

martindurant avatar Jan 11 '24 15:01 martindurant

Yeah that I can be happy with for now, there are other bigger issues to tackle in terms of UX.

lgray avatar Jan 11 '24 16:01 lgray

In particular the multiple 10s of megabytes is just metadata so we could scatter it to the cluster. It's more a matter of showing people how to do it in their analyses.

lgray avatar Jan 11 '24 18:01 lgray

You don't need scatter() for that, you just need to have that multi-MB value be a dask key in the graph as opposed to a literal value. So wrapping it in ak.from_awkward (1 partition) would be enough, I think .

martindurant avatar Jan 11 '24 18:01 martindurant

huh, interesting - I'll try that!

lgray avatar Jan 11 '24 18:01 lgray

Hmm - the first encounter with that doesn't work out so well since the metadata is an object (a functor), and awkward array does not like that.

lgray avatar Jan 11 '24 18:01 lgray

Maybe as any delayed(), then (this works on objects, not only functions).

martindurant avatar Jan 11 '24 18:01 martindurant

At present that yields: dask_awkward.utils.DaskAwkwardNotImplemented: Use of Array with other Dask collections is currently unsupported.

lgray avatar Jan 11 '24 18:01 lgray

@douglasdavis , any ideas? ;)

martindurant avatar Jan 11 '24 18:01 martindurant

~~I have found a dirty dirty hack.~~ No it doesn't quite work all the way through. I was almost able to hide it in a dask_awkward.Scalar.

lgray avatar Jan 11 '24 18:01 lgray

Hope springs anew - if I can hide it in an attrs of an awkward array I think I can get it done. However, this exposed a bug!

lgray avatar Jan 11 '24 19:01 lgray

We can definitely make delayed objects work nicely with binary/map_parititions dak.Array operations. that exception is pretty strict and there's room for relaxing it a bit

douglasdavis avatar Jan 11 '24 19:01 douglasdavis

Being able to pass in a single delayed object as an argument would be perfect in my case.

lgray avatar Jan 11 '24 19:01 lgray

Anyway this particular direction is vastly beside the major source of slow down, which appears to be awkward operations on zero length arrays and typetracers. I understand those take time but seems to add up quickly!

lgray avatar Jan 11 '24 20:01 lgray

awkward operations on zero length arrays and typetracers

Whether or not they are slow is debatable, but we have MANY of them, so it adds up

martindurant avatar Jan 11 '24 20:01 martindurant

If there's performance to squeeze out, we should find it! Telling everyone to make their analyses simpler won't go over very well. :S

lgray avatar Jan 11 '24 20:01 lgray

When I wrote that exception (some time ago), the problem with mixing dask-awkward collection objects with "anything else" was metadata determination. We always want to be able to keep around an awkward typetracer. If we are mixing a dask-awkward collection object with, for example, a "black box" Delayed object:

@delayed
def a_delayed_array():
    return ak.Array([1, 2])

array = ak.Array([[1, 2, 3], [4], [5, 6, 7], [8]])
dak_array = dak.from_awkward(array, npartitions=2)
result = dak_array * a_delayed_array()

We're going to get an exception trying to figure out what result._meta should be, because a_delayed_array() does not have a metadata that can be used with dak_array._meta

In this case we know result._meta should be the same as dak_array._meta, so we can be explicit with:

result = map_partitions(operator.mul, dak_array, a_delayed_array(), meta=dak_array._meta, output_divisions=1)

But this is of course a bit heavy handed and we don't like suggesting people use map_partitions in cases that are not at least quite nontrivial.

Right now that last code snippet won't work because of the exception mention in @lgray's https://github.com/dask-contrib/dask-awkward/issues/446#issuecomment-1887750609 - we can remove the exception to allow that flavor of map_partitions call at least, but the simple actually use * case will require more thought.

douglasdavis avatar Jan 11 '24 20:01 douglasdavis

Yeah I just need it internal to coffea for a map_partitions call where I can pre-calculate the meta by hand quite easily.

I think you can even leave in that exception if someone doesn't supply a meta by hand.

lgray avatar Jan 11 '24 20:01 lgray

Alright that seems like a good starting point for a PR

douglasdavis avatar Jan 11 '24 20:01 douglasdavis

I was almost able to get this to work by hiding the correction I need to apply in the attrs of a throw-away awkward array, but the partitioning bites me in the ass and it won't compute for large datasets.

So a PR is definitely need so we can have a single delayed object get sent around and applied to multiple calls.

lgray avatar Jan 12 '24 01:01 lgray

So close!

lgray avatar Jan 12 '24 01:01 lgray

Alright https://github.com/dask-contrib/dask-awkward/pull/449 allows map_partitions calls (where meta= is explicitly passed by the caller) to have Delayed objects in the arguments.

douglasdavis avatar Jan 12 '24 04:01 douglasdavis