
Coalesce and parallelize partial shard reads

Open aldenks opened this issue 8 months ago • 23 comments

This PR optimizes reading more than one, but not all, chunks from a shard. Towards #1758.

  • Combines requests for chunks which are nearby within a shard into a single call to the byte_getter. There's a configurable maximum gap between requests and a configurable maximum coalesced total request size -- if either of those is exceeded, a new group is started.
  • The coalesced groups are requested with concurrent_map.

To be explicit, in my mind the optimizations in this have two goals:

  1. Speed up reading many chunks from a single shard. Most of the benefit here comes from using concurrent_map, but with especially small chunks, coalescing also helps reduce request overhead.
  2. Reduce the total count of calls to the byte_getter. If the store (e.g. object storage) has a per-operation charge, this directly reduces costs.

Mostly these goals aren't in conflict. The maximum coalesced size option provides a knob to ensure we aren't coalescing requests into a single very large request which would be faster to make as multiple concurrent calls.
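For illustration, here's a minimal sketch of the grouping rule described above (standalone Python, not the PR's actual implementation; the ChunkRange type and the max_gap / max_total_size names are assumptions for this sketch):

from dataclasses import dataclass


@dataclass
class ChunkRange:
    # Byte range of one chunk within the shard, taken from the shard index.
    start: int
    length: int


def coalesce_ranges(
    ranges: list[ChunkRange], max_gap: int, max_total_size: int
) -> list[list[ChunkRange]]:
    # Group sorted byte ranges so that each group can be fetched with one store request.
    # A new group starts whenever the gap to the previous chunk exceeds max_gap or the
    # group's total span (including gaps) would exceed max_total_size.
    groups: list[list[ChunkRange]] = []
    for r in sorted(ranges, key=lambda r: r.start):
        if groups:
            group = groups[-1]
            prev_end = group[-1].start + group[-1].length
            gap = r.start - prev_end
            span = (r.start + r.length) - group[0].start
            if gap <= max_gap and span <= max_total_size:
                group.append(r)
                continue
        groups.append([r])
    return groups

Each group then maps to a single byte-range get spanning the first chunk's start through the last chunk's end; the unneeded gap bytes are simply discarded when the individual chunks are sliced back out.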

TODO:

  • [x] Add unit tests and/or doctests in docstrings
  • [x] Add docstrings and API docs for any new/modified user-facing classes and functions
  • [x] New/modified features documented in docs/user-guide/*.rst
  • [x] Changes documented as a new file in changes/
  • [x] GitHub Actions have all passed
  • [x] Test coverage is 100% (Codecov passes)

aldenks avatar Apr 22 '25 04:04 aldenks

I'm curious what folks think of this and am looking for input! One particular design question to note: this adds a nested concurrent_map (there's already one in the codec pipeline's read/read_batch), so it's technically possible to have concurrency of async.concurrency**2. That said, I think we want some concurrency here.

aldenks avatar Apr 22 '25 04:04 aldenks

@aldenks thanks for working on this, it's a great optimization direction to pursue.

That being said, since this is a performance optimization, it might be helpful to get some metrics for how much perf we get with these changes, in terms of speed and also latency / number of requests. If reducing requests is the goal, you could consider implementing a WrapperStore as a demonstration or test fixture that tracks the number of times its methods are invoked, and use that to show that your approach reduces requests. But any other demonstration would work too.
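For example, a call-counting wrapper could look roughly like this (a generic delegating sketch, not tied to zarr's actual WrapperStore API; in practice you'd likely subclass the real store base class so type checks still pass):

from collections import Counter
from typing import Any


class CallCountingStore:
    # Wraps any store-like object and counts how often each method is called.

    def __init__(self, store: Any) -> None:
        self._store = store
        self.calls: Counter[str] = Counter()

    def __getattr__(self, name: str) -> Any:
        attr = getattr(self._store, name)
        if callable(attr):
            def counted(*args: Any, **kwargs: Any) -> Any:
                self.calls[name] += 1  # record the call, then delegate
                return attr(*args, **kwargs)
            return counted
        return attr

After a read, wrapped.calls["get"] gives the number of get requests issued; the tests shown further down in this thread check the same thing via store_mock.get.call_count.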

As for the specific code changes (and the nested concurrent map), nothing jumps out to me as problematic, but that might just be my ignorance of async python and the sharding code 😁

d-v-b avatar Apr 24 '25 16:04 d-v-b

I'm working on a couple of performance tests as Davis suggested above. Trying airspeed velocity, which is new to me but looks useful.

aldenks avatar Apr 28 '25 16:04 aldenks

Something like this might be more useful than asv here. A regression test of this form would also be great, if it can be made to work

https://github.com/pydata/xarray/blob/e37bfbf1bff63476de9e9d95eedbbd66d864f588/xarray/tests/test_backends.py#L3415

dcherian avatar Apr 28 '25 16:04 dcherian

Got pulled away after pushing those changes but before describing them -- I'll update this tomorrow with profiling results.

aldenks avatar May 01 '25 02:05 aldenks

alright!

  • Tests added that check for a coalesced (reduced) count of calls to the store
  • Profiled execution time and store request count

I don't necessarily need a detailed review at this point -- if y'all let me know any open questions you have, or give a thumbs up, I'll work on the remaining TODOs in the description.

Tests

Added test_sharding_multiple_chunks_partial_shard_read, which is parameterized to test both with and without coalescing, and I'm seeing the behavior I'd expect, e.g.:

    if coalesce_reads:
        # 2 shard index requests + 2 coalesced chunk data byte ranges (one for each shard)
        assert store_mock.get.call_count == 4
    else:
        # 2 shard index requests + 6 chunks
        assert store_mock.get.call_count == 8

Profiling

I added a slow test which reads a few subsets of a large-ish shard containing small-ish chunks under a range of settings. See test_partial_shard_read_performance for details.

The test is currently skipped and I don't necessarily think it should stay a test -- it probably belongs in bench, or I'm happy to remove it now that it's done its job.

To recreate the figures below:

  • Check out the commit before the optimization changes https://github.com/zarr-developers/zarr-python/pull/3004/commits/91fbdf9299bef95624459e7722259cdaf04f5afe and run hatch env run --env test.py3.12-2.1-optional -- pytest -s tests/test_codecs/test_sharding.py::test_partial_shard_read_performance --run-slow-hypothesis. Then check out the latest commit https://github.com/zarr-developers/zarr-python/pull/3004/commits/9f6727da283d60cd6187c2876271fa59e76db23b and run the same command again. (Yes, this is what airspeed velocity is for, but I opted for the simpler approach for now.)
  • Run the plotting code below
Plotting code
import pandas as pd
import matplotlib.pyplot as plt
import seaborn as sns

df_coalesce = pd.read_json(
    "zarr-python-partial-shard-read-performance-with-coalesce.json"
)
df_no_coalesce = pd.read_json(
    "zarr-python-partial-shard-read-performance-no-coalesce.json"
)
df_no_coalesce["coalesce_max_gap"] = "Before optimization"
df = pd.concat([df_no_coalesce, df_coalesce])

# Define statements and metrics
statements = df["statement"].unique()
metrics = ["time", "store_get_calls"]
metric_titles = ["Execution Time", "Number of Store Get Calls"]
metric_ylabels = ["Time (s)", "Number of Calls"]

# Create labels for coalesce values
coalesce_values = df["coalesce_max_gap"].unique()
coalesce_labels = [
    x if isinstance(x, str) else "Disabled" if x == -1 else f"{x/1024/1024:.0f}MiB"
    for x in coalesce_values
]

# Get unique get_latency values
latency_values = df["get_latency"].unique()

# For each unique get_latency value, create a separate figure
for latency in latency_values:
    latency_df = df[df["get_latency"] == latency]

    max_time = latency_df["time"].max()
    max_store_get_calls = latency_df["store_get_calls"].max()

    plt.figure(figsize=(16, 12))
    plt.suptitle(
        "Performance when reading a subset of a shard\n"
        "Execution time and count of store `get` calls for 3 array access patterns\n"
        "Array and shard shape: (512, 512, 512), chunk shape: (64, 64, 64)\n"
        "Store get latency: local ssd"
        + (
            ""
            if latency == 0
            else f" + {latency * 1000:.0f}ms (simulated object storage)"
        ),
        fontsize=14,
    )

    # One row per statement, two columns: time and store_get_calls
    for i, statement in enumerate(statements):
        statement_df = latency_df[latency_df["statement"] == statement]

        # Plot for time
        plt.subplot(3, 2, i * 2 + 1)
        sns.barplot(
            data=statement_df,
            x="concurrency",
            y="time",
            hue="coalesce_max_gap",
            hue_order=coalesce_values,
            palette="muted",
            errorbar=None,
        )
        plt.title(f"{statement} - {metric_titles[0]}")
        plt.xlabel("Concurrency")
        plt.ylabel(metric_ylabels[0])
        plt.ylim(0, max_time * 1.1)
        plt.legend(title="Coalesce Max Gap", labels=coalesce_labels)

        # Plot for store_get_calls
        plt.subplot(3, 2, i * 2 + 2)
        sns.barplot(
            data=statement_df,
            x="concurrency",
            y="store_get_calls",
            hue="coalesce_max_gap",
            hue_order=coalesce_values,
            palette="muted",
            errorbar=None,
        )
        plt.title(f"{statement} - {metric_titles[1]}")
        plt.xlabel("Concurrency")
        plt.ylabel(metric_ylabels[1])
        plt.ylim(0, max_store_get_calls * 1.1)
        plt.legend(title="Coalesce Max Gap", labels=coalesce_labels)

    plt.tight_layout()
    plt.savefig(f"zarr-metrics-by-statement-with-latency-{latency}s.png", dpi=300)
    plt.show()
    plt.close()

Local SSD

The results shown in this figure read from my local SSD. Before this optimization, when run on my SSD, the time to read chunks serially in the loop in ShardingCodec._decode_partial_single was about the same as the time to decode the chunks (which was already parallelized within the self.codec_pipeline.read call).

(Figure: zarr-metrics-by-statement-with-latency-0.0s)

Simulated Object Storage

This optimization is most important when reading from higher-latency storage. To simulate this I added 10ms of latency to each get call. (I've read that with specific networking setups you can get down to 1ms latency from S3, and I also regularly see >100ms in practice, so 10ms feels fair.)

(Figure: zarr-metrics-by-statement-with-latency-0.01s)
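The latency injection above can be sketched as a thin wrapper that delays only get and passes everything else through (illustrative only, not the test's actual code; the LatencyStore name and get_latency_s parameter are assumptions):

import asyncio
from typing import Any


class LatencyStore:
    # Delegates to an underlying store but sleeps before every get call.

    def __init__(self, store: Any, get_latency_s: float = 0.010) -> None:
        self._store = store
        self._latency = get_latency_s

    async def get(self, *args: Any, **kwargs: Any) -> Any:
        await asyncio.sleep(self._latency)  # simulate an object-storage round trip
        return await self._store.get(*args, **kwargs)

    def __getattr__(self, name: str) -> Any:
        # everything other than get passes straight through to the wrapped store
        return getattr(self._store, name)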

Takeaways

None of these are particularly surprising, but my takeaways are:

  • Most of the execution time benefit comes from adding concurrency
  • We can reduce store request count by enabling coalescing and get equal or better performance than adding concurrency alone
  • If we coalesce over gaps that are too large, execution time increases a bit because we're requesting more data we don't need. 1MiB seems like a decent middle ground as a maximum gap between chunks to coalesce (rough break-even arithmetic below).
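As a rough sanity check on that 1MiB figure (back-of-envelope only; the 10ms latency matches the simulated-storage test above, while the 100MB/s effective per-request throughput is an assumption, not a measured value):

# Break-even intuition: extra bytes fetched across a gap vs. the cost of one more request.
request_latency_s = 0.010        # ~10 ms simulated object-storage latency
throughput_bytes_per_s = 100e6   # assumed effective single-request throughput
gap_bytes = 1 * 1024 * 1024      # 1 MiB gap between two chunks

extra_transfer_s = gap_bytes / throughput_bytes_per_s
print(f"extra transfer for a 1 MiB gap: {extra_transfer_s * 1000:.1f} ms")   # ~10.5 ms
print(f"one extra request costs ~{request_latency_s * 1000:.0f} ms of latency")

Under those assumptions, fetching a 1MiB gap costs roughly the same as the one request latency it saves, so smaller gaps are nearly free while much larger gaps start to cost more than they save; lower store latency shifts that break-even toward smaller gaps, which matches the bar plots.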

aldenks avatar May 01 '25 15:05 aldenks

This is very cool!

On the topic of benchmarking: When reading compressed & sharded datasets, LDeakin's zarr_benchmarks suggests that zarr-python (without this PR) is quite slow. Note that the size of the performance difference isn't immediately obvious from a quick glance at the zarr_benchmark graphs, until you read the numbers above each bar in the bar charts! For example, in the "round trip benchmark", zarrs takes 3 seconds to read a compressed and sharded dataset from a local SSD, whilst zarr-python takes 181 seconds!

Hopefully this PR will help zarr-python to catch up. Thank you, Alden!

If it's helpful, and if this PR is ready, then I could try running LDeakin's zarr_benchmarks with this PR, if you'd like?

JackKelly avatar May 02 '25 14:05 JackKelly

Thanks @JackKelly! I'd be happy for more benchmarking of this. I took a look at LDeakin's zarr_benchmarks and I would expect the changes in the PR to not have any impact. All the read tests in there appear to read complete shards and this PR doesn't touch that path. If you're adding benchmarks I might advocate for one that reads subsets of multiple shards as I find that's a pretty common read pattern in practice.

aldenks avatar May 05 '25 14:05 aldenks

Noted! I've started making some notes for my plans for benchmarking here. Feel free to comment!

(BTW, please don't let my benchmarks hold up this PR! IMHO, the benchmarks that Alden has presented above are more than sufficient to demonstrate that this PR speeds up zarr-python. The aim with my benchmarking is more to compare zarr-python (with and without this PR) against zarrs).

JackKelly avatar May 06 '25 07:05 JackKelly

(Just a quick note to say that I'm afraid I'm now planning to experiment with storing NWP data in Parquet (using Polars)... so I might not get round to benchmarking Zarr readers soon, I'm sorry...)

JackKelly avatar May 12 '25 18:05 JackKelly

Any maintainers: I don't necessarily need a detailed review at this point -- if y'all let me know any open questions you have, or give a thumbs up, I'll work on the remaining TODOs in the description.

aldenks avatar May 13 '25 13:05 aldenks

I'm approaching having time to wrap up the TODOs in the description and am still eager to get this merged.

aldenks avatar May 30 '25 19:05 aldenks

Codecov Report

:white_check_mark: All modified and coverable lines are covered by tests. :white_check_mark: Project coverage is 94.56%. Comparing base (926a52f) to head (78313aa).

Additional details and impacted files
@@            Coverage Diff             @@
##             main    #3004      +/-   ##
==========================================
+ Coverage   94.54%   94.56%   +0.02%     
==========================================
  Files          78       78              
  Lines        9423     9459      +36     
==========================================
+ Hits         8909     8945      +36     
  Misses        514      514              
Files with missing lines Coverage Δ
src/zarr/codecs/sharding.py 95.02% <100.00%> (+0.54%) :arrow_up:
src/zarr/core/config.py 83.33% <ø> (ø)

codecov[bot] avatar Jul 21 '25 20:07 codecov[bot]

Ready for review!

at long last

The PR optimizes reading part of a shard in two ways:

  1. Requests for nearby chunks in the same shard are combined into a single request to the store. This combining process respects two config options, which define 1) the maximum gap in bytes between two chunks to combine over and 2) the maximum size of a combined group.
  2. The resulting requests to the store are performed concurrently when there is more than one group within a shard after coalescing (sketched below).
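A minimal sketch of step 2, using plain asyncio with a semaphore standing in for zarr's internal concurrent_map (the read_groups_concurrently / fetch_group names and the limit default are assumptions for this sketch):

import asyncio
from collections.abc import Awaitable, Callable, Sequence
from typing import Any


async def read_groups_concurrently(
    groups: Sequence[Any],
    fetch_group: Callable[[Any], Awaitable[Any]],
    limit: int = 10,
) -> list[Any]:
    # Fetch each coalesced byte-range group concurrently, at most `limit` at a time.
    semaphore = asyncio.Semaphore(limit)

    async def bounded_fetch(group: Any) -> Any:
        async with semaphore:
            return await fetch_group(group)  # one store request per coalesced group

    return await asyncio.gather(*(bounded_fetch(g) for g in groups))

In the PR the limit comes from the async.concurrency config value; because read_batch already runs shards concurrently under the same limit, the worst case is roughly limit squared in-flight requests, which is the first of the questions called out below.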

Motivation:

  • speed, see profiling in this comment above
  • reduced request count to stores, and therefore operation costs when using most object storage providers

A couple questions to draw attention to:

  • The newly added concurrent_map over groups within a shard is nested within the existing concurrent_map in BatchedCodecPipeline.read_batch. This means one could have concurrency of up to async.concurrency ^ 2. I don't see a clean way to limit concurrency overall, and the concurrency is the main source of the speedup. The default concurrency (10) is also pretty conservative IMO, so the extreme case of squaring it is perhaps OK?
  • The defaults for the config options. Here's my thinking and experience with them (see the config sketch after this list).
    • Max gap: see the bar plots in the performance comment above; it's a tradeoff. The lower the store latency, the smaller the gap you want, and latencies are trending down, so 1MiB feels like a fairly conservative default. It's also the value used by Rust's object_store in its get_ranges function (where it isn't configurable), and that's a library that is very well tested in production.
    • Max group size: I added this to avoid the case where coalescing causes you to lose concurrency when reading from a large (~200MB+) shard, and that lower concurrency reduces throughput. I didn't tune this value much, but from my experience and what I read online, 100MiB is likely large enough to amortize out most of the per-request overhead and get close to the maximum throughput of an individual store request.
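For a sense of scale, overriding those defaults would presumably look something like this (the key names below are hypothetical placeholders, not the actual option names added by this PR; zarr.config.set itself is zarr-python's existing donfig-based config entry point):

import zarr

# Hypothetical key names -- the real names are whatever this PR adds to the config schema.
zarr.config.set(
    {
        "sharding.read.coalesce_max_gap_bytes": 1 * 1024 * 1024,  # 1 MiB max gap
        "sharding.read.coalesce_max_bytes": 100 * 1024 * 1024,    # 100 MiB max group size
    }
)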

Minor notes/questions

  • Patch coverage is not 100%, but when I look at the report it appears to cover 100% of the functional lines of code
  • Is passing fill_value=0 rather than shard_spec.fill_value to the two cases of nd_buffer.create in the sharding codec intentional? The overall behavior is currently correct because it gets filled later, but it feels a bit like a bug in waiting.

aldenks avatar Jul 22 '25 15:07 aldenks

@d-v-b who should I request a review from on this?

aldenks avatar Aug 06 '25 16:08 aldenks

@zarr-developers/python-core-devs who wants to check out this sweet PR

d-v-b avatar Aug 06 '25 17:08 d-v-b

a high level comment -- the tests are all at the array level, which is effectively an integration test, testing how this functionality fits into the entire stack. It would be a lot more direct if we could do lower-level tests, e.g. create a model of a shard, compare the proposed reading plan against the reading plan we expect. This kind of test might not be easy to write in our codebase today, but if so we should identify things we need to change to bring us there.

(not a blocker for this PR if these tests are hard to write)

d-v-b avatar Aug 06 '25 17:08 d-v-b

Thank you for the first review @d-v-b! I didn't get to this before going out of office, but I'll address your comments when I'm back in ~1 week.

aldenks avatar Aug 08 '25 17:08 aldenks

this is great! With #3561, creating sharded arrays is now almost as fast as regular chunked arrays, but reads are still painfully slow and make too many requests even for sequential reads in the slow direction. Any idea if this PR will be pushed forward?

tasansal avatar Nov 21 '25 21:11 tasansal


For example:

I am using this to create data on GCS:

import zarr
from distributed import Client
import dask.array as da

client = Client(n_workers=8, threads_per_worker=2)

path = "gs://bucket/prefix/bench.zarr"
root = zarr.create_group(path, overwrite=True)

# Example: Create a sharded array
sharded = root.create_array(
    name="sharded",
    shape=(1355, 2456, 1501),
    shards=(256, 256, 256), 
    chunks=(32, 32, 32),
    dtype='float32',
)

chunked = root.create_array(
    name="chunked",
    shape=(1355, 2456, 1501),
    chunks=(128, 128, 128),
    dtype='float32',
)

data = da.random.normal(size=sharded.shape, chunks=(256, 256, 256))

%%time
data.to_zarr(chunked)

gives 40.4s

%%time
data.to_zarr(sharded)

gives 64s.

Which is somewhat reasonable.

However, the reads:

%%time
root["chunked"][256]

9.22s

%%time
root["sharded"][256]

17.7s

The same read with tensorstore on the sharded version is significantly faster.

tasansal avatar Nov 21 '25 21:11 tasansal

Here is the tensorstore version:

import tensorstore as ts

spec = {
    "driver": "zarr3",
    "kvstore": {
        "driver": "gcs",
        "bucket": "bucket",
        "path": "prefix/bench.zarr/sharded"
    }
}

# Open the Zarr array (async, use .result() for blocking)
array = ts.open(spec).result()

then

%%time
array[256].read().result()

results in 2.78s

tasansal avatar Nov 21 '25 21:11 tasansal