
Writing a modified zarr array back in place with dask fails

Open AntSimi opened this issue 5 years ago • 10 comments

I read a zarr array in which I need to modify some values. Afterwards, I want to save it back to the same place with the overwrite option.

import dask.array as da
from dask.distributed import Client, LocalCluster
import dask

def put_2(data):
    # Put 2 at first index of block
    data[0] = 2
    return data

if __name__ == '__main__':
    print('Version ', dask.__version__)
    # The bug shows up when using a cluster
    cluster = LocalCluster()
    client = Client(cluster)

    # Create a ones array with two chunks
    d = da.ones(10, chunks=(5,), dtype='i4')
    # Save it
    d.to_zarr('/tmp/data.zarr', component='d', overwrite=True)

    # Read Ones
    read_d = da.from_zarr('/tmp/data.zarr', component='d')
    # Apply put_2 on each block
    new_d = da.blockwise(put_2, 'i', read_d, 'i', dtype='i4')
    print(new_d.compute())
    # Save inplace of previous array
    a = new_d.to_zarr('/tmp/data.zarr', component='d', overwrite=True)
    # Read new save :(
    read_d = da.from_zarr('/tmp/data.zarr', component='d')
    print(read_d.compute())

Output

Version  2.11.0
[2 1 1 1 1 2 1 1 1 1]
[2 0 0 0 0 2 0 0 0 0]

It seems that the zarr "create" command is applied before the values are read, so when we try to read them we get an empty dataset.
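The suspected mechanism can be modeled in a few lines of plain Python. This is a sketch under stated assumptions, not dask or zarr code: a dict stands in for the zarr store, the hypothetical `create_overwrite` models `create(..., overwrite=True)` deleting the existing chunks, and a missing chunk reads back as a fill value of 0, which is consistent with the zeros in the output above. All function names here are illustrative only.

```python
# Toy model of the suspected race; a dict plays the role of the zarr store.
FILL_VALUE = 0  # assumed fill value, consistent with the zeros in the output
N_CHUNKS, CHUNK_LEN = 2, 5


def fresh_store():
    # Two chunks of ones, as left by the first to_zarr call
    return {i: [1] * CHUNK_LEN for i in range(N_CHUNKS)}


def read_chunk(store, i):
    # A chunk that no longer exists reads back as the fill value
    return list(store.get(i, [FILL_VALUE] * CHUNK_LEN))


def create_overwrite(store):
    # Models create(..., overwrite=True): existing chunks are removed
    store.clear()


def put_2(block):
    block[0] = 2
    return block


def run(create_first):
    store = fresh_store()
    if create_first:
        create_overwrite(store)
    # Read and modify every block of the old array
    blocks = [put_2(read_chunk(store, i)) for i in range(N_CHUNKS)]
    if not create_first:
        create_overwrite(store)
    # Store the modified blocks back into the same place
    for i, block in enumerate(blocks):
        store[i] = block
    return [x for i in range(N_CHUNKS) for x in store[i]]


print(run(create_first=False))  # [2, 1, 1, 1, 1, 2, 1, 1, 1, 1]
print(run(create_first=True))   # [2, 0, 0, 0, 0, 2, 0, 0, 0, 0]
```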

AntSimi, Feb 24, 2020

@AntSimi thank you for the bug report. I verified similar behavior, but noticed that over multiple runs it is sometimes correct. Not sure what's going on here. Can I ask you to try without distributed?

quasiben, Feb 24, 2020

I modified the example to remove dask distributed:

import dask.array as da

def put_2(data):
    # Put 2 at first index of block
    data[0] = 2
    return data

if __name__ == '__main__':
    # Create a ones array of 100 elements in 20 chunks
    d = da.ones(100, chunks=(5,), dtype='i4')
    # Save it
    d.to_zarr('/tmp/data.zarr', component='d', overwrite=True)
    # Read Ones
    read_d = da.from_zarr('/tmp/data.zarr', component='d')
    # Apply put_2 on each block
    new_d = da.blockwise(put_2, 'i', read_d, 'i', dtype='i4')
    # Save inplace of previous array
    a = new_d.to_zarr('/tmp/data.zarr', component='d', overwrite=True, compute=False)
    # a.visualize('tree.png')
    a.compute()
    # Read new save :(
    read_d = da.from_zarr('/tmp/data.zarr', component='d')
    d = read_d.compute()
    print(d)
    print((d!=0).all())

Output from several runs:

...@...:$ python zarr_dask_over_write.py
[2 1 1 1 1 2 1 1 1 1 2 1 1 1 1 2 1 1 1 1 2 1 1 1 1 2 1 1 1 1 2 1 1 1 1 2 1
 1 1 1 2 1 1 1 1 2 1 1 1 1 2 1 1 1 1 2 1 1 1 1 2 1 1 1 1 2 1 1 1 1 2 1 1 1
 1 2 1 1 1 1 2 1 1 1 1 2 1 1 1 1 2 1 1 1 1 2 1 1 1 1]
True
...@...:$ python zarr_dask_over_write.py
[2 1 1 1 1 2 1 1 1 1 2 1 1 1 1 2 1 1 1 1 2 1 1 1 1 2 1 1 1 1 2 1 1 1 1 2 1
 1 1 1 2 1 1 1 1 2 1 1 1 1 2 1 1 1 1 2 1 1 1 1 2 0 0 0 0 2 0 0 0 0 2 0 0 0
 0 2 0 0 0 0 2 0 0 0 0 2 0 0 0 0 2 0 0 0 0 2 0 0 0 0]
False
...@...:$ python zarr_dask_over_write.py 
[2 1 1 1 1 2 1 1 1 1 2 1 1 1 1 2 1 1 1 1 2 1 1 1 1 2 1 1 1 1 2 1 1 1 1 2 1
 1 1 1 2 1 1 1 1 2 1 1 1 1 2 1 1 1 1 2 1 1 1 1 2 1 1 1 1 2 1 1 1 1 2 1 1 1
 1 2 1 1 1 1 2 1 1 1 1 2 1 1 1 1 2 1 1 1 1 2 1 1 1 1]
True

Most of the time the result is correct, but sometimes it is wrong, and with dask.distributed the error occurs more often. My guess is that the "create" step can run either before or after all of the "from_zarr" read steps. The create step with overwrite=True does an "rmdirs", so any read that happens after it sees an empty array. In my real use case the dataset is bigger than memory, so I can't load it fully before the create step.
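A sketch of why the outcome can change from run to run, assuming the graph has the shape described here: the read tasks and the `create` task have no dependency between them, while each store task waits on both. The task names are hypothetical labels, not real dask keys. The standard-library `graphlib.TopologicalSorter` (Python 3.9+) reports which tasks are ready at the same time; a scheduler is free to run those in any order, including `create` first.

```python
from graphlib import TopologicalSorter

# Each key maps to the set of tasks it depends on (hypothetical task names)
graph = {
    "read-0": set(),
    "read-1": set(),
    "put_2-0": {"read-0"},
    "put_2-1": {"read-1"},
    "create": set(),  # no edge from the reads: nothing forces it to wait
    "store-0": {"create", "put_2-0"},
    "store-1": {"create", "put_2-1"},
}

ts = TopologicalSorter(graph)
ts.prepare()
# Tasks with no unfinished dependencies can all start immediately
first_wave = set(ts.get_ready())
print(sorted(first_wave))  # ['create', 'read-0', 'read-1']
```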

AntSimi, Feb 25, 2020

I am taking a look at this now and can only reproduce it in the distributed case. I think the issue is actually happening within the to_zarr method. If you add compute=False to the to_zarr call, it seems to work as expected:

import dask.array as da
from distributed import Client

client = Client()

def put_2(data):
    # Put 2 at first index of block
    data[0] = 2
    return data

# Create a ones array with two chunks
d = da.ones(10, chunks=(5,), dtype='i4')
# Save it
d.to_zarr('/tmp/data.zarr', component='d', overwrite=True)

# Read Ones
read_d = da.from_zarr('/tmp/data.zarr', component='d')
# Apply put_2 on each block
new_d = da.blockwise(put_2, 'i', read_d, 'i', dtype='i4')
print(f"NEW: {new_d.compute()}")
# Save inplace of previous array
new_d.to_zarr('/tmp/data.zarr', component='d', overwrite=True, compute=False)
print(f"AFTER WRITE: {new_d.compute()}")
# Read new save :(
read_d = da.from_zarr('/tmp/data.zarr', component='d')
print(f"READ: {read_d.compute()}")

With compute=False I get:

NEW: [2 1 1 1 1 2 1 1 1 1]
AFTER WRITE: [2 1 1 1 1 2 1 1 1 1]
READ: [1 1 1 1 1 1 1 1 1 1]

With compute=True I get:

NEW: [2 1 1 1 1 2 1 1 1 1]
AFTER WRITE: [2 0 0 0 0 2 0 0 0 0]
READ: [2 0 0 0 0 2 0 0 0 0]

jsignell, Mar 12, 2021

I am able to reproduce a similar error with a huge array: the original array contains only 1s and we put some 2s in it, but several times I found zeros in the array.

import dask.array as da

def put_2(data):
    # Put 2 at first index of block
    data[0] = 2
    return data

if __name__ == '__main__':
    # Create a ones array of 100,000,000 elements in 40 chunks
    d = da.ones(100000000, chunks=(2500000,), dtype='i4')
    # Save it
    d.to_zarr('/tmp/data.zarr', component='d', overwrite=True)
    # Read Ones
    read_d = da.from_zarr('/tmp/data.zarr', component='d')
    # Apply put_2 on each block
    new_d = da.blockwise(put_2, 'i', read_d, 'i', dtype='i4')
    # Save inplace of previous array
    a = new_d.to_zarr('/tmp/data.zarr', component='d', overwrite=True, compute=False)
    a.compute()
    # Read new save :(
    read_d = da.from_zarr('/tmp/data.zarr', component='d')
    d = read_d.compute()
    print((d!=0).all())

The printed result should always be True, but several times I got False. I used Python 3.7.10 with dask-core 2021.3.0 and zarr 2.6.1.

AntSimi, Mar 12, 2021

I was able to get the desired output with this diff:

diff --git a/dask/array/core.py b/dask/array/core.py
index 9aa10950..f45a3619 100644
--- a/dask/array/core.py
+++ b/dask/array/core.py
@@ -3318,10 +3318,9 @@ def to_zarr(
     # sharded across workers (not done serially on the originating
     # machine).  Or the caller may decide later to not to do this
     # computation, and so nothing should be written to disk.
-    z = delayed(zarr.create)(
-        shape=arr.shape,
+    z = delayed(zarr.array)(
+        data=arr,
         chunks=chunks,
-        dtype=arr.dtype,
         store=mapper,
         path=component,
         overwrite=overwrite,

Would you be able to see if that works for you?
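Modeled with a toy dependency graph (hypothetical task names, not real dask keys), passing the array as `data=arr` means the delayed `zarr.array` call cannot start until every block has been computed, so all reads of the old data necessarily finish before the overwrite. The trade-off is that the whole array has to be computed before anything is stored.

```python
from graphlib import TopologicalSorter

graph = {
    "read-0": set(),
    "read-1": set(),
    "put_2-0": {"read-0"},
    "put_2-1": {"read-1"},
    # Modeled on the patch: create receives the whole array as `data`
    "create": {"put_2-0", "put_2-1"},
    "store-0": {"create", "put_2-0"},
    "store-1": {"create", "put_2-1"},
}

order = list(TopologicalSorter(graph).static_order())
# Every read now finishes before the overwrite can clear the store
assert order.index("create") > max(order.index("read-0"), order.index("read-1"))
print(order.index("create"))  # 4: both reads and both put_2 tasks come first
```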

jsignell, Mar 12, 2021

I'm not sure what the implications are for performance, but that seems like a reasonable solution to me. I'll open a PR to carry on the conversation.

jsignell, Mar 12, 2021

Thanks, with this patch it works well for me.

AntSimi, Mar 12, 2021

After a quick look at the dask graph, I think there will be a problem for arrays larger than memory: if I understood the graph correctly, all the data must be loaded before storage starts.
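A back-of-the-envelope check of that concern, using the sizes from the large reproducer in this thread (100,000,000 `i4` elements in chunks of 2,500,000). It counts raw buffer sizes only and ignores copies and scheduler overhead, so real memory use would be higher.

```python
# Sizes from the large reproducer in this thread
N_ELEMENTS = 100_000_000
CHUNK_LEN = 2_500_000
ITEMSIZE = 4  # dtype 'i4' is a 4-byte signed integer

n_chunks = N_ELEMENTS // CHUNK_LEN
whole_mib = N_ELEMENTS * ITEMSIZE / 2**20  # whole array materialized at once
chunk_mib = CHUNK_LEN * ITEMSIZE / 2**20   # one chunk at a time when streaming

print(n_chunks)             # 40
print(round(whole_mib, 1))  # 381.5
print(round(chunk_mib, 1))  # 9.5
```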

AntSimi, Mar 12, 2021

Hi there, quick question: is this issue considered closed? The corresponding PR https://github.com/dask/dask/pull/7379 was not brought forward and, if I understand correctly, it would force the whole computation to finish before writing data to disk, which doesn't play well with large data.

I am hitting this issue again: I load a large amount of data via from_zarr(), do some processing, and then I'd like to use to_zarr(..., overwrite=True), but something goes wrong and I end up with very small files (correct shape and chunks, wrong data).

My current quick-and-dirty workaround is to save the data to a new zarr component and then shuffle folders (remove the old one, move the new one to the correct path), but that means creating, removing, and moving large temporary folders (to_zarr may be saving a 30 GB array, for instance).

Code:

import shutil


def to_zarr_custom(newzarrurl=None, array=None, component="", overwrite=False):
    """
    Custom workaround for dask issue
    (https://github.com/dask/dask/issues/5942), where a dask array loaded with
    from_zarr cannot be written back with to_zarr(..., overwrite=True).

    :param newzarrurl: output zarr file
    :type newzarrurl: str
    :param array: dask array to be stored
    :type array: dask array
    :param component: target subfolder of the zarr file (optional, default "")
    :type component: str
    :param overwrite: overwrite existing data (optional, default False)
    :type overwrite: boolean
    """

    # Check required arguments
    if newzarrurl is None:
        raise ValueError("ERROR: Missing newzarrurl arg in to_zarr_custom")
    if array is None:
        raise ValueError("ERROR: Missing array arg in to_zarr_custom")

    # Sanitize arguments
    if component.endswith("/"):
        component = component[:-1]
    if not newzarrurl.endswith("/"):
        newzarrurl += "/"

    if overwrite:
        # With an empty component, rmtree below would delete the whole
        # zarr store, including the temporary copy just written
        if not component:
            raise ValueError("ERROR: overwrite=True requires a non-empty component")
        tmp_suffix = "_TEMPORARY_ZARR_ARRAY"
        # Write the new data next to the old array instead of over it
        array.to_zarr(
            newzarrurl,
            component=component + tmp_suffix,
            dimension_separator="/",
        )
        # Only once the write has finished, replace the old folder
        shutil.rmtree(newzarrurl + component)
        shutil.move(newzarrurl + component + tmp_suffix, newzarrurl + component)
    else:
        array.to_zarr(newzarrurl, component=component, dimension_separator="/")
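A variant of the same workaround, sketched under the assumption that both components live in a local directory store on a single filesystem: rename the old component aside before moving the new one in, so the old data is deleted only after the new data is in place. `swap_directory` and the `_OLD_ZARR_ARRAY` suffix are hypothetical names, and this does not apply to object stores such as S3. It still needs the temporary copy, so it does not reduce disk usage; it only narrows the window in which a failure could lose both arrays.

```python
import os
import shutil
import tempfile


def swap_directory(target, replacement):
    """Replace directory `target` with `replacement` (hypothetical helper).

    Both paths are assumed to be on the same filesystem, so os.rename is a
    cheap metadata operation; across filesystems it raises OSError instead.
    """
    backup = target + "_OLD_ZARR_ARRAY"
    os.rename(target, backup)  # keep the old data until the swap succeeds
    try:
        os.rename(replacement, target)
    except OSError:
        os.rename(backup, target)  # roll back so the old array is not lost
        raise
    shutil.rmtree(backup)  # only now delete the old data


# Usage with plain directories standing in for zarr components
root = tempfile.mkdtemp()
old = os.path.join(root, "d")
new = os.path.join(root, "d_TEMPORARY_ZARR_ARRAY")
os.makedirs(old)
os.makedirs(new)
with open(os.path.join(new, "marker"), "w") as f:
    f.write("new")

swap_directory(old, new)
after_swap = sorted(os.listdir(root))
moved = os.listdir(old)
shutil.rmtree(root)  # clean up the demo directory
print(after_swap)  # ['d']
print(moved)       # ['marker']
```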

I am happy to provide more context/tests if it is useful, but more generally I'd like to know whether @AntSimi or others found any workaround that they are happy with. Thanks!

tcompa, Jun 27, 2022

@tcompa I haven't found a workaround.

AntSimi, Jul 1, 2022