
Dask-specific methods


xr.Dataset implements a bunch of dask-specific methods, such as __dask_tokenize__ and __dask_graph__. It also obviously has public methods that involve dask such as .compute() and .load().
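
For example, here is a minimal illustration (toy data, not from any real file) of what satisfying that protocol already gives a dask-chunked Dataset:

import dask
import numpy as np
import xarray as xr

# A small dask-chunked Dataset behaves as a dask collection out of the box
ds = xr.Dataset({"t": ("x", np.arange(10))}).chunk({"x": 5})
print(dask.is_dask_collection(ds))  # True, thanks to the double-underscore methods
print(dask.base.tokenize(ds))       # deterministic hash via __dask_tokenize__
(loaded,) = dask.compute(ds + 1)    # dask.compute traverses the graph built by __dask_graph__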

In DataTree on the other hand, I haven't yet implemented any methods like these, or even written any tests that involve dask! You can probably still use dask with datatree right now, but from dask's perspective the datatree is presumably merely a set of unconnected Dataset objects.

We could choose to implement methods like .load() as just a mapping over the tree, i.e.

def load(self):
    """Load any lazy (e.g. dask-backed) data in every node into memory."""
    for node in self.subtree:
        if node.has_data:
            node.ds.load()  # Dataset.load loads in place and returns the dataset
    return self

but really this should probably wait for #41, or be done as part of that refactor.

I don't yet really understand what the double-underscore methods do, though, so would appreciate input on that.

TomNicholas avatar May 19 '22 15:05 TomNicholas

understand what the double-underscore methods do

https://docs.dask.org/en/stable/custom-collections.html

Xarray objects satisfy this collections protocol, so you can call dask.compute(xarray_thing), dask.persist(xarray_thing), dask.visualize(xarray_thing), and dask.base.tokenize(xarray_thing) on them.
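
As a rough, hypothetical sketch (the method names come from the dask custom-collections docs, but the delegation logic below is only an assumption, not an actual implementation), DataTree could satisfy part of the protocol by deferring to each node's Dataset:

from dask.highlevelgraph import HighLevelGraph

class DataTree:
    # ... existing tree attributes and methods elided ...

    def __dask_graph__(self):
        # Merge the task graphs of every node that holds dask-backed data
        graphs = [
            node.ds.__dask_graph__()
            for node in self.subtree
            if node.has_data and node.ds.__dask_graph__() is not None
        ]
        return HighLevelGraph.merge(*graphs) if graphs else None

    def __dask_keys__(self):
        # One (possibly nested) list of keys per dask-backed node, in subtree order
        return [
            node.ds.__dask_keys__()
            for node in self.subtree
            if node.has_data and node.ds.__dask_graph__() is not None
        ]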

dcherian avatar May 19 '22 15:05 dcherian

I have just discovered datatree and wonder if it helps address performance issues I have encountered with dask, mostly due to exploding task graphs when working with very large datasets stored across many netCDF files or very large, deeply nested zarr arrays with many chunks.

My solution was to implement a basic tree in an intake driver that, for a 20-year dataset, uses monthly aggregates (of daily rasters) stored as kerchunk JSON, with a delayed open_and_(optionally)load method. The complete dataset representation and graph is readily constructed at this coarser granularity, and then subsetting and loading of the monthly aggregate Datasets can occur on the workers, in a distributed fashion.
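
As a hedged sketch of that pattern (not the actual intake driver code; the reference file names, the subsetting arguments, and open_and_maybe_load itself are made up for illustration), the delayed open-and-load step might look like:

import dask
import xarray as xr

@dask.delayed
def open_and_maybe_load(reference_json, indexers, load=True):
    # Open one monthly kerchunk aggregate via fsspec's reference filesystem
    ds = xr.open_dataset(
        "reference://",
        engine="zarr",
        backend_kwargs={
            "storage_options": {"fo": reference_json},
            "consolidated": False,
        },
    )
    ds = ds.isel(indexers)  # subset on the worker, not on the client
    return ds.load() if load else ds

# One delayed task per monthly aggregate; subsetting and loading happen on the workers
tasks = [
    open_and_maybe_load(f"month_{m:02d}.json", {"x": slice(0, 512), "y": slice(0, 512)})
    for m in range(1, 13)
]
datasets = dask.compute(*tasks)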

I tried to make use of finer-grained parallelism by then using the threads scheduler within a worker, but ran into issues, so I got the best performance using many single-threaded workers (a bit like the lambda examples I saw with pywren). The earlier prototype code and performance tests are in this notebook: https://github.com/pbranson/pixeldrill/blob/main/notebooks/access_via_aggindex.ipynb

Is there, in a sense, some overlap between this library and kerchunk and is there a logical point for them to interface?

Perhaps there is a more native way to handle this in dask that I need to learn about, one that encapsulates some kind of dynamic graph generation and nested distributed scheduling that doesn't need to be coordinated back to the central scheduler?

pbranson avatar Jul 24 '22 07:07 pbranson

@pbranson I think datatree's IO is less advanced than you are imagining.

At the moment all we do is look at a netCDF file / Zarr Store, iterate over the groups, open each group using xr.open_dataset, and arrange all the groups into a tree. We don't do anything else with dask graphs, chunking, loading, or intake.
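
For concreteness, the open logic amounts to roughly the sketch below (group discovery and backend handling are elided; open_as_tree and the groups argument are placeholders rather than datatree's actual API):

import xarray as xr
from datatree import DataTree

def open_as_tree(filename_or_store, groups):
    # `groups` is assumed to be the list of group paths found in the file or store
    datasets = {path: xr.open_dataset(filename_or_store, group=path) for path in groups}
    # Arrange the per-group Datasets into a single tree keyed by their paths
    return DataTree.from_dict(datasets)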

Is there, in a sense, some overlap between this library and kerchunk and is there a logical point for them to interface?

I am not (yet) very familiar with kerchunk, but I think they do pretty different things. My understanding is that kerchunk allows you to open data as if it were chunked in a different pattern from how it is actually chunked on disk. Datatree does nothing special with chunks; it just arranges the results of different open_dataset calls into a convenient tree structure.

If you can use kerchunk in open_dataset, we could probably also use it in datatree, but I think the logical place to ask about integration would be on the xarray main repo.

TomNicholas avatar Jul 24 '22 16:07 TomNicholas

Thanks @TomNicholas and sorry for creating issue noise. I guess I got a bit carried away with these comments in the readme:

  • Has functions for mapping user-supplied functions over every node in the tree,
  • Automatically dispatches some of xarray.Dataset's API over every node in the tree (such as .isel),

I was thinking that maybe the datatree abstraction could be a more formalised and ultimately 'xarray native' approach to the problems that have been tackled by e.g. intake-esm and intake-thredds. Leaves in the tree could be compositions over netCDF files, which may be aggregated JSON indexes. I guess I was thinking that some sort of formalism over a nested data structure could help with dask computational graph composition. I have run into issues where the scheduler gets overloaded, or just takes forever to start, for calculations across large datasets composed with e.g. open_mfdataset.

I wonder if @andersy005, @mdurant or @rsignell-usgs have any experience or thoughts about if it makes any sense for an interface between this library and intake?

pbranson avatar Jul 30 '22 20:07 pbranson

@pbranson you're asking great questions but I'm going to move this discussion over to #134 so as not to confuse it with the original purpose of this issue.

TomNicholas avatar Jul 30 '22 21:07 TomNicholas