tensorstore
Question about parallel writing and group handling
I'm using the tensorstore C++ API to write a Zarr file, and I need to address two issues in my code: 1- I want to create a group as in Zarr Python and build a hierarchy of namespaces and datasets. Right now I'm handling it by setting the kvstore path when creating each one. What is the optimal way? This approach takes more time than the Python version!
2- I'm trying to write data into each chunk in parallel. I want to avoid multithreading for now and use MPI instead. I'm writing everything from scratch for this purpose, as I couldn't find anything in the documentation. Is there a better way to avoid multithreading?
Thanks
Tensorstore automatically handles chunks in parallel, so you can just issue the write from a single thread and all relevant chunks will be handled in parallel.
Groups currently aren't supported unfortunately.
Thanks, the timing makes more sense now.
As an example, if I have chunks of shape {10,10} and I want to write to the [35:45][35:45] region, will it be divided into four sections, [35:40][35:40], [35:40][40:45], [40:45][35:40], and [40:45][40:45], and written 4-way parallel?
And what is the maximum number of parallel writes? Can it equal the number of processes?
> Thanks, the timing makes more sense now.
> As an example, if I have chunks of shape {10,10} and I want to write to the [35:45][35:45] region, will it be divided into four sections, [35:40][35:40], [35:40][40:45], [40:45][35:40], and [40:45][40:45], and written 4-way parallel?
Exactly.
> And what is the maximum number of parallel writes? Can it equal the number of processes?
It depends on the underlying kvstore --- with the local filesystem: https://google.github.io/tensorstore/kvstore/file/index.html#json-Context.file_io_concurrency
With GCS: https://google.github.io/tensorstore/kvstore/gcs/index.html#json-Context.gcs_request_concurrency
I'm not sure what you mean by it being equal to the number of processes, or how you intend to use MPI.
You can indeed write in parallel from multiple processes, but tensorstore itself doesn't contain any support for that. You will need to partition your work between processes, and for efficiency you should ensure that the partitions are aligned to chunk boundaries.
Thanks. I believe limit: "shared" is what I was searching for. Can I ask how TensorStore handles the parallelism?
It depends. There are two mechanisms in use: thread pools and admission queues.
Generally the "shared" concurrency resources create a thread pool with a concurrency limit of at least 4 (https://github.com/google/tensorstore/blob/master/tensorstore/internal/file_io_concurrency_resource.cc). File I/O operations are then queued on the thread pool, which schedules up to 4 operations to run in parallel.
In the case of writing to gcs, the underlying kvstore uses an admission queue with a default value of 32 to control concurrency (https://github.com/google/tensorstore/blob/master/tensorstore/kvstore/gcs_http/gcs_resource.h). The admission queue then allows up to 32 parallel HTTP operations.
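If you need a different limit, both resources accept an override through the spec's context. A sketch of the JSON shape for the file driver (adjust the number to your storage; `"limit": "shared"` instead of a number shares one default pool across specs):

```json
{
  "driver": "zarr3",
  "kvstore": {"driver": "file", "path": "/tmp/testo"},
  "context": {
    "file_io_concurrency": {"limit": 16}
  }
}
```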
Thanks again; I have another question. I am using zarr3 to write an array. When the file size becomes big, the write does not finish and leaves the folder with only the metadata (it will be only 4 KB instead of 5 GB).
auto write_result = tensorstore::Write(array, store);
write_result.Force();
auto write_result2=write_result.result();
How can I ensure the writing process is complete? I want to ensure the data is on the disk, not the buffer or cache.
That's an open-ended question. Have you tried logging the write_result.status()?
auto write_result = tensorstore::Write(array, store);
ABSL_LOG(INFO) << write_result.status();
You can look at how we write to tensorstore in examples/compute_percentiles.cc.
Can you distill your problem into a function which you can include in a comment?
Also there was a recent fix for sharded writing --- make sure you are using a sufficiently new version (0.1.63 or later).
Sorry for the ambiguity. Thanks for notifying me about sharded writing.
> That's an open-ended question. Have you tried logging the write_result.status()?
Yeah, I tried both status and result, and they returned OK.
I saw the example; I am using the same method. Zarr creates the chunk, but the chunk remains empty. I can create a simple program to reproduce the result if you want.
Yes, please post your code.
I created a repo for that. https://github.com/asparsa/zarrtest/tree/main but here is the main.cpp (edited by laramiel):
#include <stdio.h>
#include <stdlib.h>
#include <time.h>
#include <iostream>
#include <typeinfo>
#include "tensorstore/context.h"
#include "tensorstore/index.h"
#include "tensorstore/open.h"
#include "tensorstore/open_mode.h"
#include "tensorstore/tensorstore.h"
#include "tensorstore/util/status.h"
using ::tensorstore::Context;
::nlohmann::json GetJsonSpec() {
return {
{"driver", "zarr3"},
{"kvstore", {{"driver", "file"}, {"path", "/tmp/testo"}}},
{"metadata",
{{"data_type", "int16"},
{"shape", {100, 100}},
{"chunk_grid",
{{"name", "regular"},
{"configuration", {{"chunk_shape", {10, 10}}}}}}}},
};
}
// try to create a zarr and write a 100*100 random array in it
int main() {
auto context = Context::Default();
auto store_result = tensorstore::Open(GetJsonSpec(), context,
tensorstore::OpenMode::create |
tensorstore::OpenMode::open,
tensorstore::ReadWriteMode::read_write)
.result();
if (!store_result.ok()) {
std::cerr << "Error creating Zarr file: " << store_result.status() << std::endl;
return -1;
}
std::cout << "Zarr file created successfully!" << std::endl;
srand(time(0));
auto array = tensorstore::AllocateArray<int16_t>({100, 100});
for (tensorstore::Index i = 0; i < 100; ++i) {
for (tensorstore::Index j = 0; j < 100; ++j) {
array(i, j) = (rand() % 10) + 1;
}
}
auto write_result = tensorstore::Write(array, store_result).status();
if (!write_result.ok()) {
std::cerr << "Error creating Zarr file: " << write_result << std::endl;
return -1;
}
std::cout << "Zarr file wrote successfully!" << std::endl;
return 0;
}
No matter how much data I write into it, the zarr folder remains 4 KB.
NOTE: I reformatted the above example and removed some unnecessary lines to make it more readable; I also changed the output location from testo to /tmp/testo.
The code above is essentially correct, but I think you are simply misunderstanding the data storage format. The zarr.json file is the zarr-format metadata which describes the dataset. The data is stored in individual files within subdirectories of /tmp/testo, such as:
$ find /tmp/testo | sort
/tmp/testo
/tmp/testo/c
/tmp/testo/c/0
/tmp/testo/c/0/0
/tmp/testo/c/0/1
/tmp/testo/c/0/2
/tmp/testo/c/0/3
...
/tmp/testo/c/9/7
/tmp/testo/c/9/8
/tmp/testo/c/9/9
/tmp/testo/zarr.json
Since your chunk shape is 10x10, and you write into indices 0..99 x 0..99, these files have indices from 0..9.
Can I see your code? I was talking about the whole folder size, which never exceeds 4 KB. Yes, I was checking the data, not the metadata.
I edited your above comment; that is exactly what I compiled.
Are you perhaps misreading the 4.0K 'byte size' field in the ls output? The 4.0K for the c directory is the size of the directory structure itself on disk (likely a single block), not the size of all data within the directory. To get the sum of all file sizes you need to use the du command.
$ ls -lAh /tmp/testo
total 8.0K
drwxr-xr-x 12 laramiel laramiel 4.0K Jun 30 06:14 c
-rw-r--r-- 1 laramiel laramiel 266 Jun 30 06:14 zarr.json
$ du -sh /tmp/testo
452K /tmp/testo
Oh, I see. Sorry, I confused that with file size.
Thank you so much.
I have another question. I am trying to use the MakeArray() function with different data types from different variables, but I have difficulty with it every time. For example, I looked at this, but most arrays were created from literal numbers, not variables. Is there any example or documentation on that?
This is one of the examples I tried to write:
std::vector<int16_t> data_to_use(parsed_data.begin() + 4, parsed_data.end());
auto array = tensorstore::MakeArray<int16_t>(data_to_use);
I tried every combination of using and not using <int16_t> and still get a no matching function for call to 'MakeArray<int16_t>(std::vector&)' error. How should I handle these, and what other variable types can I pass to MakeArray? If I use MakeArrayView, can I write it to a Zarr driver?
Is there any way to turn off all compression in zarr3, so that no compression is applied?
Setting the clevel to 0 in the Blosc codec should effectively disable compression.
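For reference, the metadata fragment might look like this (a sketch of the blosc codec configuration; clevel 0 stores the blocks uncompressed, though the data still passes through the blosc framing):

```json
{
  "codecs": [
    {"name": "blosc", "configuration": {"cname": "lz4", "clevel": 0}}
  ]
}
```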
Won't that be an overhead? Calling the Blosc codec and getting out of it without compression.
> How should I handle these, and what other variable types can I pass to MakeArray? If I use MakeArrayView, can I write it to a Zarr driver?
What do you think about this?
> Won't that be an overhead? Calling the Blosc codec and getting out of it without compression.
I'm not too familiar with Zarr3, but it looks like specifying no codec will result in uncompressed data.
import tensorstore as ts
import numpy as np
spec = {
'driver': 'zarr3',
'kvstore': {
'driver': 'file',
'path': 'tmp/zarr3',
},
'create': True,
'delete_existing': True,
'metadata': {
'shape': [100, 100],
'data_type': 'float64',
"codecs": [{"name": "blosc", "configuration": {"cname": "lz4", "clevel": 9}}] # You could remove this line
}
}
tensorstore_object_result = ts.open(spec)
tensorstore_object = tensorstore_object_result.result()
data = np.random.rand(100, 100)
write_future = tensorstore_object.write(data)
# tmp/zarr3/c/0 results in 72K according to `du -sh 0`
# tmp/zarr3/c/0 results in 80K according to `du -sh 0` without the codec line
> What do you think about this?
I have never used the MakeArray() function. SharedArray has served my purposes with the Zarr driver so far; your mileage may vary.
Yes, you can write any of ArrayView / SharedArrayView / Array / SharedArray to tensorstore. So to make a tensorstore ArrayView of your parsed data you don't need to copy it to a vector; it should be possible to use this MakeArrayView method, more or less like this:
// No need to copy the data into an array, just use a tensorstore::span to pass the pointer + length.
auto array_view = tensorstore::MakeArrayView(
tensorstore::span(parsed_data.begin() + 4, parsed_data.end()));
auto status = tensorstore::Write(array_view, store).result();
If you want full control over the array shape, such as creating an ArrayView with the shape 3x4 from the parsed_data, then you can construct an ArrayView directly, more or less like this:
// NOTE: In this example, std::distance(parsed_data.begin() + 4, parsed_data.end())
// must be at least 12.
tensorstore::StridedLayout<2> array_layout(tensorstore::c_order, sizeof(uint16_t), {3, 4});
// NOTE: array_view references array_layout, so the array_layout must outlive the array_view.
tensorstore::ArrayView<uint16_t, 2> array_view(&*(parsed_data.begin() + 4), array_layout);
auto status = tensorstore::Write(array_view, store).result();
I followed the instructions exactly and got:
error: no matching function for call to 'Write(tensorstore::Array<double, 2, tensorstore::ArrayOriginKind::zero, tensorstore::ContainerKind::view>&)'
213 | auto write_result = tensorstore::Write(data).result();
| ~~~~~~~~~~~~~~~~~~^~~~~~
In file included from /u/asalimiparsa/iometry/build/_deps/tensorstore-src/tensorstore/open.h:28,
from /u/asalimiparsa/iometry/plugins/macsio_zarr.c:24:
/u/asalimiparsa/iometry/build/_deps/tensorstore-src/tensorstore/tensorstore.h:772:1: note: candidate: 'template<class SourceArray, class Target> tensorstore::internal::EnableIfCanCopyArrayToTensorStore<typename tensorstore::internal_result::UnwrapResultHelper<typename std::remove_cv<typename std::remove_reference<_Tp>::type>::type>::type, typename tensorstore::internal_result::UnwrapResultHelper<typename std::remove_cv<typename std::remove_reference<_Arg>::type>::type>::type, tensorstore::WriteFutures> tensorstore::Write(SourceArray&&, Target&&, tensorstore::WriteOptions)'
772 | Write(SourceArray&& source, Target&& target, WriteOptions options) {
This is the same error I got with converting from other types.
Sorry, I omitted the second Tensorstore parameter from write in the above example; updated.
You can see from the error message that more parameters are required.
I'm still getting: error: no matching function for call to 'DriverWrite(tensorstore::Array<double, 2, tensorstore::ArrayOriginKind::zero, tensorstore::ContainerKind::view>&, tensorstore::internal::Driver::Handle&, std::remove_reference<tensorstore::WriteOptions&>::type)'
My best advice here is to (1) look for the differences between what you have and working examples, (2) try and determine what you want to be doing based on reading the code, and (3) always post your code when you want help with an error message. Otherwise anyone who tries to help is just guessing.
My bad! I didn't notice it would be ambiguous. I just changed the datatype to double in your code and received that error.
tensorstore::ArrayView<double, 2> array_view(&*(parsed_data.begin() + 4), array_layout);
auto status = tensorstore::Write(array_view, store).result();
And I received :
error: no matching function for call to 'Write(tensorstore::Array<double, 2, tensorstore::ArrayOriginKind::zero, tensorstore::ContainerKind::view>&)'
213 | auto write_result = tensorstore::Write(data).result();
| ~~~~~~~~~~~~~~~~~~^~~~~~
In file included from /u/asalimiparsa/iometry/build/_deps/tensorstore-src/tensorstore/open.h:28,
from /u/asalimiparsa/iometry/plugins/macsio_zarr.c:24:
/u/asalimiparsa/iometry/build/_deps/tensorstore-src/tensorstore/tensorstore.h:772:1: note: candidate: 'template<class SourceArray, class Target> tensorstore::internal::EnableIfCanCopyArrayToTensorStore<typename tensorstore::internal_result::UnwrapResultHelper<typename std::remove_cv<typename std::remove_reference<_Tp>::type>::type>::type, typename tensorstore::internal_result::UnwrapResultHelper<typename std::remove_cv<typename std::remove_reference<_Arg>::type>::type>::type, tensorstore::WriteFutures> tensorstore::Write(SourceArray&&, Target&&, tensorstore::WriteOptions)'
772 | Write(SourceArray&& source, Target&& target, WriteOptions options) {
I also tested the code without changing the datatype and received the same error with: error: no matching function for call to 'Write(tensorstore::Array<uint16_t, 2, tensorstore::ArrayOriginKind::zero, tensorstore::ContainerKind::view>&)'
Write takes two parameters. But additionally, since Write is asynchronous, the array needs to have a Shared element pointer so that Write can retain a reference. Here since you are calling result() you can be sure the lifetime of the source array is sufficient --- it is therefore safe to use UnownedToShared to convert an unowned array to a SharedArray.
I used all the methods you suggested, and all of them produced an error. For example:
void const *buf = 0;
buf = json_object_extarr_data(extarr_obj);
const double* double_buf = static_cast<const double*>(buf);
const Index rows = shape[0];
const Index cols = shape[1];
tensorstore::StridedLayout<2> array_layout(tensorstore::c_order, sizeof(double), {rows,cols});
tensorstore::ArrayView<double, 2> array_view(&*(double_buf), array_layout);
auto data= tensorstore::Open(json_spec, context, tensorstore::OpenMode::open, tensorstore::ReadWriteMode::read_write);
auto write_result = tensorstore::Write(arrayview, data).result();
Make error:
error: no matching function for call to 'DriverWrite(tensorstore::Array<double, 2, tensorstore::ArrayOriginKind::zero, tensorstore::ContainerKind::view>&, tensorstore::internal::Driver::Handle&, std::remove_reference<tensorstore::WriteOptions&>::type)'
776 | return internal::DriverWrite(
| ~~~~~~~~~~~~~~~~~~~~~^
777 | std::forward<decltype(unwrapped_source)>(unwrapped_source),
| ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
778 | internal::TensorStoreAccess::handle(
| ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
779 | std::forward<decltype(unwrapped_target)>(unwrapped_target)),
| ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
780 | std::move(options));
And I have no idea how to fix it.
I can't reproduce your exact code as it's lacking some context, but I believe you have the parameters in the write incorrect.
The write is expecting tensorstore::Write(SourceArray, Target, WriteOptions...) but it looks like you're just providing tensorstore::Write(Target, WriteOptions).
I believe what you want would be auto write_result = tensorstore::Write(array_view, data, /*optional options go here*/).result();
I'm sorry, I made a typo when writing the question. I am passing both the data and the target to Write, and it produces that error.
Try something like:
auto write_result = tensorstore::Write(tensorstore::UnownedToShared(arrayview), data).result();