Coalesce and parallelize partial shard reads
This PR optimizes reading multiple, but not all, chunks from a shard. Towards #1758.
- Combines requests for chunks that are nearby within a shard into a single call to the byte_getter. There's a configurable maximum gap between requests and a configurable maximum coalesced total request size -- if either of those is exceeded, a new group is started.
- The coalesced groups are requested concurrently with concurrent_map.
To be explicit, in my mind the optimizations in this have two goals:
- Speed up the time it takes to read many chunks from a single shard. Most of the benefit here comes from using concurrent_map, but with especially small chunks coalescing also helps reduce per-request overhead.
- Reduce the total count of calls to the byte_getter. If the store (e.g. object storage) charges per operation, this directly reduces costs.
Mostly these goals aren't in conflict. The maximum coalesced size option provides a knob to ensure we aren't coalescing requests into a single very large request which would be faster to make as multiple concurrent calls.
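For intuition, here's a rough sketch of the grouping rule (not the exact code in this PR; `ChunkRange` and `coalesce_ranges` are hypothetical names used only for illustration):

```python
from dataclasses import dataclass


@dataclass
class ChunkRange:
    """Hypothetical representation of one chunk's byte range within a shard."""
    start: int
    stop: int


def coalesce_ranges(
    ranges: list[ChunkRange], max_gap: int, max_group_size: int
) -> list[list[ChunkRange]]:
    """Group nearby byte ranges, starting a new group when either limit is exceeded."""
    groups: list[list[ChunkRange]] = []
    for r in sorted(ranges, key=lambda cr: cr.start):
        if not groups:
            groups.append([r])
            continue
        current = groups[-1]
        gap = r.start - current[-1].stop
        coalesced_size = r.stop - current[0].start
        if gap > max_gap or coalesced_size > max_group_size:
            groups.append([r])  # start a new group
        else:
            current.append(r)  # extend the current coalesced group
    return groups
```

Each group then becomes one ranged get to the store, covering everything from the first chunk's start to the last chunk's stop.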
TODO:
- [x] Add unit tests and/or doctests in docstrings
- [x] Add docstrings and API docs for any new/modified user-facing classes and functions
- [x] New/modified features documented in docs/user-guide/*.rst
- [x] Changes documented as a new file in changes/
- [x] GitHub Actions have all passed
- [x] Test coverage is 100% (Codecov passes)
I'm curious what folks think of this and looking for input! One particular design question to note: this adds a nested concurrent_map (there's already one in the codec pipeline's read/read_batch) so it's technically possible to have concurrency of async.concurrency**2. That said, I think we want some concurrency here.
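To make the concurrency-squared point concrete, here's a toy asyncio sketch (deliberately not using zarr's actual concurrent_map, just a stand-in with the same bounded-concurrency idea) showing how two independently bounded layers multiply the number of in-flight requests:

```python
import asyncio

LIMIT = 10  # analogous to the async.concurrency setting


async def bounded_gather(coros, limit):
    # Toy stand-in for concurrent_map: at most `limit` coroutines in flight.
    sem = asyncio.Semaphore(limit)

    async def run(coro):
        async with sem:
            return await coro

    return await asyncio.gather(*(run(c) for c in coros))


async def read_group(shard_id: int, group_id: int):
    await asyncio.sleep(0.01)  # stands in for one (possibly coalesced) store request
    return shard_id, group_id


async def read_shard(shard_id: int):
    # Inner layer: coalesced groups within a single shard (what this PR adds).
    return await bounded_gather((read_group(shard_id, g) for g in range(20)), LIMIT)


async def main():
    # Outer layer: shards, already parallelized by the codec pipeline.
    # Worst case, LIMIT * LIMIT = 100 store requests can be in flight at once.
    return await bounded_gather((read_shard(s) for s in range(20)), LIMIT)


asyncio.run(main())
```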
@aldenks thanks for working on this, it's a great optimization direction to pursue.
That being said, since this is a performance optimization, it might be helpful to get some metrics for how much perf we get with these changes, in terms of speed and also latency / number of requests. If reducing requests is the goal, you could consider implementing a WrapperStore as a demonstration or test fixture that tracks the number of times its methods are invoked, and use that to show that your approach reduces requests. But any other demonstration would work too.
As for the specific code changes (and the nested concurrent map), nothing jumps out to me as problematic, but that might just be my ignorance of async python and the sharding code 😁
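For what it's worth, a counting wrapper could be as small as this (a sketch only, assuming zarr.storage.WrapperStore and the async get(key, prototype, byte_range) signature; untested):

```python
from zarr.storage import WrapperStore


class CountingStore(WrapperStore):
    """Sketch: count get calls made against the wrapped store."""

    def __init__(self, store):
        super().__init__(store)
        self.get_count = 0

    async def get(self, key, prototype, byte_range=None):
        self.get_count += 1  # record every read request before delegating
        return await super().get(key, prototype, byte_range)
```

A test could then assert that get_count is lower with coalescing enabled than without; the tests added later in this PR take the related approach of asserting on a mock store's get.call_count.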
I'm working on a couple of performance tests as Davis suggested above. I'm trying airspeed velocity, which is new to me but looks useful.
Something like this might be more useful than asv here. A regression test of this form would also be great, if it can be made to work:
https://github.com/pydata/xarray/blob/e37bfbf1bff63476de9e9d95eedbbd66d864f588/xarray/tests/test_backends.py#L3415
Got pulled away after pushing those changes but before describing them -- I'll update this tomorrow with profiling results.
alright!
- Tests added that check for coalesced (reduced count) of calls to store
- Profiled execution time and store request count
I don't necessarily need a detailed review at this point -- if y'all let me know any open questions you have or give a thumbs up, I'll work on the remaining TODOs in the description.
Tests
Added test_sharding_multiple_chunks_partial_shard_read, which is parameterized to test both with and without coalescing, and I'm seeing the behavior I'd expect, e.g.:
```python
if coalesce_reads:
    # 2 shard index requests + 2 coalesced chunk data byte ranges (one for each shard)
    assert store_mock.get.call_count == 4
else:
    # 2 shard index requests + 6 chunks
    assert store_mock.get.call_count == 8
```
Profiling
I added a slow test which reads a few subsets of a large-ish shard containing small-ish chunks under a range of settings. See test_partial_shard_read_performance for details.
The test is currently skipped, and I don't necessarily think it should stay a test -- it probably belongs in bench, or I'm happy to remove it now that it's done its job.
To recreate the figures below:
- Check out the commit before the optimization changes (https://github.com/zarr-developers/zarr-python/pull/3004/commits/91fbdf9299bef95624459e7722259cdaf04f5afe) and run `hatch env run --env test.py3.12-2.1-optional -- pytest -s tests/test_codecs/test_sharding.py::test_partial_shard_read_performance --run-slow-hypothesis`. Then check out the latest commit (https://github.com/zarr-developers/zarr-python/pull/3004/commits/9f6727da283d60cd6187c2876271fa59e76db23b) and run the same command again. (Yes, this is what airspeed velocity is for, but I opted for simpler for now.)
- Run the plotting code below
Plotting code
```python
import pandas as pd
import matplotlib.pyplot as plt
import seaborn as sns

df_coalesce = pd.read_json(
    "zarr-python-partial-shard-read-performance-with-coalesce.json"
)
df_no_coalesce = pd.read_json(
    "zarr-python-partial-shard-read-performance-no-coalesce.json"
)
df_no_coalesce["coalesce_max_gap"] = "Before optimization"
df = pd.concat([df_no_coalesce, df_coalesce])

# Define statements and metrics
statements = df["statement"].unique()
metrics = ["time", "store_get_calls"]
metric_titles = ["Execution Time", "Number of Store Get Calls"]
metric_ylabels = ["Time (s)", "Number of Calls"]

# Create labels for coalesce values
coalesce_values = df["coalesce_max_gap"].unique()
coalesce_labels = [
    x if isinstance(x, str) else "Disabled" if x == -1 else f"{x/1024/1024:.0f}MiB"
    for x in coalesce_values
]

# Get unique get_latency values
latency_values = df["get_latency"].unique()

# For each unique get_latency value, create a separate figure
for latency in latency_values:
    latency_df = df[df["get_latency"] == latency]
    max_time = latency_df["time"].max()
    max_store_get_calls = latency_df["store_get_calls"].max()

    plt.figure(figsize=(16, 12))
    plt.suptitle(
        "Performance when reading a subset of a shard\n"
        "Execution time and count of store `get` calls for 3 array access patterns\n"
        "Array and shard shape: (512, 512, 512), chunk shape: (64, 64, 64)\n"
        "Store get latency: local ssd"
        + (
            ""
            if latency == 0
            else f" + {latency * 1000:.0f}ms (simulated object storage)"
        ),
        fontsize=14,
    )

    # One row per statement, two columns: time and store_get_calls
    for i, statement in enumerate(statements):
        statement_df = latency_df[latency_df["statement"] == statement]

        # Plot for time
        plt.subplot(3, 2, i * 2 + 1)
        sns.barplot(
            data=statement_df,
            x="concurrency",
            y="time",
            hue="coalesce_max_gap",
            hue_order=coalesce_values,
            palette="muted",
            errorbar=None,
        )
        plt.title(f"{statement} - {metric_titles[0]}")
        plt.xlabel("Concurrency")
        plt.ylabel(metric_ylabels[0])
        plt.ylim(0, max_time * 1.1)
        plt.legend(title="Coalesce Max Gap", labels=coalesce_labels)

        # Plot for store_get_calls
        plt.subplot(3, 2, i * 2 + 2)
        sns.barplot(
            data=statement_df,
            x="concurrency",
            y="store_get_calls",
            hue="coalesce_max_gap",
            hue_order=coalesce_values,
            palette="muted",
            errorbar=None,
        )
        plt.title(f"{statement} - {metric_titles[1]}")
        plt.xlabel("Concurrency")
        plt.ylabel(metric_ylabels[1])
        plt.ylim(0, max_store_get_calls * 1.1)
        plt.legend(title="Coalesce Max Gap", labels=coalesce_labels)

    plt.tight_layout()
    plt.savefig(f"zarr-metrics-by-statement-with-latency-{latency}s.png", dpi=300)
    plt.show()
    plt.close()
```
Local SSD
The results shown in this figure were read from my local SSD. Before this optimization, and when run on my SSD, the time to read chunks serially in the loop in ShardingCodec._decode_partial_single was about the same as the time to decode chunks (which was already parallelized within the self.codec_pipeline.read call).
Simulated Object Storage
This optimization matters most when reading from higher-latency storage. To simulate this I added 10ms of latency to each get call. (I've read that with specific networking setups you can get down to 1ms latency from S3, and I also regularly see >100ms in practice, so 10ms feels fair.)
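(For reference, the latency injection can be as simple as a wrapper store that sleeps before delegating get. The names below are hypothetical and not the exact fixture used in the profiling test.)

```python
import asyncio

from zarr.storage import WrapperStore


class LatencyStore(WrapperStore):
    """Sketch: add a fixed delay to each get to mimic object-storage round trips."""

    def __init__(self, store, get_latency: float = 0.010):
        super().__init__(store)
        self.get_latency = get_latency  # seconds, e.g. 0.010 for 10ms

    async def get(self, key, prototype, byte_range=None):
        await asyncio.sleep(self.get_latency)  # simulated network latency
        return await super().get(key, prototype, byte_range)
```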
Takeaways
None of these are particularly surprising, but my takeaways are:
- Most of the execution time benefit comes from adding concurrency
- Enabling coalescing reduces the store request count while giving equal or better performance compared to adding concurrency alone
- If we coalesce over gaps that are too large execution time increases a bit as we're requesting more data we don't need. 1MiB seems like a decent middle ground as a maximum gap between chunks to coalesce.
This is very cool!
On the topic of benchmarking: When reading compressed & sharded datasets, LDeakin's zarr_benchmarks suggests that zarr-python (without this PR) is quite slow. Note that the size of the performance difference isn't immediately obvious from a quick glance at the zarr_benchmark graphs, until you read the numbers above each bar in the bar charts! For example, in the "round trip benchmark", zarrs takes 3 seconds to read a compressed and sharded dataset from a local SSD, whilst zarr-python takes 181 seconds!
Hopefully this PR will help zarr-python to catch up. Thank you, Alden!
If it's helpful, and if this PR is ready, then I could try running LDeakin's zarr_benchmarks with this PR, if you'd like?
Thanks @JackKelly! I'd be happy for more benchmarking of this. I took a look at LDeakin's zarr_benchmarks and I would expect the changes in this PR to have no impact there. All the read tests in it appear to read complete shards, and this PR doesn't touch that path. If you're adding benchmarks, I might advocate for one that reads subsets of multiple shards, as I find that's a pretty common read pattern in practice.
Noted! I've started making some notes for my plans for benchmarking here. Feel free to comment!
(BTW, please don't let my benchmarks hold up this PR! IMHO, the benchmarks that Alden has presented above are more than sufficient to demonstrate that this PR speeds up zarr-python. The aim with my benchmarking is more to compare zarr-python (with and without this PR) against zarrs).
(Just a quick note to say that I'm afraid I'm now planning to experiment with storing NWP data in Parquet (using Polars)... so I might not get round to benchmarking Zarr readers soon, I'm sorry...)
Any maintainers: I don't necessarily need a detailed review at this point -- if y'all let me know any open questions you have or give a thumbs up, I'll work on the remaining TODOs in the description.
I'm close to having time to wrap up the TODOs in the description and am still eager to get this merged.
Codecov Report
:white_check_mark: All modified and coverable lines are covered by tests.
:white_check_mark: Project coverage is 94.56%. Comparing base (926a52f) to head (78313aa).
Additional details and impacted files
```
@@            Coverage Diff             @@
##             main    #3004      +/-   ##
==========================================
+ Coverage   94.54%   94.56%   +0.02%
==========================================
  Files          78       78
  Lines        9423     9459      +36
==========================================
+ Hits         8909     8945      +36
  Misses        514      514
```
| Files with missing lines | Coverage Δ |
|---|---|
| src/zarr/codecs/sharding.py | 95.02% <100.00%> (+0.54%) :arrow_up: |
| src/zarr/core/config.py | 83.33% <ø> (ø) |
Ready for review!
at long last
The PR optimizes reading part of a shard in two ways:
- Requests for nearby chunks in the same shard are combined into a single request to the store. This combining process respects two config options which define 1) the maximum gap in bytes between two chunks to combine over and 2) the maximum size of a combined group.
- The resulting requests to the store are performed concurrently when there is more than one group within a shard after coalescing.
Motivation:
- speed, see profiling in this comment above
- reduced request count to stores, and therefore operation costs when using most object storage providers
A couple of questions to draw attention to:
- The newly added concurrent_map over groups within a shard is nested within the existing concurrent_map in BatchedCodecPipeline.read_batch. This means one could have concurrency of up to async.concurrency ^ 2. I don't see a clean way to limit concurrency overall, and the concurrency is the main source of the speedup. The default concurrency (10) is also pretty conservative imo, so the extreme case of squaring it is perhaps ok?
- The defaults for the config options. Here's my thinking and experience with them (see the sketch after this list for how they'd be set):
  - Max gap: see the bar plots in the performance table -- it's a tradeoff. The lower the store latency, the smaller the gap you want, and latencies are trending down, so 1MiB feels like a fairly conservative default. It's also the value used by Rust's object_store in its get_ranges function (where it isn't configurable), and that's a library that is very well tested in production.
  - Max group size: I added this to avoid the case where coalescing costs you concurrency when reading from a large (~200MB+) shard and that lower concurrency reduces throughput. I didn't tune this value much, but from my experience and what I've read, 100MiB is likely large enough to amortize out most of the per-request overhead and get close to the maximum throughput of an individual store request.
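As a usage sketch, the two options would be set through zarr's config; the key names below are my guess for illustration only -- check the user-guide docs added in this PR for the real ones:

```python
import zarr

# Hypothetical key names for illustration only.
zarr.config.set(
    {
        "sharding.read.coalesce_max_gap_bytes": 1 * 2**20,  # 1 MiB max gap between chunks
        "sharding.read.coalesce_max_bytes": 100 * 2**20,  # 100 MiB max coalesced request
    }
)
```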
Minor notes/questions
- Patch codecov is not 100%, but when I look at the report it appears to cover 100% of the functional lines of code.
- Is passing `fill_value=0` rather than `shard_spec.fill_value` to the two calls of `nd_buffer.create` in the sharding codec intentional? The overall behavior is currently correct because the buffer gets filled later, but it feels a bit like a bug waiting to happen.
@d-v-b who should I request a review from on this?
@zarr-developers/python-core-devs who wants to check out this sweet PR
a high level comment -- the tests are all at the array level, which is effectively an integration test, testing how this functionality fits into the entire stack. It would be a lot more direct if we could do lower-level tests, e.g. create a model of a shard, compare the proposed reading plan against the reading plan we expect. This kind of test might not be easy to write in our codebase today, but if so we should identify things we need to change to bring us there.
(not a blocker for this PR if these tests are hard to write)
Thank you for the first review @d-v-b! I didn't get to this before going out of office, but I'll address your comments when I'm back in ~1 week.
This is great! With #3561, creating sharded arrays is now almost as fast as regular chunked arrays, but reads are still painfully slow and make too many requests, even for sequential reads in the slow direction. Any idea if this PR will be pushed forward?
For example, I am using this to create data on GCS:
```python
import zarr
from distributed import Client
import dask.array as da

client = Client(n_workers=8, threads_per_worker=2)

path = "gs://bucket/prefix/bench.zarr"
root = zarr.create_group(path, overwrite=True)

# Example: Create a sharded array
sharded = root.create_array(
    name="sharded",
    shape=(1355, 2456, 1501),
    shards=(256, 256, 256),
    chunks=(32, 32, 32),
    dtype="float32",
)
chunked = root.create_array(
    name="chunked",
    shape=(1355, 2456, 1501),
    chunks=(128, 128, 128),
    dtype="float32",
)

data = da.random.normal(size=sharded.shape, chunks=(256, 256, 256))
```
```python
%%time
data.to_zarr(chunked)
```
gives 40.4s.
```python
%%time
data.to_zarr(sharded)
```
gives 64s, which is somewhat reasonable.
However, the reads:
```python
%%time
root["chunked"][256]
```
9.22s
```python
%%time
root["sharded"][256]
```
17.7s
The same read with tensorstore on the sharded version is significantly faster.
Here is the tensorstore version:
```python
import tensorstore as ts

spec = {
    "driver": "zarr3",
    "kvstore": {
        "driver": "gcs",
        "bucket": "bucket",
        "path": "prefix/bench.zarr/sharded",
    },
}

# Open the Zarr array (async, use .result() for blocking)
array = ts.open(spec).result()
```
then
```python
%%time
array[256].read().result()
```
results in 2.78s.