
[Docs] Usage instructions for progress bars

Open · norlandrhagen opened this issue 1 year ago · 2 comments

I've been inspired by @thodson-usgs's Virtualizarr / Cubed rechunking example work and figured it's time to give Cubed a shot.

Wondering if it would be possible to add some documentation on how to use progress bars to monitor a Cubed job. I've tried piecing something together from the examples, but haven't had much luck.
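For context, this is roughly the pure-Cubed pattern I think I've gleaned from the examples. The module paths and the pass-callbacks-as-a-list detail are just my reading of the docs, so they may well be off:

from cubed import Spec
from cubed.extensions.rich import RichProgressBar
import cubed.array_api as xp

# small standalone example, no xarray involved
spec = Spec(work_dir='tmp', allowed_mem='2GB')
a = xp.ones((5000, 5000), chunks=(500, 500), spec=spec)
b = xp.ones((5000, 5000), chunks=(500, 500), spec=spec)
c = xp.add(a, b)

# the examples appear to pass callbacks as a list at compute time
result = c.compute(callbacks=[RichProgressBar()])

What I can't work out is how to wire that same callback through when the Cubed-backed arrays live inside an xarray Dataset.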

Here's what I've been trying on the xarray side with RichProgressBar:


from cubed import Spec
from cubed.extensions.rich import RichProgressBar
import xarray as xr

spec = Spec(work_dir='tmp', allowed_mem='2GB')

ds = xr.tutorial.open_dataset('air_temperature', chunked_array_type='cubed',
                              from_array_kwargs={'spec': spec}, chunks={})
ds = ds.chunk({'time': 1})

ds.compute(callbacks=RichProgressBar())

# OR

ds.to_zarr("air_temp.zarr", mode='w', consolidated=True,
           chunkmanager_store_kwargs={'from_array_kwargs': {'spec': spec},
                                      'callbacks': RichProgressBar()})



This throws a TypeError: zip() argument after * must be an iterable, not RichProgressBar

Full traceback:


TypeError                                 Traceback (most recent call last)
Cell In[4], line 16
     14 ds.air.data.visualize()
     15 # ds.compute(callbacks=RichProgressBar())
---> 16 ds.to_zarr("air_temp.zarr", mode='w',consolidated=True, chunkmanager_store_kwargs={'from_array_kwargs': {'spec': spec}, 'callbacks':RichProgressBar() })

File ~/micromamba/envs/cubed/lib/python3.11/site-packages/xarray/core/dataset.py:2553, in Dataset.to_zarr(self, store, chunk_store, mode, synchronizer, group, encoding, compute, consolidated, append_dim, region, safe_chunks, storage_options, zarr_version, write_empty_chunks, chunkmanager_store_kwargs)
   2406 """Write dataset contents to a zarr group.
   2407
   2408 Zarr chunks are determined in the following way:
   (...)
   2549     The I/O user guide, with more details and examples.
   2550 """
   2551 from xarray.backends.api import to_zarr
-> 2553 return to_zarr(  # type: ignore[call-overload,misc]
   2554     self,
   2555     store=store,
   2556     chunk_store=chunk_store,
   2557     storage_options=storage_options,
   2558     mode=mode,
   2559     synchronizer=synchronizer,
   2560     group=group,
   2561     encoding=encoding,
   2562     compute=compute,
   2563     consolidated=consolidated,
   2564     append_dim=append_dim,
   2565     region=region,
   2566     safe_chunks=safe_chunks,
   2567     zarr_version=zarr_version,
   2568     write_empty_chunks=write_empty_chunks,
   2569     chunkmanager_store_kwargs=chunkmanager_store_kwargs,
   2570 )

File ~/micromamba/envs/cubed/lib/python3.11/site-packages/xarray/backends/api.py:1721, in to_zarr(dataset, store, chunk_store, mode, synchronizer, group, encoding, compute, consolidated, append_dim, region, safe_chunks, storage_options, zarr_version, write_empty_chunks, chunkmanager_store_kwargs)
   1719 # TODO: figure out how to properly handle unlimited_dims
   1720 dump_to_store(dataset, zstore, writer, encoding=encoding)
-> 1721 writes = writer.sync(
   1722     compute=compute, chunkmanager_store_kwargs=chunkmanager_store_kwargs
   1723 )
   1725 if compute:
   1726     _finalize_store(writes, zstore)

File ~/micromamba/envs/cubed/lib/python3.11/site-packages/xarray/backends/common.py:267, in ArrayWriter.sync(self, compute, chunkmanager_store_kwargs)
    264 if chunkmanager_store_kwargs is None:
    265     chunkmanager_store_kwargs = {}
--> 267 delayed_store = chunkmanager.store(
    268     self.sources,
    269     self.targets,
    270     lock=self.lock,
    271     compute=compute,
    272     flush=True,
    273     regions=self.regions,
    274     **chunkmanager_store_kwargs,
    275 )
    276 self.sources = []
    277 self.targets = []

File ~/micromamba/envs/cubed/lib/python3.11/site-packages/xarray/namedarray/daskmanager.py:249, in DaskManager.store(self, sources, targets, **kwargs)
    241 def store(
    242     self,
    243     sources: Any | Sequence[Any],
    244     targets: Any,
    245     **kwargs: Any,
    246 ) -> Any:
    247     from dask.array import store
--> 249     return store(
    250         sources=sources,
    251         targets=targets,
    252         **kwargs,
    253     )

File ~/micromamba/envs/cubed/lib/python3.11/site-packages/dask/array/core.py:1229, in store(failed resolving arguments)
   1227 elif compute:
   1228     store_dsk = HighLevelGraph(layers, dependencies)
-> 1229     compute_as_if_collection(Array, store_dsk, map_keys, **kwargs)
   1230     return None
   1232 else:

File ~/micromamba/envs/cubed/lib/python3.11/site-packages/dask/base.py:403, in compute_as_if_collection(cls, dsk, keys, scheduler, get, **kwargs)
    401 schedule = get_scheduler(scheduler=scheduler, cls=cls, get=get)
    402 dsk2 = optimization_function(cls)(dsk, keys, **kwargs)
--> 403 return schedule(dsk2, keys, **kwargs)

File ~/micromamba/envs/cubed/lib/python3.11/site-packages/dask/threaded.py:90, in get(dsk, keys, cache, num_workers, pool, **kwargs)
     87 elif isinstance(pool, multiprocessing.pool.Pool):
     88     pool = MultiprocessingPoolExecutor(pool)
---> 90 results = get_async(
     91     pool.submit,
     92     pool._max_workers,
     93     dsk,
     94     keys,
     95     cache=cache,
     96     get_id=_thread_get_id,
     97     pack_exception=pack_exception,
     98     **kwargs,
     99 )
    101 # Cleanup pools associated to dead threads
    102 with pools_lock:

File ~/micromamba/envs/cubed/lib/python3.11/site-packages/dask/local.py:427, in get_async(submit, num_workers, dsk, result, cache, get_id, rerun_exceptions_locally, pack_exception, raise_exception, callbacks, dumps, loads, chunksize, **kwargs)
    425 dsk = dict(dsk)
    426 with local_callbacks(callbacks) as callbacks:
--> 427     _, _, pretask_cbs, posttask_cbs, _ = unpack_callbacks(callbacks)
    428     started_cbs = []
    429     succeeded = False

File ~/micromamba/envs/cubed/lib/python3.11/site-packages/dask/callbacks.py:89, in unpack_callbacks(cbs)
     87 """Take an iterable of callbacks, return a list of each callback."""
     88 if cbs:
---> 89     return [[i for i in f if i] for f in zip(*cbs)]
     90 else:
     91     return [(), (), (), (), ()]

TypeError: zip() argument after * must be an iterable, not RichProgressBar
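Looking at dask's unpack_callbacks, it does zip(*cbs), which makes me wonder whether callbacks is meant to be an iterable of callbacks rather than a single callback object. Purely a guess on my part, but maybe something like:

ds.compute(callbacks=[RichProgressBar()])

# and / or

ds.to_zarr("air_temp.zarr", mode='w', consolidated=True,
           chunkmanager_store_kwargs={'from_array_kwargs': {'spec': spec},
                                      'callbacks': [RichProgressBar()]})

It also surprises me that the traceback goes through xarray's DaskManager and dask's threaded scheduler rather than anything in Cubed, so perhaps the store isn't being routed through the Cubed chunk manager here at all.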

Fingers crossed it's a user error!

norlandrhagen · Aug 05 '24 17:08