
Performance issue with `repartition`

Open yimuchen opened this issue 8 months ago • 27 comments

I was testing a file-skimming workflow, and to account for the possibility that the rate of events of interest is very low in the skimming scheme, I attempted to use array.repartition to reduce the number of output files, since all file-writing methods I know of create one file per partition.

I've provided code below to generate a set of dummy data that roughly matches my data schema (jagged arrays with very mismatched collection sizes) and to perform a simple skim operation. What I observe is that when repartition is specified, the memory usage is pinned at ~5-7 GB regardless of the partitioning scheme defined by uproot. A suggestion to use dask.array.persist makes the computation of the array.repartition step take a very long time while using just as much memory.
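
For reference, the number of partitions that feeds into the write step can be checked without computing anything. This is a minimal sketch using the dummy files from the generator script further below; `npartitions` is the standard attribute on dask collections:

import uproot

# uproot.dask creates one partition per step of `step_size` events in each input file,
# and uproot.dask_write later produces one output file per partition.
events = uproot.dask({f"dummy_{idx}.root": "Events" for idx in range(0, 2)}, step_size=2_000)
print(events.npartitions)  # e.g. 100 for 2 files x 100_000 events at step_size=2_000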

This is how I am attempting to skim the files in question:

import uproot
import dask
from dask.distributed import Client

client = Client(processes=False, n_workers=1, threads_per_worker=1)


def make_skimmed_events(events):
    # Place your selection logic here
    skimmed = events[events.nJets > 10]
    skimmed["myCustom"] = 137 * 9.8
    return skimmed


events = uproot.dask({f"dummy_{idx}.root": "Events" for idx in range(0, 2)}, step_size=2_000) # This only helps save memory if no repartition is used.

print("Calculate skimm")
skimmed = make_skimmed_events(events)

# Trying persist
print("Calculating persisted")
persisted = skimmed.persist()
print("Calculating repartition")
parted = persisted.repartition(rows_per_partition=10_000)

# Or trying direct repartition doesn't work
# parted = skimmed.repartition(rows_per_partition=5_000)
print("Calculating running uproot write")
writer = uproot.dask_write(
    parted,
    destination="skimtest/",
    prefix="mytest/skimmed",
    compute=False,
)
print("Calculating the graphs")
dask.visualize(writer, filename="Skim_test_puredask.pdf")
dask.visualize(writer, filename="Skim_test_opt_puredask.pdf", optimize_graph=True)
print("Executing the final task")
dask.compute(writer, optimize_graph=False)
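
To put a number on the memory behaviour described above, the final compute can be re-run while sampling cluster memory. This is a sketch that continues from the reproducer above and assumes a dask.distributed release recent enough to provide MemorySampler:

from distributed.diagnostics import MemorySampler

ms = MemorySampler()
with ms.sample("skim + repartition + write"):
    dask.compute(writer, optimize_graph=False)

# Peak sampled memory across the cluster; the ~5-7 GB plateau described above should show up here.
print(ms.to_pandas().max())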

The data in question can be generated using this script (each file will be about 2.5 GB in size):

import awkward as ak
import numpy as np
import uproot


for name_idx in range(0, 10):
    n_events = 100_000

    # Per-event multiplicities drawn from Poisson distributions
    # (on average 5 jets, 300 particles, and 75 objects per event)
    n_jets = np.random.poisson(lam=5, size=n_events)
    n_part = np.random.poisson(lam=300, size=n_events)
    n_obj = np.random.poisson(lam=75, size=n_events)

    # Jagged record arrays: flat random values unflattened into per-event lists
    jets_arr = ak.zip(
        {
            field: ak.unflatten(np.random.random(size=ak.sum(n_jets)), n_jets)
            for field in ["a", "b", "c", "d", "e", "f", "g", "j", "i"]
        }
    )
    part_arr = ak.zip(
        {
            field: ak.unflatten(np.random.random(size=ak.sum(n_part)), n_part)
            for field in ["a", "b", "c", "d", "e", "f", "g", "j", "i"]
        }
    )
    obj_arr = ak.zip(
        {
            field: ak.unflatten(np.random.random(size=ak.sum(n_obj)), n_obj)
            for field in ["a", "b", "c", "d", "e", "f", "g", "j", "i"]
        }
    )

    with uproot.recreate(f"dummy_{name_idx}.root") as f:
        f["Events"] = {
            "Jets": jets_arr,
            "Particles": part_arr,
            "Object": obj_arr,
        }
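
A quick way to sanity-check one of the generated files before running the skim (a minimal sketch using standard uproot calls; the file name comes from the script above):

import uproot

with uproot.open("dummy_0.root") as f:
    tree = f["Events"]
    print(tree.num_entries)  # should be 100_000
    tree.show()              # list the branches written for the jagged record arrays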

yimuchen · May 31 '24 16:05