[V3] Support for batched Store API in v3
The `Store` API in v3 supports fetching a single key at a time or partial values of multiple keys.
```python
from abc import ABC, abstractmethod
from typing import List, Optional, Tuple


class Store(ABC):
    @abstractmethod
    async def get(
        self, key: str, byte_range: Optional[Tuple[int, Optional[int]]] = None
    ) -> Optional[bytes]:
        ...

    @abstractmethod
    async def get_partial_values(
        self, key_ranges: List[Tuple[str, Tuple[int, int]]]
    ) -> List[bytes]:
        ...
```
The new `BatchedCodecPipeline`, for instance, currently fetches data for all requested chunks concurrently by using an asyncio thread pool, with each task calling the `get` method of the `Store`:
```python
chunk_bytes_batch = await concurrent_map(
    [(byte_getter,) for byte_getter, _, _, _ in batch_info],
    lambda byte_getter: byte_getter.get(),
    runtime_configuration.concurrency,
)
```
Since there have been some concerns about the scalability of asyncio for a large number of tasks, would it make sense to move this batched fetch into the `Store` itself? This would allow another `Store` implementation to use a potentially more performant asynchronous framework for the batched fetch, say in C++ or Rust, while still looking like a single asyncio task to zarr-python.
This feature already exists in v2 through the `getitems` Store API, which is used to enable GPU Direct Storage in Zarr through kvikIO. A similar feature has also been added (provisionally) to v3 codecs, which now support `encode_batch` and `decode_batch` methods with a default implementation that ships tasks off to an asyncio thread pool but allows a codec to override that in favor of another approach if needed.
Do queues have a role to play here?
To me it makes sense that the store and the codec pipeline would communicate via a queue. The store puts compressed chunks into a queue. The codec pipeline pulls items out and processes them. Either one could do this with single items or in batches. If the queue is full, the store could stop fetching chunks until there is room (backpressure).
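A minimal sketch of that backpressure idea, assuming a bounded `asyncio.Queue` (all names below are illustrative): the producer's `put` suspends once the queue is full, so fetching naturally pauses until the consumer catches up.

```python
import asyncio


async def fetch_chunks(keys, q):
    # Producer ("store" side): put() suspends once the queue holds `maxsize`
    # items, which pauses fetching until the consumer frees up a slot.
    for key in keys:
        chunk = f"compressed-bytes-for-{key}"  # stand-in for a real store fetch
        await q.put((key, chunk))
    await q.put(None)  # sentinel: no more chunks


async def decode_chunks(q):
    # Consumer ("codec pipeline" side): pull chunks off the queue and decode them.
    while (item := await q.get()) is not None:
        key, chunk = item
        print(f"decoding chunk {key}: {chunk!r}")


async def main():
    q = asyncio.Queue(maxsize=4)  # small bound so backpressure kicks in
    await asyncio.gather(fetch_chunks(range(10), q), decode_chunks(q))


asyncio.run(main())
```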
I'd love to see a batched Store API in Zarr-Python!
(Just to connect the dots: here's an informal proposal I wrote for a `Store.get_items` method for Zarr-Python... which includes links to @akshaysubr's previous discussions about this topic!)
> the store and the codec pipeline would communicate via a queue
Yes! I agree! (Ideally a multi-producer multi-consumer queue.) For reference, here's my current planned design for the `light-speed-io` Rust crate that I'm currently chipping away at. The relevant part for this discussion is the `crossbeam::channel` which sends chunks from "Layer 1 (the IO layer)" to "Layer 2: Parallel computation on chunks" :slightly_smiling_face:
My ultimate aim is to allow Zarr-Python (and others) to offload IO (and decompression?) to Rust, if users want that.
I've also been thinking that a producer/consumer queue is the way forward here, but I'm not at all clear where in the stack these queues would go. IMO, this is the part of the equation that is missing for us to take action here.
If this queue were an object that could be addressed by non-Python code, then that would really open the door to the sort of interoperability we are looking for.
The Python standard library has two Queue classes:
- https://docs.python.org/3/library/queue.html
- https://docs.python.org/3/library/asyncio-queue.html
Both of these can be bounded by the number of items, but I think we want to bound the total number of bytes stored instead.
We could also implement our own purpose-built Queue in a low-level language and talk to it from Python.
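Before reaching for a low-level implementation, here's a rough pure-Python sketch of the bytes-based limit (the `ByteBoundedQueue` class is made up for illustration): a `threading.Condition` guards a running byte count instead of an item count.

```python
import threading
from collections import deque


class ByteBoundedQueue:
    """Illustrative queue that bounds total buffered bytes rather than item count."""

    def __init__(self, max_bytes: int):
        self._max_bytes = max_bytes
        self._bytes = 0
        self._items = deque()
        self._cond = threading.Condition()

    def put(self, item: bytes) -> None:
        with self._cond:
            # Wait until this item fits in the byte budget (oversized items are
            # admitted when the queue is empty so they can't deadlock the producer).
            self._cond.wait_for(
                lambda: self._bytes == 0
                or self._bytes + len(item) <= self._max_bytes
            )
            self._items.append(item)
            self._bytes += len(item)
            self._cond.notify_all()

    def get(self) -> bytes:
        with self._cond:
            self._cond.wait_for(lambda: len(self._items) > 0)
            item = self._items.popleft()
            self._bytes -= len(item)
            self._cond.notify_all()
            return item
```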
Here's an example of sending items to a Python Queue from an async task and then processing those items from a thread. This is basically what we want to do between the store and the codec pipeline.
```python
import threading
import queue
import asyncio
import time

SLEEP = 0.1


async def generate_data(item, q):
    # non-blocking sleep--represents time spent on I/O
    await asyncio.sleep(SLEEP)
    print(f"putting item {item}")
    q.put(item)


def process_item(q):
    item = q.get()
    # blocking sleep--represents CPU-bound task
    time.sleep(SLEEP)
    print(f"processed {item}")
    q.task_done()


def worker():
    while True:
        process_item(q)


async def generate_items():
    for n in range(10):
        await generate_data(n, q)


q = queue.Queue()
threading.Thread(target=worker, daemon=True).start()
# top-level await: run this in an async context (e.g. IPython/Jupyter)
await generate_items()
```
On the question of sharing a queue between Python and Rust: I like the idea. But I've searched the PyO3 GitHub organisation for "queue" and haven't found anything relevant :disappointed:
For my main use-case, where I'd like to load and decompress on the order of 1 million chunks per second (on a single machine), I'd probably want to keep the chunk-by-chunk loading & processing in Rust-land. I'd guess that a Python queue would struggle to handle 1 million items per second, whilst still leaving enough CPU cycles for decompression and other bits and bobs (but I haven't tried and I'd love to be proved wrong!).
What I'm imagining - whilst being very ignorant of the current architecture of Zarr-Python v3 - is something like this:
The user would, in a single function call, ask Zarr-Python to load some subset of a huge array. Zarr-Python would calculate the set of chunks that need to be loaded[^1]. Then Zarr-Python would, in a single async function call, ask the `Store` to "get and decompress these million chunks, and let me know when you're done". It'd be entirely up to the `Store` to figure out how best to parallelise the work, and how to pass data from IO to decompression.
Could that work? (I'm sure there's some glaring problem with that overly simple sketch!) I'm guessing that doesn't fit at all with the way that Zarr v3 splits up IO and codecs?
[^1]: Although it's possible that Python would struggle to compute which million chunks we want, in the time available. So we may also want to compute the set of chunks in Rust-land. But then we've got a full Zarr implementation in Rust!
In the current architecture, the Store doesn't handle decompression. That's a separate thing (the Codec Pipeline).
ah, yes, of course... sorry.... :flushed:... I need to re-read the Zarr v3 spec soon :slightly_smiling_face: !
On the topic of sharing a queue between Rust and Python... In a recent comment, a core maintainer of the PyO3 library says:
> You could wrap the receiver end of the [Rust `std::sync::mpsc::`] `channel` in a `#[pyclass]` to expose the receiving end to Python. You have the choice of using blocking or async to wait for messages.
> I need to re-read the Zarr v3 spec soon 🙂!
This is not really covered by the spec. It's about the software architecture of Zarr Python (which is exactly what this issue is for discussing).
> I'd guess that a Python queue would struggle to handle 1 million items per second

Here's a quick experiment:
```python
import threading
import queue
import asyncio
import time
from itertools import batched  # Python 3.12+; otherwise define a batched() helper


class Timer:
    def __init__(self):
        self._log = [("start", time.perf_counter())]

    def log(self, message):
        self._log.append((message, time.perf_counter()))

    def __repr__(self):
        out = f"<Timer started at {self._log[0][1]}>\n"
        for n in range(1, len(self._log)):
            message = self._log[n][0]
            diff = self._log[n][1] - self._log[n - 1][1]
            out += f" - {message}, {diff}\n"
        return out


# don't add any overhead
SLEEP = 0


async def enqueue_data(batch, q):
    # non-blocking sleep--represents time spent on I/O
    await asyncio.sleep(SLEEP)
    for item in batch:
        q.put(item)


def process_item(worker_id, q, batch_size=1):
    # not sure if batching really optimizes anything here
    for _ in range(batch_size):
        item = q.get()
        # blocking sleep--represents CPU-bound task
        time.sleep(5 * SLEEP)
        q.task_done()


def worker(worker_id, q, batch_size=1):
    while True:
        process_item(worker_id, q, batch_size=batch_size)


async def generate_items(NITEMS, q, batch_size=1):
    for batch in batched(range(NITEMS), batch_size):
        await enqueue_data(batch, q)


NTHREADS = 32
NITEMS = 1_000_000

timer = Timer()
q = queue.Queue()
timer.log("create queue")
threads = [
    threading.Thread(
        target=worker,
        args=(worker_id, q),
        kwargs={"batch_size": 100},
        daemon=True,
    ).start()
    for worker_id in range(NTHREADS)
]
timer.log("create threads")
# top-level await: run this in an async context (e.g. IPython/Jupyter)
await generate_items(NITEMS, q, batch_size=100)
timer.log(f"generate {NITEMS} items")
q.join()
timer.log("finish processing")
print(timer)
```
```
<Timer started at 108872.175694578>
 - create queue, 4.448898835107684e-05
 - create threads, 0.003170869007590227
 - generate 1000000 items, 0.6771369689959101
 - finish processing, 10.761253934004344
```
So I was able to enqueue 1,000,000 items in less than a second (working in batches of 100), but it took 10 seconds to process them (on a 32-core machine). This example took 10 minutes to cook up, so there's probably lots of room for improvement.
This discussion about how to orchestrate a producer-consumer workflow between chunk readers (Store) and decompressors (Codecs) is really interesting. But I worry that it is taking away from the more fundamental question of the required Store API. Even if we use a queue to orchestrate these tasks, the current Store API only allows the store to put one item at a time into the queue rather than a batch of items at a time into the queue.
I would propose that we think about implementing the points raised in this issue in two steps:

- Add a batched API to the `Store` base class that defaults to just doing concurrent loads from multiple threads (similar to the default `decode_batch` in the Codec API); a rough sketch follows this list. This way, a Store implementation can decide whether single-chunk fetching is the natural way to do things, or whether it makes more sense to treat the batched case as the default with single-chunk fetches as a specialization of that.
- The concurrency policy is essentially encoded into the `CodecPipeline`. This is where I think the mapping of these tasks to a specific system architecture should be encoded. We would want different approaches for reading from a local drive vs. an object store, and similarly for computing on the CPU vs. the GPU. There are two codec pipelines currently in #1670: a `BatchedCodecPipeline` and an `InterleavedCodecPipeline`, depending on what is better for a specific hardware architecture. We can certainly add another `DynamicCodecPipeline` that uses queues.
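A rough sketch of the first point, using a hypothetical `get_batch` name and omitting byte ranges for brevity (the real method name and signature would need to be agreed on): the base class provides a concurrent default built on `get`, and a store with a native batched path overrides it.

```python
import asyncio
from abc import ABC, abstractmethod
from typing import List, Optional


class Store(ABC):
    @abstractmethod
    async def get(self, key: str) -> Optional[bytes]:
        ...

    async def get_batch(self, keys: List[str]) -> List[Optional[bytes]]:
        # Default: concurrent single-key fetches. A store backed by a native
        # batched API (GPU Direct Storage via kvikIO, a Rust/C++ IO layer, ...)
        # would override this with a single call covering the whole batch.
        return await asyncio.gather(*(self.get(key) for key in keys))
```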
"get and decompress these million chunks, and let me know when you're done"
@JackKelly This should be possible by implementing a custom `CodecPipeline`, potentially in Rust, but that is broader in scope than the Store. The scope of light-speed-io seems to be aligned with the scope of `CodecPipeline`. Though, I don't quite understand how light-speed-io would interface with existing Codecs. Would those be Python calls into the Codec API from Rust?
> The scope of light-speed-io seems to be aligned with the scope of CodecPipeline. Though, I don't quite understand how light-speed-io would interface with existing Codecs. Would those be Python calls into the Codec API from Rust?
Before I can answer properly, I need to get myself up-to-speed on the design of the `CodecPipeline` :slightly_smiling_face:! Is #1670 the best place to read the latest info about the `CodecPipeline`? Is there an architecture diagram showing how `CodecPipeline` fits into zarr-python? (No worries if not! I appreciate that everyone is short on time!)
> I don't quite understand how light-speed-io would interface with existing Codecs. Would those be Python calls into the Codec API from Rust?
My plan is for `light-speed-io` to call codec libraries written in C/C++ or Rust. For example, there's a Rust wrapper for `c-blosc2`, although I'll probably start with more "mainstream" codecs like `lz4`.
Whilst it's technically possible for Rust to call Python code (via PyO3), I'd worry that would kill any performance benefits! (Although I haven't benchmarked it!)
> > The scope of light-speed-io seems to be aligned with the scope of CodecPipeline. Though, I don't quite understand how light-speed-io would interface with existing Codecs. Would those be Python calls into the Codec API from Rust?
>
> Before I can answer properly, I need to get myself up-to-speed on the design of the `CodecPipeline` 🙂! Is #1670 the best place to read the latest info about the `CodecPipeline`? Is there an architecture diagram showing how `CodecPipeline` fits into zarr-python? (No worries if not! I appreciate that everyone is short on time!)
The PR is the best place for the moment. Basically, the CodecPipelines get a list of chunks to fetch with the corresponding slices, fetch the data from the store, decode the chunks, and assemble the chunk arrays into one output array.
Currently, there are two implementations: batched and interleaved. I want to combine both into one.
It would not be a big change for the codec pipeline to accept a queue. It would probably change the Store API considerably, though.
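For anyone still building a mental model, the described read path roughly amounts to something like this (illustrative pseudocode only; the real interfaces in #1670 differ):

```python
# Illustrative sketch of the read path described above, not the real API:
# fetch each chunk from the store, decode it, and write it into the right
# region of the output array.
async def read_selection(store, codec_chain, batch_info, out):
    for chunk_key, chunk_selection, out_selection in batch_info:
        chunk_bytes = await store.get(chunk_key)
        chunk_array = codec_chain.decode(chunk_bytes)
        out[out_selection] = chunk_array[chunk_selection]
    return out
```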
Sounds great! Thank you for the explanation.
I'll try to enable my little `light-speed-io` Rust crate to provide `CodecPipeline`-like functionality (fetching data from IO, decoding, assembling chunks into one output array) and a Python API. The ultimate aim could be that `light-speed-io` could provide a drop-in `CodecPipeline` for `zarr-python`. (Although `light-speed-io` will be limited to local storage on Linux for a little while!) Does that sound viable and vaguely useful?
Although... I should stop mumbling about my pet project and instead help to pull the focus of this thread back to @akshaysubr's original questions about a batched `Store` API!...
> The ultimate aim could be that `light-speed-io` could provide a drop-in `CodecPipeline` for `zarr-python`. (Although `light-speed-io` will be limited to local storage on Linux for a little while!) Does that sound viable and vaguely useful?
Yes, that should be possible. At this point, I wouldn't consider the `CodecPipeline` a stable API, though. It will probably change quite a bit until the 3.0 release and maybe even afterwards.
No worries, sounds good.
When you do start the process of stabilising the `CodecPipeline` API, please could you tag me in the discussion? I'll probably just passively listen to the discussion, so I can have the best chance of moulding `light-speed-io` to play nicely with `zarr-python` v3 :slightly_smiling_face:
This thread got off topic. @akshaysubr - can you help us get back on track? What I don't understand about your request is why `get_partial_values` isn't a sufficient drop-in for `getitems`? Despite its name, `get_partial_values` works just fine when requesting full chunks.
@jhamman You're right, `get_partial_values` should work just fine in theory, but there are a couple of issues:

- The current batched codec pipeline doesn't use `get_partial_values`; it dispatches the `get` method to a bunch of threads.
- Calling the method `get_partial_values` is somewhat unintuitive from a naming perspective. It also might make the code a bit harder to understand if `get_partial_values` is called on full chunks across the codebase.
There's also the question of whether `get` should be the base implementation that `getitems`/`get_partial_values` builds on top of, or the other way around. It would seem that the other way around might be better for performance?
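To make "the other way around" concrete, here's a minimal sketch (the class name and the `None`-means-whole-value convention are assumptions for illustration, not the current API): the batched method is the primitive, and `get` is derived from it, so a store only has to optimize one code path.

```python
from typing import List, Optional, Tuple

ByteRange = Optional[Tuple[int, int]]  # None means "the whole value" in this sketch


class BatchFirstStore:
    """Illustrative: the batched method is the primitive, get() is derived from it."""

    async def get_partial_values(
        self, key_ranges: List[Tuple[str, ByteRange]]
    ) -> List[Optional[bytes]]:
        # The one method a concrete store has to implement efficiently.
        raise NotImplementedError

    async def get(self, key: str) -> Optional[bytes]:
        # A single full-key fetch is just a batch of one whole-value range.
        (value,) = await self.get_partial_values([(key, None)])
        return value
```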