dask-awkward
Performance issue with `repartition`
I was testing a file-skimming workflow. Because the rate of events of interest in the skim can be very low, I attempted to use `array.repartition` to reduce the number of files that would be generated, since every file-writing method I know of creates one file per partition.
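As a minimal sketch of what I mean (toy data and a made-up field `x`, not the real files): one partition becomes one output file, so `repartition` is how I try to shrink the file count.

```python
# Minimal sketch with toy data: one partition -> one output file,
# so fewer partitions should mean fewer files written out.
import awkward as ak
import dask_awkward as dak
import numpy as np

toy = ak.Array({"x": np.arange(100)})
darr = dak.from_awkward(toy, npartitions=10)
print(darr.npartitions)  # 10 partitions -> 10 output files if written as-is

fewer = darr.repartition(rows_per_partition=50)
print(fewer.npartitions)  # expect 2 partitions -> 2 output files
```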
I've provided code to generate a set of dummy data that roughly matches the data schema (jagged arrays with very mismatched collection sizes) and to perform a simple skim operation. What I observe is that when `repartition` is specified, memory usage is pinned at ~5-7 GB regardless of the partitioning scheme defined by `uproot.dask`. A suggestion to use `dask.array.persist` makes the `array.repartition` step take a very long time and use just as much memory.
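One way to watch the memory behaviour described above (a minimal sketch, assuming the same single-threaded `Client` as in the script below) is the distributed dashboard:

```python
# Sketch: open the dashboard URL to watch worker memory while the graph runs.
from dask.distributed import Client

client = Client(processes=False, n_workers=1, threads_per_worker=1)
print(client.dashboard_link)
```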
This is how I am attempting to skim the files in question:
```python
import uproot
import dask
from dask.distributed import Client

client = Client(processes=False, n_workers=1, threads_per_worker=1)


def make_skimmed_events(events):
    # Place your selection logic here
    skimmed = events[events.nJets > 10]
    skimmed["myCustom"] = 137 * 9.8
    return skimmed


events = uproot.dask(
    {f"dummy_{idx}.root": "Events" for idx in range(0, 2)},
    step_size=2_000,  # This only helps save memory if no repartition is used.
)

print("Calculating skim")
skimmed = make_skimmed_events(events)

# Trying persist
print("Calculating persisted")
persisted = skimmed.persist()

print("Calculating repartition")
parted = persisted.repartition(rows_per_partition=10_000)

# Or trying direct repartition doesn't work
# parted = skimmed.repartition(rows_per_partition=5_000)

print("Running uproot write")
writer = uproot.dask_write(
    parted,
    destination="skimtest/",
    prefix="mytest/skimmed",
    compute=False,
)

print("Calculating the graphs")
dask.visualize(writer, filename="Skim_test_puredask.pdf")
dask.visualize(writer, filename="Skim_test_opt_puredask.pdf", optimize_graph=True)

print("Executing the final task")
dask.compute(writer, optimize_graph=False)
```
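A small sanity check I can run before the final compute (using the `events` and `parted` collections from the script above) is to print the partition counts, since each partition corresponds to one output file:

```python
# Sketch: partition counts == expected number of output files.
print(events.npartitions)  # partitions created by uproot.dask with step_size
print(parted.npartitions)  # partitions after repartition(rows_per_partition=...)
```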
The data in question can be generated using the script below (each file will be about 2.5 GB in size):
```python
import awkward as ak
import numpy as np
import uproot

for name_idx in range(0, 10):
    n_events = 100_000
    n_jets = np.random.poisson(lam=5, size=n_events)
    n_part = np.random.poisson(lam=300, size=n_events)
    n_obj = np.random.poisson(lam=75, size=n_events)

    jets_arr = ak.zip(
        {
            field: ak.unflatten(np.random.random(size=ak.sum(n_jets)), n_jets)
            for field in ["a", "b", "c", "d", "e", "f", "g", "j", "i"]
        }
    )
    part_arr = ak.zip(
        {
            field: ak.unflatten(np.random.random(size=ak.sum(n_part)), n_part)
            for field in ["a", "b", "c", "d", "e", "f", "g", "j", "i"]
        }
    )
    obj_arr = ak.zip(
        {
            field: ak.unflatten(np.random.random(size=ak.sum(n_obj)), n_obj)
            for field in ["a", "b", "c", "d", "e", "f", "g", "j", "i"]
        }
    )

    with uproot.recreate(f"dummy_{name_idx}.root") as f:
        f["Events"] = {
            "Jets": jets_arr,
            "Particles": part_arr,
            "Object": obj_arr,
        }
```
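For completeness, a quick way to check the layout these files end up with (a sketch; the `nJets` counter branch that the `events.nJets > 10` selection relies on comes from how uproot writes jagged record arrays):

```python
# Sketch: inspect one generated file to confirm the branch layout,
# including the nJets counter branch used in the skim selection.
import uproot

with uproot.open("dummy_0.root") as f:
    tree = f["Events"]
    print(tree.num_entries)           # 100_000 events per file
    print(tree.keys())                # e.g. nJets, Jets_a, ..., nParticles, ...
    print(tree["nJets"].array()[:5])
```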