
question about writing parallel and group handling

Open asparsa opened this issue 1 year ago • 39 comments

I'm using the tensorstore C++ API to write a Zarr file, and I need to address two issues in my code:

1. I want to create a group as in Zarr Python and build a hierarchy of namespaces and datasets. Right now I'm handling it via the kvstore path when creating the datasets. What is the optimal way? This approach takes more time than the Python version!

2. I'm trying to write data into each chunk in parallel. I want to avoid multithreading for now and use MPI instead. I'm writing everything from scratch for this purpose, as I couldn't find anything in the documentation. Is there a better way to avoid multithreading?

Thanks

asparsa avatar Jun 21 '24 15:06 asparsa

Tensorstore automatically handles chunks in parallel, so you can just issue the write from a single thread and all relevant chunks will be handled in parallel.

Groups currently aren't supported unfortunately.

jbms avatar Jun 21 '24 15:06 jbms

Thanks, the timing makes more sense now.

As an example, if I have {10,10} chunks and I want to write to the [35:45][35:45] region, will it be divided into four parallel sections [35:40][35:40], [35:40][40:45], [40:45][35:40], and [40:45][40:45] and written 4-way parallel?

And what is the maximum number of parallel writes? Can it be equal to the number of processes?

asparsa avatar Jun 21 '24 15:06 asparsa

Thanks, the timing makes more sense now.

As an example, if I have {10,10} chunks and I want to write to the [35:45][35:45] region, will it be divided into four parallel sections [35:40][35:40], [35:40][40:45], [40:45][35:40], and [40:45][40:45] and written 4-way parallel?

Exactly.
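The decomposition above is plain index arithmetic: for a half-open interval and a regular chunk grid, the touched chunk indices run from floor(start / chunk_size) to floor((stop - 1) / chunk_size). A stand-alone sketch of that arithmetic (an illustrative helper, not part of the tensorstore API):

```cpp
#include <cstdint>
#include <utility>

// For a half-open interval [start, stop) and a regular chunk size, return the
// inclusive range of chunk indices the interval touches.
// Assumes start >= 0 and stop > start.
std::pair<int64_t, int64_t> TouchedChunks(int64_t start, int64_t stop,
                                          int64_t chunk_size) {
  return {start / chunk_size, (stop - 1) / chunk_size};
}
```

With chunk size 10, TouchedChunks(35, 45, 10) gives chunk indices 3 through 4 in each dimension, so a 2-D write over [35:45][35:45] touches 2 x 2 = 4 chunks, matching the four parallel sections above.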

And what is the maximum number of parallel writes? Can it be equal to the number of processes?

It depends on the underlying kvstore --- with the local filesystem: https://google.github.io/tensorstore/kvstore/file/index.html#json-Context.file_io_concurrency

With GCS: https://google.github.io/tensorstore/kvstore/gcs/index.html#json-Context.gcs_request_concurrency

I'm not sure what you mean about it being equal to the number of processes, or how you intend to use MPI.

You can indeed also write in parallel from multiple processes, but tensorstore doesn't itself contain any support for that. You will need to partition your work between processes, and for efficiency you should ensure that the partitions are aligned to chunk boundaries.
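The chunk-aligned partitioning can be sketched as follows. This is neither tensorstore nor MPI API, just the arithmetic each rank would use to pick its slab of rows, rounded to chunk boundaries (names are illustrative):

```cpp
#include <algorithm>
#include <cstdint>
#include <utility>

// Split [0, extent) into `num_ranks` half-open row ranges whose boundaries
// fall on multiples of `chunk_size`, so no two ranks ever write to the same
// chunk. Assumes extent > 0, chunk_size > 0, and 0 <= rank < num_ranks.
std::pair<int64_t, int64_t> ChunkAlignedRange(int64_t extent, int64_t chunk_size,
                                              int num_ranks, int rank) {
  int64_t num_chunks = (extent + chunk_size - 1) / chunk_size;       // ceil
  int64_t chunks_per_rank = (num_chunks + num_ranks - 1) / num_ranks;  // ceil
  int64_t begin = std::min<int64_t>(rank * chunks_per_rank * chunk_size, extent);
  int64_t end = std::min<int64_t>((rank + 1) * chunks_per_rank * chunk_size, extent);
  return {begin, end};
}
```

For example, 100 rows with 10-row chunks across 4 ranks yields [0,30), [30,60), [60,90), and [90,100); each rank then issues its own tensorstore::Write over its slab.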

jbms avatar Jun 21 '24 16:06 jbms

Thanks. I believe limit: "shared" is what I was searching for. Can I ask how TensorStore handles the parallelism?

asparsa avatar Jun 26 '24 17:06 asparsa

It depends. There are two mechanisms in use: thread pools and admission queues.

Generally the "shared" concurrency objects create a thread-pool with a concurrency limit of at least 4 (https://github.com/google/tensorstore/blob/master/tensorstore/internal/file_io_concurrency_resource.cc). Then file io operations are queued on the thread pool for completion, and the thread pool schedules/allows the 4 operations to run in parallel.

In the case of writing to gcs, the underlying kvstore uses an admission queue with a default value of 32 to control concurrency (https://github.com/google/tensorstore/blob/master/tensorstore/kvstore/gcs_http/gcs_resource.h). The admission queue then allows up to 32 parallel HTTP operations.
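If you need a different limit, it can be set through the spec's "context" member. A sketch of such a spec (the value 16 is arbitrary; key names per the file_io_concurrency docs linked above):

```json
{
  "driver": "zarr3",
  "kvstore": {"driver": "file", "path": "/tmp/testo"},
  "context": {"file_io_concurrency": {"limit": 16}}
}
```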

laramiel avatar Jun 26 '24 22:06 laramiel

Thanks again; I have another question. I am using zarr3 to write an array. When the file size becomes big, it does not finish the writing and leaves the folder with only the metadata (it will be only 4 KB instead of 5 GB).

auto write_result = tensorstore::Write(array, store);
write_result.Force();
auto write_result2 = write_result.result();

How can I ensure the writing process is complete? I want to ensure the data is on the disk, not the buffer or cache.

asparsa avatar Jun 27 '24 22:06 asparsa

That's an open-ended question. Have you tried logging the write_result.status()?

auto write_result = tensorstore::Write(array, store);
ABSL_LOG(INFO) << write_result.status();

You can look at how we write to tensorstore in examples/compute_percentiles.cc.

Can you distill your problem into a function which you can include in a comment?
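One detail worth adding on the "is it on disk" part of the question: tensorstore::Write returns a WriteFutures object with two futures, copy_future (resolves once the source array has been read and may be reused) and commit_future (resolves once the data has been committed to the underlying kvstore). Waiting on the commit future is the way to be sure the write has completed. A sketch, assuming array and store from the earlier example and an enclosing function that can propagate absl::Status:

```cpp
auto write = tensorstore::Write(array, store);
// Resolves once `array` has been read; the buffer may then be modified.
TENSORSTORE_RETURN_IF_ERROR(write.copy_future.result());
// Resolves once the data has been written out to the kvstore.
TENSORSTORE_RETURN_IF_ERROR(write.commit_future.result());
```

Calling .result() on the WriteFutures object itself, as in the earlier snippet, also waits for the commit, so the pattern above mainly makes the two stages explicit.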

laramiel avatar Jun 27 '24 23:06 laramiel

Also there was a recent fix for sharded writing --- make sure you are using a sufficiently new version (0.1.63 or later).

jbms avatar Jun 27 '24 23:06 jbms

Sorry for the ambiguity. Thanks for notifying me about sharded writing.

That's an open-ended question. Have you tried logging the write_result.status()?

Yeah, I tried both status and result, and they returned OK.

I saw the example; I am using the same method. Zarr creates the chunk, but the chunk remains empty. I can create simple code to reproduce the result if you want.

asparsa avatar Jun 28 '24 00:06 asparsa

Yes, please post your code.

laramiel avatar Jun 28 '24 04:06 laramiel

I created a repo for that. https://github.com/asparsa/zarrtest/tree/main but here is the main.cpp (edited by laramiel):

#include <stdio.h>
#include <stdlib.h>
#include <time.h>

#include <iostream>
#include <typeinfo>

#include <nlohmann/json.hpp>

#include "tensorstore/context.h"
#include "tensorstore/index.h"
#include "tensorstore/open.h"
#include "tensorstore/open_mode.h"
#include "tensorstore/tensorstore.h"
#include "tensorstore/util/status.h"

using ::tensorstore::Context;

::nlohmann::json GetJsonSpec() {
  return {
      {"driver", "zarr3"},
      {"kvstore", {{"driver", "file"}, {"path", "/tmp/testo"}}},
      {"metadata",
       {{"data_type", "int16"},
        {"shape", {100, 100}},
        {"chunk_grid",
         {{"name", "regular"},
          {"configuration", {{"chunk_shape", {10, 10}}}}}}}},
  };
}


// try to create a zarr and write a 100*100 random array in it
int main() {
  auto context = Context::Default();
  auto store_result = tensorstore::Open(GetJsonSpec(), context,
                                        tensorstore::OpenMode::create |
                                            tensorstore::OpenMode::open,
                                        tensorstore::ReadWriteMode::read_write)
                          .result();
  if (!store_result.ok()) {
    std::cerr << "Error creating Zarr file: " << store_result.status() << std::endl;
    return -1;
  }
  std::cout << "Zarr file created successfully!" << std::endl;

  srand(time(0));
  auto array = tensorstore::AllocateArray<int16_t>({100, 100});
  for (tensorstore::Index i = 0; i < 100; ++i) {
    for (tensorstore::Index j = 0; j < 100; ++j) {
      array(i, j) = (rand() % 10) + 1;
    }
  }

  auto write_result = tensorstore::Write(array, store_result).status();
  if (!write_result.ok()) {
    std::cerr << "Error creating Zarr file: " << write_result << std::endl;
    return -1;
  }

  std::cout << "Zarr file wrote successfully!" << std::endl;
  return 0;
}

No matter how much data I write to it, the zarr folder remains 4 KB.

asparsa avatar Jun 28 '24 16:06 asparsa

NOTE: I reformatted the above example and removed some unnecessary lines to make it more readable; I also changed the output location from testo to /tmp/testo.

The code above is essentially correct, but I think that you merely misunderstand the data storage format. The zarr.json file is the zarr-format metadata which describes the dataset. The data is stored in individual files within subdirectories of /tmp/testo, such as:

$ find /tmp/testo | sort

/tmp/testo
/tmp/testo/c
/tmp/testo/c/0
/tmp/testo/c/0/0
/tmp/testo/c/0/1
/tmp/testo/c/0/2
/tmp/testo/c/0/3
...
/tmp/testo/c/9/7
/tmp/testo/c/9/8
/tmp/testo/c/9/9
/tmp/testo/zarr.json

Since your chunk shape is 10x10, and you write into indices 0..99 x 0..99, these files have indices from 0..9.

laramiel avatar Jun 30 '24 06:06 laramiel

Can I see your code? I was talking about the whole folder size, which never exceeds 4 KB. Yes, I was checking the data, not the metadata.

asparsa avatar Jun 30 '24 12:06 asparsa

I edited your above comment; that is exactly what I compiled.

Are you misunderstanding the 4.0K 'byte size' field in the ls output format, perhaps? The 4.0K for the c directory is the size of the directory structure itself on disk, likely a single block, not the size of all data within the directory. To get the sum of all file sizes you need to use the du command.

$ ls -lAh /tmp/testo
total 8.0K
drwxr-xr-x 12 laramiel laramiel 4.0K Jun 30 06:14 c
-rw-r--r--  1 laramiel laramiel  266 Jun 30 06:14 zarr.json

$ du -sh /tmp/testo
452K  /tmp/testo

laramiel avatar Jun 30 '24 17:06 laramiel

Oh, I see. Sorry, I confused that with the file size. Thank you so much.

I have another question. I am trying to use the MakeArray() function for different data types from different variables, but I have some difficulty using it every time. For examples, I looked at this, but most were created from numbers, not variables. This is one of the examples I try to write:

std::vector<int16_t> data_to_use(parsed_data.begin() + 4, parsed_data.end());
auto array = tensorstore::MakeArray<int16_t>(data_to_use);

I tried every combination of using and not using <int16_t> and still get a no matching function for call to 'MakeArray<int16_t>(std::vector&)' error. How should I handle this, and what other variable types can I pass to MakeArray? Is there any example or documentation on that? If I use MakeArrayView, can I write it to a Zarr driver?

asparsa avatar Jul 01 '24 18:07 asparsa

Is there any way to turn off all the compression in zarr3, as compression is no longer needed?

asparsa avatar Jul 01 '24 23:07 asparsa

Setting the clevel to 0 in the Blosc codec should effectively disable compression.

brian-michell avatar Jul 02 '24 14:07 brian-michell

Won't that be an overhead, calling the Blosc codec and getting out of it without compression?

I am trying to use the MakeArray() function for different data types from different variables, but I have some difficulty using it every time. [...] How should I handle this, and what other variable types can I pass to MakeArray? If I use MakeArrayView, can I write it to a Zarr driver?

What do you think about this?

asparsa avatar Jul 02 '24 18:07 asparsa

Won't that be an overhead? Calling the Blosc codec and getting out of it without compression.

I'm not too familiar with Zarr3, but it looks like specifying no codec will result in uncompressed data.

import tensorstore as ts
import numpy as np

spec = {
    'driver': 'zarr3',
    'kvstore': {
        'driver': 'file',
        'path': 'tmp/zarr3',
    },
    'create': True,
    'delete_existing': True,
    'metadata': {
        'shape': [100, 100],
        'data_type': 'float64',
        "codecs": [{"name": "blosc", "configuration": {"cname": "lz4", "clevel": 9}}] # You could remove this line
    }
}
tensorstore_object_result = ts.open(spec)
tensorstore_object = tensorstore_object_result.result()
data = np.random.rand(100, 100)
write_future = tensorstore_object.write(data)

# tmp/zarr3/c/0 results in 72K according to `du -sh 0`
# tmp/zarr3/c/0 results in 80K according to `du -sh 0` without the codec line

What do you think about this?

I have never used the MakeArray() function. SharedArray has served my purposes with the Zarr driver so far; your mileage may vary.

brian-michell avatar Jul 02 '24 19:07 brian-michell

Yes, you can write any of ArrayView / SharedArrayView / Array / SharedArray to tensorstore. So to make a tensorstore ArrayView of your parsed data you don't need to copy it to a vector; it should be possible to use this MakeArrayView method, more or less like this:

// No need to copy the data into an array, just use a tensorstore::span to pass the pointer + length.
auto array_view = tensorstore::MakeArrayView(
    tensorstore::span(parsed_data.begin() + 4, parsed_data.end()));
auto status = tensorstore::Write(array_view, store).result();

If you want full control over the array shape, such as creating an ArrayView with the shape 3x4 from the parsed_data, then you can construct an ArrayView directly, more or less like this:

// NOTE: In this example, std::distance(parsed_data.begin() + 4, parsed_data.end())
// must be at least 12.
tensorstore::StridedLayout<2> array_layout(tensorstore::c_order, sizeof(uint16_t), {3, 4});

// NOTE: array_view references array_layout, so the array_layout must outlive the array_view.
tensorstore::ArrayView<uint16_t, 2> array_view(&*(parsed_data.begin() + 4), array_layout);

auto status = tensorstore::Write(array_view, store).result();

laramiel avatar Jul 03 '24 08:07 laramiel

I followed the exact instructions and I got

 error: no matching function for call to 'Write(tensorstore::Array<double, 2, tensorstore::ArrayOriginKind::zero, tensorstore::ContainerKind::view>&)'
  213 |  auto write_result = tensorstore::Write(data).result();
      |                      ~~~~~~~~~~~~~~~~~~^~~~~~
In file included from /u/asalimiparsa/iometry/build/_deps/tensorstore-src/tensorstore/open.h:28,
                 from /u/asalimiparsa/iometry/plugins/macsio_zarr.c:24:
/u/asalimiparsa/iometry/build/_deps/tensorstore-src/tensorstore/tensorstore.h:772:1: note: candidate: 'template<class SourceArray, class Target> tensorstore::internal::EnableIfCanCopyArrayToTensorStore<typename tensorstore::internal_result::UnwrapResultHelper<typename std::remove_cv<typename std::remove_reference<_Tp>::type>::type>::type, typename tensorstore::internal_result::UnwrapResultHelper<typename std::remove_cv<typename std::remove_reference<_Arg>::type>::type>::type, tensorstore::WriteFutures> tensorstore::Write(SourceArray&&, Target&&, tensorstore::WriteOptions)'
  772 | Write(SourceArray&& source, Target&& target, WriteOptions options) {

This is the same error I got with converting from other types.

asparsa avatar Jul 08 '24 04:07 asparsa

Sorry, I omitted the second Tensorstore parameter from write in the above example; updated.

You can see that more parameters are required by the error message.

laramiel avatar Jul 08 '24 09:07 laramiel

I'm still getting:

error: no matching function for call to 'DriverWrite(tensorstore::Array<double, 2, tensorstore::ArrayOriginKind::zero, tensorstore::ContainerKind::view>&, tensorstore::internal::Driver::Handle&, std::remove_reference<tensorstore::WriteOptions&>::type)'

asparsa avatar Jul 08 '24 23:07 asparsa

My best advice here is to (1) look for the differences between what you have and working examples, (2) try and determine what you want to be doing based on reading the code, and (3) always post your code when you want help with an error message. Otherwise anyone who tries to help is just guessing.

laramiel avatar Jul 09 '24 15:07 laramiel

My bad! I didn't realize it would be ambiguous. I just changed the datatype to double in your code and received that error.

tensorstore::ArrayView<double, 2> array_view(&*(parsed_data.begin() + 4), array_layout);

auto status = tensorstore::Write(array_view, store).result();

And I received :

 error: no matching function for call to 'Write(tensorstore::Array<double, 2, tensorstore::ArrayOriginKind::zero, tensorstore::ContainerKind::view>&)'
  213 |  auto write_result = tensorstore::Write(data).result();
      |                      ~~~~~~~~~~~~~~~~~~^~~~~~
In file included from /u/asalimiparsa/iometry/build/_deps/tensorstore-src/tensorstore/open.h:28,
                 from /u/asalimiparsa/iometry/plugins/macsio_zarr.c:24:
/u/asalimiparsa/iometry/build/_deps/tensorstore-src/tensorstore/tensorstore.h:772:1: note: candidate: 'template<class SourceArray, class Target> tensorstore::internal::EnableIfCanCopyArrayToTensorStore<typename tensorstore::internal_result::UnwrapResultHelper<typename std::remove_cv<typename std::remove_reference<_Tp>::type>::type>::type, typename tensorstore::internal_result::UnwrapResultHelper<typename std::remove_cv<typename std::remove_reference<_Arg>::type>::type>::type, tensorstore::WriteFutures> tensorstore::Write(SourceArray&&, Target&&, tensorstore::WriteOptions)'
  772 | Write(SourceArray&& source, Target&& target, WriteOptions options) {

I also tested the code without changing the datatype and received the same error with: error: no matching function for call to 'Write(tensorstore::Array<uint16_t, 2, tensorstore::ArrayOriginKind::zero, tensorstore::ContainerKind::view>&)'

asparsa avatar Jul 09 '24 16:07 asparsa

Write takes two parameters. But additionally, since Write is asynchronous, the array needs to have a Shared element pointer so that Write can retain a reference. Here since you are calling result() you can be sure the lifetime of the source array is sufficient --- it is therefore safe to use UnownedToShared to convert an unowned array to a SharedArray.

jbms avatar Jul 09 '24 18:07 jbms

I used all the methods you suggested, and all came back with errors. For example:

void const *buf = 0;
buf = json_object_extarr_data(extarr_obj);
const double* double_buf = static_cast<const double*>(buf);
const Index rows = shape[0];
const Index cols = shape[1];
tensorstore::StridedLayout<2> array_layout(tensorstore::c_order, sizeof(double), {rows,cols});
tensorstore::ArrayView<double, 2> array_view(&*(double_buf), array_layout);
auto data= tensorstore::Open(json_spec, context, tensorstore::OpenMode::open, tensorstore::ReadWriteMode::read_write);
auto write_result = tensorstore::Write(array_view, data).result();

Make error:

 error: no matching function for call to 'DriverWrite(tensorstore::Array<double, 2, tensorstore::ArrayOriginKind::zero, tensorstore::ContainerKind::view>&, tensorstore::internal::Driver::Handle&, std::remove_reference<tensorstore::WriteOptions&>::type)'
  776 |         return internal::DriverWrite(
      |                ~~~~~~~~~~~~~~~~~~~~~^
  777 |             std::forward<decltype(unwrapped_source)>(unwrapped_source),
      |             ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
  778 |             internal::TensorStoreAccess::handle(
      |             ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
  779 |                 std::forward<decltype(unwrapped_target)>(unwrapped_target)),
      |                 ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
  780 |             std::move(options));

And I have no idea how to fix it.

asparsa avatar Jul 15 '24 03:07 asparsa

I can't reproduce your exact code as it's lacking some context, but I believe you have the parameters in the write incorrect.

The write is expecting tensorstore::Write(SourceArray, Target, WriteOptions...) but it looks like you're just providing tensorstore::Write(Target, WriteOptions).

I believe what you want would be auto write_result = tensorstore::Write(array_view, data, /*optional options go here*/).result();

brian-michell avatar Jul 15 '24 13:07 brian-michell

I'm sorry. I made a typo when writing the question. I am passing the data and target to the write, and it makes that error.

asparsa avatar Jul 15 '24 17:07 asparsa

Try something like:

auto write_result = tensorstore::Write(tensorstore::UnownedToShared(array_view), data).result();
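Putting the thread together, a corrected version of the earlier fragment might look like the sketch below. The names json_spec, context, shape, and json_object_extarr_data come from the surrounding user code; the enclosing function is assumed to return absl::Status. UnownedToShared wraps the raw buffer without taking ownership, which is safe here only because blocking on .result() guarantees the buffer outlives the asynchronous write:

```cpp
const double* double_buf =
    static_cast<const double*>(json_object_extarr_data(extarr_obj));

// Row-major (C-order) 2-D layout over the raw buffer; the layout must
// outlive the ArrayView that references it.
tensorstore::StridedLayout<2> array_layout(
    tensorstore::c_order, sizeof(double), {shape[0], shape[1]});
tensorstore::ArrayView<const double, 2> array_view(double_buf, array_layout);

auto store = tensorstore::Open(json_spec, context,
                               tensorstore::OpenMode::open,
                               tensorstore::ReadWriteMode::read_write)
                 .result();
if (!store.ok()) return store.status();

// UnownedToShared satisfies Write's requirement for a shared element
// pointer; .result() blocks until the write has been committed.
auto write_result =
    tensorstore::Write(tensorstore::UnownedToShared(array_view), *store)
        .result();
if (!write_result.ok()) return write_result.status();
```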

laramiel avatar Jul 16 '24 18:07 laramiel