In-memory rechunk
On some systems there are scenarios where we know we can perform a rechunk entirely in memory, without writing to disk. For example, if I locally run this code, which performs a single rechunk stage:
```python
import cubed
import cubed.random

spec = cubed.Spec(work_dir="tmp", allowed_mem="400MB")

# 80MB chunks
a = cubed.random.random((10000, 10000), chunks=(10000, 1000), spec=spec)
b = a.rechunk((1000, 10000))
b.visualize(optimize_graph=True)
b.compute()
```
```
array([[0.49041066, ..., 0.98331677],
       ...,
       [0.17032274, ..., 0.77605032]])
```
then it writes an intermediate Zarr store to disk (along with the initial and final arrays),
```
$ ls tmp/cubed-20240714T181729-c3b4f32b-ccdc-4a8b-8d13-173f3b49e412
array-003.zarr array-004-int.zarr array-004.zarr
```
even though I know that my laptop has more than enough RAM to materialize all ten of the 80MB chunks needed for this rechunk into memory at once, which would have avoided writing that intermediate array to disk.
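The 80MB figure is just the float64 chunk size; a quick back-of-the-envelope check of the numbers involved:

```python
# Arithmetic behind the "ten 80MB chunks" figure (float64 = 8 bytes per element)
chunk_nbytes = 10000 * 1000 * 8          # 80MB per (10000, 1000) source chunk
n_chunks = 10000 // 1000                 # 10 chunks along the chunked axis
total_nbytes = n_chunks * chunk_nbytes   # 800MB to hold every chunk in RAM at once
print(chunk_nbytes / 1e6, total_nbytes / 1e6)  # 80.0 800.0
```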
This scenario of unnecessarily writing to disk occurs any time that:
a) there is some way that workers could potentially communicate data other than by writing to persistent storage, and
b) the whole rechunk stage is small enough that `n_tasks * allowed_mem_per_task < total_system_memory`.
(a) will never happen in a serverless cloud context, because AFAIK there is no way for two lambda functions to communicate other than by writing to S3. But this is not true on other systems: HPC nodes can alternatively communicate via their interconnect (see https://github.com/cubed-dev/cubed/issues/467), and threads running on a single machine can potentially communicate via shared memory (see #492 and https://github.com/cubed-dev/cubed/issues/497).
(b) requires an additional piece of information: the total memory available across the entire system. On an HPC cluster this is `mem_per_node * n_nodes`, and on a single machine this is just the system RAM.
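To make this concrete, here is a minimal sketch of condition (b) and of one way `total_system_mem` might be determined. None of these names are existing Cubed variables, and `psutil` / the SLURM environment variable are just illustrative ways of getting the numbers:

```python
import os

import psutil  # one possible way to query a single machine's RAM


def total_system_mem() -> int:
    """Total memory available across the whole system, in bytes (illustrative only)."""
    if "SLURM_JOB_NUM_NODES" in os.environ:
        # HPC cluster: mem_per_node * n_nodes. The per-node memory would really
        # come from the cluster configuration; 256GB is just an example.
        n_nodes = int(os.environ["SLURM_JOB_NUM_NODES"])
        mem_per_node = 256 * 2**30
        return n_nodes * mem_per_node
    # Single machine: just the system RAM.
    return psutil.virtual_memory().total


def fits_in_system_memory(n_tasks: int, allowed_mem_per_task: int) -> bool:
    """Condition (b): the whole rechunk stage fits in memory at once."""
    return n_tasks * allowed_mem_per_task < total_system_mem()


# e.g. 10 tasks with a 400MB budget need 4GB, which easily fits on a 16GB laptop
print(fits_in_system_memory(10, 400_000_000))
```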
Obviously we don't want to try to do an in-memory rechunk every time, so I propose that we add an additional parameter `total_system_mem` to the `Spec`, and allow executors the option to use a different rechunking strategy if condition (b) is fulfilled for a particular rechunk stage in the plan. That way we can use optimized in-memory shuffle algorithms for rechunks that don't need to be written to disk, whilst still writing out to disk for the rechunks that are definitely going to be too big to do in memory.
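As a very rough sketch of what that could look like (this is not the existing Cubed API: `Spec` has no `total_system_mem` parameter today, and `HypotheticalSpec` / `RechunkStage` below are illustrative stand-ins, not real Cubed classes):

```python
from dataclasses import dataclass
from typing import Optional


@dataclass
class HypotheticalSpec:
    work_dir: str
    allowed_mem: int                         # bytes allowed per task, as today
    total_system_mem: Optional[int] = None   # the proposed new parameter


@dataclass
class RechunkStage:
    n_tasks: int                             # tasks in this rechunk stage of the plan


def rechunk_strategy(stage: RechunkStage, spec: HypotheticalSpec) -> str:
    """Pick a strategy per rechunk stage: an in-memory shuffle when condition (b)
    holds, otherwise the current behaviour of writing an intermediate Zarr store
    to persistent storage."""
    if (
        spec.total_system_mem is not None
        and stage.n_tasks * spec.allowed_mem < spec.total_system_mem
    ):
        return "in-memory shuffle"
    return "on-disk rechunk"


spec = HypotheticalSpec(work_dir="tmp", allowed_mem=400_000_000, total_system_mem=16 * 2**30)
print(rechunk_strategy(RechunkStage(n_tasks=10), spec))  # -> "in-memory shuffle"
```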
You might say "but Tom, I thought the whole point of Cubed was to avoid a complicated centralized shuffle?" But I think what I'm proposing is okay, because we have still broken the shuffle problem up into two clearly separate cases: the larger-than-system-memory case and the smaller-than-system-memory case. We can reason about each of these individually; we know which one we will need in advance; we can still control memory usage, because the size of the data to be passed around in the shuffle is known; and we don't end up with a shuffle algorithm that has to awkwardly deal with both cases by spilling some data to disk when it suddenly realises half-way through that it's about to run out of RAM.
cc @ayushnag @applio