
Document distributed workflow, e.g. "dask delayed for local cluster on AWS EC2 instance *at scale*" tutorial

DeanHenze opened this issue 1 year ago · 22 comments

Hi all,

Dean here, currently on the Data Publication Team at PO.DAAC. This is my first GitHub issue, so please feel free to let me know how I can improve this post. I have a basic use case for earthaccess with Dask that I am wondering whether you all want to accommodate in future versions.

I start by searching for some MUR granules on PO.DAAC, which are simple gridded NetCDF files with a variable for sea surface temperature

import earthaccess
import xarray as xr

earthaccess.login()
granule_info = earthaccess.search_data(short_name="MUR25-JPL-L4-GLOB-v04.2", count=10)

which works fine. Then I define a function to open a file and compute mean SST for one MUR granule:

def sstmean_1file(gran_info_single): # Takes an earthaccess.results.DataGranule object.
    fileobj = earthaccess.open([gran_info_single])[0]
    data = xr.open_dataset(fileobj)
    return data['analysed_sst'].mean().item()

Calling this function with the following code works fine both on my laptop and an AWS EC2 instance:

print(sstmean_1file(granule_info[0]))

Next, using this function with dask delayed works fine on my laptop with the following code:

from dask.distributed import Client
from dask import delayed
import dask.array as da

# Start cluster:
client = Client(n_workers=2, threads_per_worker=1)

# Process several granules in parallel using Dask:
sstmean_1file_parallel = delayed(sstmean_1file)
tasks = [sstmean_1file_parallel(gi) for gi in granule_info[:2]]
results = da.compute(*tasks) 
print(results)

However, it fails with the following output when I run it from an EC2 instance:

2024-03-21 21:08:07,913 - distributed.worker - WARNING - Compute Failed
Key:       sstmean_1file-732177b9-01ce-49e6-b3ee-043b12d06117
Function:  sstmean_1file
args:      (Collection: {'Version': '4.2', 'ShortName': 'MUR25-JPL-L4-GLOB-v04.2'}
Spatial coverage: {'HorizontalSpatialDomain': {'Geometry': {'BoundingRectangles': [{'WestBoundingCoordinate': -180, 'SouthBoundingCoordinate': -90, 'EastBoundingCoordinate': 180, 'NorthBoundingCoordinate': 90}]}}}
Temporal coverage: {'RangeDateTime': {'EndingDateTime': '2002-09-01T21:00:00.000Z', 'BeginningDateTime': '2002-08-31T21:00:00.000Z'}}
Size(MB): 1.8629217147827148
Data: ['https://archive.podaac.earthdata.nasa.gov/podaac-ops-cumulus-protected/MUR25-JPL-L4-GLOB-v04.2/20020901090000-JPL-L4_GHRSST-SSTfnd-MUR25-GLOB-v02.0-fv04.2.nc'])
kwargs:    {}
Exception: 'AttributeError("\'NoneType\' object has no attribute \'open\'")'

2024-03-21 21:08:07,915 - distributed.worker - WARNING - Compute Failed
Key:       sstmean_1file-2bb2c549-d507-484d-a45d-f805bf90cf1c
Function:  sstmean_1file
args:      (Collection: {'Version': '4.2', 'ShortName': 'MUR25-JPL-L4-GLOB-v04.2'}
Spatial coverage: {'HorizontalSpatialDomain': {'Geometry': {'BoundingRectangles': [{'WestBoundingCoordinate': -180, 'SouthBoundingCoordinate': -90, 'EastBoundingCoordinate': 180, 'NorthBoundingCoordinate': 90}]}}}
Temporal coverage: {'RangeDateTime': {'EndingDateTime': '2002-09-02T21:00:00.000Z', 'BeginningDateTime': '2002-09-01T21:00:00.000Z'}}
Size(MB): 1.8608589172363281
Data: ['https://archive.podaac.earthdata.nasa.gov/podaac-ops-cumulus-protected/MUR25-JPL-L4-GLOB-v04.2/20020902090000-JPL-L4_GHRSST-SSTfnd-MUR25-GLOB-v02.0-fv04.2.nc'])
kwargs:    {}
Exception: 'AttributeError("\'NoneType\' object has no attribute \'open\'")'

---------------------------------------------------------------------------
AttributeError                            Traceback (most recent call last)
Cell In[2], line 36
     34 sstmean_1file_parallel = delayed(sstmean_1file)
     35 tasks = [sstmean_1file_parallel(gi) for gi in granule_info[:2]]
---> 36 results = da.compute(*tasks) 
     37 print(results)

File /opt/coiled/env/lib/python3.11/site-packages/dask/base.py:628, in compute(traverse, optimize_graph, scheduler, get, *args, **kwargs)
    625     postcomputes.append(x.__dask_postcompute__())
    627 with shorten_traceback():
--> 628     results = schedule(dsk, keys, **kwargs)
    630 return repack([f(r, *a) for r, (f, a) in zip(results, postcomputes)])

Cell In[2], line 20, in sstmean_1file()
     18 def sstmean_1file(gran_info_single): # Takes an earthaccess.results.DataGranule object.
     19     #earthaccess.login() # intentionally commented out
---> 20     fileobj = earthaccess.open([gran_info_single])[0]
     21     data = xr.open_dataset(fileobj)
     22     return data['analysed_sst'].mean().item()

File /opt/coiled/env/lib/python3.11/site-packages/earthaccess/api.py:212, in open()
    200 """Returns a list of fsspec file-like objects that can be used to access files
    201 hosted on S3 or HTTPS by third party libraries like xarray.
    202 
   (...)
    209     a list of s3fs "file pointers" to s3 files.
    210 """
    211 provider = _normalize_location(provider)
--> 212 results = earthaccess.__store__.open(granules=granules, provider=provider)
    213 return results

AttributeError: 'NoneType' object has no attribute 'open'

A couple of follow-up points:

  1. If I instead have sstmean_1file() take a file-like object (e.g. the output from earthaccess.open()) rather than an earthaccess.results.DataGranule object, then the dask delayed code works even on an EC2 instance. In fact I posted a tutorial on our PO.DAAC Cookbook which does this: https://podaac.github.io/tutorials/notebooks/Advanced_cloud/basic_dask.html
  2. However, I think the function as I've written it here should also work. I imagine there are cases where it is better to have the function take the granule info rather than the file-like object.
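For reference, the working pattern from point 1 opens the files once on the client (where the authenticated earthaccess session lives) and ships only the file-like objects to the tasks. A rough sketch of that pattern is below; the stub opener is a hypothetical stand-in so it runs without Earthdata credentials, and the fan-out is shown with the standard library rather than dask.delayed so it runs without a cluster:

```python
import io
from concurrent.futures import ThreadPoolExecutor

# Hypothetical stand-in for earthaccess.open(granule_info): in the real
# workflow these are fsspec file-like objects created on the client,
# where the authenticated earthaccess session exists.
def open_files_stub(n):
    return [io.BytesIO(b"fake granule bytes") for _ in range(n)]

# The task function takes an already-open file object, not a DataGranule,
# so it never needs earthaccess state on the worker.
def process_1file(fileobj):
    # Real version: xr.open_dataset(fileobj)["analysed_sst"].mean().item()
    return len(fileobj.read())

files = open_files_stub(2)
# Same fan-out shape as the dask.delayed version in the Cookbook tutorial:
# tasks = [delayed(process_1file)(f) for f in files]; da.compute(*tasks)
with ThreadPoolExecutor(max_workers=2) as ex:
    results = list(ex.map(process_1file, files))
print(results)  # -> [18, 18]
```

The trade-off point 2 raises still stands: this only works when the client can afford to open (and authenticate) everything up front.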

Thanks all for the fantastic package, let me know what you think.

Dean

DeanHenze avatar Mar 21 '24 21:03 DeanHenze

It looks as if you are trying to calculate a time series of global SST; for each time step calculate the mean analysed_sst over lat and lon.

You shouldn't need to use dask explicitly for this because xarray uses dask for these tasks. You may be trying to do something different. If so, I apologize for suggesting "I wouldn't start from here" instead of answering your question.

That said, a more complicated workflow might require dask.delayed.

import earthaccess
import xarray as xr

%%time
earthaccess.login()
granule_info = earthaccess.search_data(short_name="MUR25-JPL-L4-GLOB-v04.2", count=10)

files = earthaccess.open(granule_info)
data = xr.open_mfdataset(files)
mean_sst = data.analysed_sst.mean(dim=['lat','lon'])

mean_sst.plot()
Granules found: 7869
Opening 10 granules, approx size: 0.02 GB
using endpoint: https://archive.podaac.earthdata.nasa.gov/s3credentials
QUEUEING TASKS | : 100%
10/10 [00:00<00:00, 463.19it/s]
PROCESSING TASKS | : 100%
10/10 [00:00<00:00, 2.78it/s]
COLLECTING RESULTS | : 100%
10/10 [00:00<00:00, 1290.28it/s]
CPU times: user 1.31 s, sys: 60.5 ms, total: 1.37 s
Wall time: 6.03 s

The search, access, lazy-load and computation (plotting initiates a compute on the task graph) take 6 s.

mean_sst.values
array([287.01715, 287.01108, 287.0128 , 287.01627, 287.0176 , 287.02246,
       287.0267 , 287.0374 , 287.04288, 287.03943], dtype=float32)

The MUR files are loaded lazily and held as dask arrays:

<xarray.Dataset>
Dimensions:           (time: 10, lat: 720, lon: 1440)
Coordinates:
  * time              (time) datetime64[ns] 2002-09-01T09:00:00 ... 2002-09-1...
  * lat               (lat) float32 -89.88 -89.62 -89.38 ... 89.38 89.62 89.88
  * lon               (lon) float32 -179.9 -179.6 -179.4 ... 179.4 179.6 179.9
Data variables:
    analysed_sst      (time, lat, lon) float32 dask.array<chunksize=(1, 720, 1440), meta=np.ndarray>
    analysis_error    (time, lat, lon) float32 dask.array<chunksize=(1, 720, 1440), meta=np.ndarray>
    mask              (time, lat, lon) float32 dask.array<chunksize=(1, 720, 1440), meta=np.ndarray>
    sea_ice_fraction  (time, lat, lon) float32 dask.array<chunksize=(1, 720, 1440), meta=np.ndarray>
    sst_anomaly       (time, lat, lon) float32 dask.array<chunksize=(1, 720, 1440), meta=np.ndarray>
Attributes: (12/54)
    Conventions:                CF-1.7, ACDD-1.3
    title:                      Daily 0.25-degree MUR SST, Final product
    summary:                    A low-resolution version of the MUR SST analy...
    keywords:                   Oceans > Ocean Temperature > Sea Surface Temp...
    keywords_vocabulary:        NASA Global Change Master Directory (GCMD) Sc...
    standard_name_vocabulary:   NetCDF Climate and Forecast (CF) Metadata Con...
    ...                         ...
    publisher_name:             GHRSST Project Office
    publisher_url:              https://www.ghrsst.org/
    publisher_email:            [email protected]
    file_quality_level:         3
    metadata_link:              http://podaac.jpl.nasa.gov/ws/metadata/datase...
    acknowledgment:             Please acknowledge the use of these data with...

andypbarrett avatar Mar 22 '24 01:03 andypbarrett

Some experimentation with the workflow I described above reveals that it is relatively efficient for 100 to 1000 granules (6 s to roughly 3 minutes) but runs into memory issues for larger numbers of files. For the ~7000 granules returned by the search, a 15 GB instance crashes the kernel.

andypbarrett avatar Mar 23 '24 00:03 andypbarrett

Hi Dean, thanks for opening this issue!

This is happening because local instances are not shared automatically in Dask distributed computations. When we distribute the function to the workers, the authenticated earthaccess instance is not copied over, so we need a way to also send either the instance or the credentials to the workers.

I wrote a small notebook to illustrate how to forward the auth variables https://gist.github.com/betolink/9a96c8cb283d6f37f3c5ebe3c24c5b70

I think in the near future this won't be necessary and earthaccess should deal with this case as well. Let us know if this worked for you!
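The approach in the gist can be sketched roughly as follows (the helper names here are illustrative, not earthaccess API): collect the Earthdata credentials on the client and push them into each worker's environment, so that earthaccess's environment-variable login strategy works inside tasks.

```python
import os

def auth_env():
    """Collect Earthdata credentials from the client's environment.
    (Illustrative helper; see the linked gist for the real version.)"""
    keys = ("EARTHDATA_USERNAME", "EARTHDATA_PASSWORD")
    return {k: os.environ[k] for k in keys if k in os.environ}

def set_env(env):
    """Runs once on each worker: install the credentials there so that
    earthaccess.login(strategy="environment") succeeds inside tasks."""
    os.environ.update(env)

# Usage with a distributed cluster (assumes `client` is a dask Client):
# client.run(set_env, auth_env())
```

With the credentials in place on every worker, the task function can call earthaccess.login() itself before earthaccess.open().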

betolink avatar Mar 23 '24 19:03 betolink

Some experimentation with the workflow I described above reveals that it is relatively efficient for 100 to 1000 granules (6 s to roughly 3 minutes) but runs into memory issues for larger numbers of files. For the ~7000 granules returned by the search, a 15 GB instance crashes the kernel.

Hi Andy thanks for getting back to me! I understand your response, if a time series of SST mean were the end goal then using Xarray in the way you show would be the way to go. This is a toy example, and I have more complicated cases where dask delayed is necessary.

Hi Dean, thanks for opening this issue!

This is happening because local instances are not shared automatically in Dask distributed computations. When we distribute the function to the workers the authenticated earthaccess instance is not copied over so we need a way to also send either the instance or credentials to the workers.

I wrote a small notebook to illustrate how to forward the auth variables https://gist.github.com/betolink/9a96c8cb283d6f37f3c5ebe3c24c5b70

I think in the near future this won't be necessary and earthaccess should deal with this case as well. Let us know if this worked for you!

Hi Luis, very cool I'll give this a try! Thank you.

DeanHenze avatar Mar 25 '24 18:03 DeanHenze

This being my first GitHub issue, I'm not sure how this works. Should I close the issue for now? It sounds like you already have plans for getting to it.

DeanHenze avatar Mar 25 '24 18:03 DeanHenze

I think we should keep it open until one of the proposed solutions works for this use case.

betolink avatar Mar 25 '24 18:03 betolink

I don't think this approach scales. I ran @betolink's gist code for 1000 granules and got a KeyError. I suspect this is because there are 1000 hits on CMR. See below.

I also tried earthaccess.open on the full granule list and then looping through the file objects. This runs into memory issues for more than ~4000 granules on a 15 GB instance. The same happens with the approach @betolink describes above.

It is not yet clear to me why this is happening, but I suspect memory is not getting released once the aggregation (averaging) step is done. I even used a context manager on xarray.open_dataset, but the same memory issue occurs.

QUEUEING TASKS | : 100%|██████████| 1/1 [00:00<00:00, 1889.33it/s]
PROCESSING TASKS | : 100%|██████████| 1/1 [00:00<00:00,  7.74it/s]
COLLECTING RESULTS | : 100%|██████████| 1/1 [00:00<00:00, 28149.69it/s]
2024-03-25 18:28:50,840 - distributed.worker - WARNING - Compute Failed
Key:       sstmean_1file-9ecf331c-da16-4d4c-98f1-0e372c8153a1
Function:  sstmean_1file
args:      (Collection: {'Version': '4.2', 'ShortName': 'MUR25-JPL-L4-GLOB-v04.2'}
Spatial coverage: {'HorizontalSpatialDomain': {'Geometry': {'BoundingRectangles': [{'WestBoundingCoordinate': -180, 'SouthBoundingCoordinate': -90, 'EastBoundingCoordinate': 180, 'NorthBoundingCoordinate': 90}]}}}
Temporal coverage: {'RangeDateTime': {'EndingDateTime': '2003-01-28T21:00:00.000Z', 'BeginningDateTime': '2003-01-27T21:00:00.000Z'}}
Size(MB): 1.8240442276000977
Data: ['https://archive.podaac.earthdata.nasa.gov/podaac-ops-cumulus-protected/MUR25-JPL-L4-GLOB-v04.2/20030128090000-JPL-L4_GHRSST-SSTfnd-MUR25-GLOB-v02.0-fv04.2.nc'])
kwargs:    {}
Exception: "KeyError('accessKeyId')"

---------------------------------------------------------------------------
KeyError                                  Traceback (most recent call last)
Cell In[7], line 1
----> 1 results = da.compute(*tasks)

File /srv/conda/envs/notebook/lib/python3.10/site-packages/dask/base.py:665, in compute(traverse, optimize_graph, scheduler, get, *args, **kwargs)
    662     postcomputes.append(x.__dask_postcompute__())
    664 with shorten_traceback():
--> 665     results = schedule(dsk, keys, **kwargs)
    667 return repack([f(r, *a) for r, (f, a) in zip(results, postcomputes)])

Cell In[5], line 3, in sstmean_1file()
      1 def sstmean_1file(gran_info_single):
      2     earthaccess.login()
----> 3     fileobj = earthaccess.open([gran_info_single])[0]
      4     data = xr.open_dataset(fileobj)
      5     return data['analysed_sst'].mean().item()

File /srv/conda/envs/notebook/lib/python3.10/site-packages/earthaccess/api.py:217, in open()
    207 """Returns a list of fsspec file-like objects that can be used to access files
    208 hosted on S3 or HTTPS by third party libraries like xarray.
    209 
   (...)
    214     a list of s3fs "file pointers" to s3 files.
    215 """
    216 provider = _normalize_location(provider)
--> 217 results = earthaccess.__store__.open(granules=granules, provider=provider)
    218 return results

File /srv/conda/envs/notebook/lib/python3.10/site-packages/earthaccess/store.py:308, in open()
    299 """Returns a list of fsspec file-like objects that can be used to access files
    300 hosted on S3 or HTTPS by third party libraries like xarray.
    301 
   (...)
    305     a list of s3fs "file pointers" to s3 files.
    306 """
    307 if len(granules):
--> 308     return self._open(granules, provider)
    309 return []

File /srv/conda/envs/notebook/lib/python3.10/site-packages/multimethod/__init__.py:315, in __call__()
    313 func = self[tuple(func(arg) for func, arg in zip(self.type_checkers, args))]
    314 try:
--> 315     return func(*args, **kwargs)
    316 except TypeError as ex:
    317     raise DispatchError(f"Function {func.__code__}") from ex

File /srv/conda/envs/notebook/lib/python3.10/site-packages/earthaccess/store.py:351, in _open_granules()
    349 if endpoint is not None:
    350     print(f"using endpoint: {endpoint}")
--> 351     s3_fs = self.get_s3fs_session(endpoint=endpoint)
    352 else:
    353     print(f"using provider: {provider}")

File /srv/conda/envs/notebook/lib/python3.10/site-packages/earthaccess/store.py:259, in get_s3fs_session()
    255     # Include new credentials in the cache
    256     self._s3_credentials[location] = now, creds
    258 return s3fs.S3FileSystem(
--> 259     key=creds["accessKeyId"],
    260     secret=creds["secretAccessKey"],
    261     token=creds["sessionToken"],
    262 )

KeyError: 'accessKeyId'

andypbarrett avatar Mar 25 '24 18:03 andypbarrett

I think we should keep it open until one of the proposed solutions works for this use case.

I tried out your solution (the auth_env() function) for both the toy case and a more complicated case, and it worked. However, I haven't tried scaling yet; I will see if I run into the same issues as Andy.

DeanHenze avatar Mar 25 '24 19:03 DeanHenze

Just keep in mind that for Dask we should try to match the number of workers to the available CPUs. The total memory of the VM will be divided equally among the workers, so it's good to keep the "workers memory" panel open to monitor memory utilization.
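A small sizing helper along those lines (hypothetical, not part of earthaccess or Dask): cap the worker count both by the CPU count and by how many workers the VM's memory can support at a chosen per-worker budget.

```python
import os

def suggest_n_workers(total_mem_gb, mem_per_worker_gb=4.0, cpus=None):
    """Pick a worker count bounded by CPUs and by memory: Dask splits
    the VM's memory evenly, so each worker should still end up with at
    least `mem_per_worker_gb`."""
    cpus = cpus or os.cpu_count() or 1
    by_mem = int(total_mem_gb // mem_per_worker_gb)
    return max(1, min(cpus, by_mem))

# e.g. a 15 GB instance with a 4 GB/worker budget and 8 CPUs:
print(suggest_n_workers(15, 4.0, cpus=8))  # -> 3
```

The result would then feed Client(n_workers=..., threads_per_worker=1), as in the original example.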

betolink avatar Mar 25 '24 20:03 betolink

I am running into errors when scaling, but I think those are related to worker memory release issues rather than the one in this ticket.

DeanHenze avatar Mar 26 '24 16:03 DeanHenze

Nice (the part about the original issue being resolved)! But this should be documented. I wonder if we can have an "earthaccess for distributed workflows" tutorial. FWIW, I managed to run through the whole MUR collection in under 10 minutes using a small 14 GB RAM instance! The trick was to use the implicit login() and to trigger garbage collection manually.

import gc

def sstmean_1file(gran_info_single):
    fileobj = earthaccess.open([gran_info_single])[0]
    data = xr.open_dataset(fileobj)
    mean = data['analysed_sst'].mean().item()
    del fileobj, data
    gc.collect()
    return mean

betolink avatar Mar 26 '24 18:03 betolink

FWIW, I managed to run through the whole MUR collection in under 10 minutes using a small 14 GB RAM instance! The trick was to use the implicit login() and to trigger garbage collection manually.

Woohoo! I was also able to process the entire record using manual garbage collection; it took a little longer at 15 minutes, but that still sounds great to me.

Nice (the part about the original issue being resolved)! But this should be documented. I wonder if we can have an "earthaccess for distributed workflows" tutorial.

I think I have that somewhat started with the Cookbook tutorial (though of course you will probably want your own). I will be sure to update it with some notes about applying the auth_env() function and manual garbage collection.

One thing I noticed when doing a full-scale analysis is that the output from earthaccess becomes very long. Maybe there could be an option to suppress the output?

DeanHenze avatar Mar 27 '24 18:03 DeanHenze

Maybe there could be an option to suppress the output?

Certainly, that could be implemented (probably upstream in pqdm).

betolink avatar Mar 27 '24 18:03 betolink

I ran into the same issue with the output. Adding the cell magic %%capture can eliminate it.
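Outside of notebooks (where %%capture isn't available), the client-side progress output can usually be silenced with contextlib.redirect_stdout. A sketch, with a stand-in for the noisy call; note this only captures what is printed on the client, not output logged by Dask workers:

```python
import contextlib
import io

def quietly(fn, *args, **kwargs):
    """Run fn while discarding anything it prints to stdout on the
    client (e.g. earthaccess/pqdm progress bars)."""
    with contextlib.redirect_stdout(io.StringIO()):
        return fn(*args, **kwargs)

def noisy_open(n):
    # Stand-in for earthaccess.open(), which prints progress bars.
    print("QUEUEING TASKS | ...")
    return list(range(n))

files = quietly(noisy_open, 3)
print(files)  # -> [0, 1, 2]
```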

andypbarrett avatar Mar 27 '24 19:03 andypbarrett

For some reason %%capture didn't work with Dask output.

betolink avatar Mar 27 '24 19:03 betolink

Another update: garbage collection does not seem to be cutting it for my more complicated case. I have the Dask task graph up and watch the memory per worker slowly accumulate as they churn through the files, until the workers finally crash. I wonder whether, for the toy case above, the memory per worker was also accumulating, but slowly enough that the analysis could finish before it became an issue. I'll rerun the toy example and check the task graph while it runs.

DeanHenze avatar Mar 27 '24 19:03 DeanHenze

If you share your notebook maybe we can take a look at what's happening.

betolink avatar Mar 27 '24 19:03 betolink

If you share your notebook maybe we can take a look at what's happening.

Here is the more complicated case (I think that's what you are referring to?) https://github.com/DeanHenze/podaac-dpub/blob/main/cowvr_animation.ipynb. Although you won't be able to run the notebook because it uses a data set that is currently restricted.

DeanHenze avatar Mar 28 '24 19:03 DeanHenze

This is my first GitHub issue so please feel free to let me know how I can improve this post.

While I have little to contribute (aside from re-titling this issue), I do have some feedback for you. This is one of the best first GitHub issues I've seen! :heart: Welcome to the community and thanks so much for your contribution!

mfisher87 avatar Mar 31 '24 00:03 mfisher87

I ran another test with the toy example to see if garbage collection and memory cleanup were working, and I am not sure they are. I think it's just that the files and computations are small enough that they don't cause any issues. I ran sstmean_1file() on a local cluster (with the del fileobj, data and gc.collect() lines) and monitored the memory per worker. Here is the memory of one worker after ~2600 files processed and after ~7700 files processed; you can see that the majority of memory used is "unmanaged memory", and that it increases:

[screenshots: worker memory panel with gc.collect(), after ~2600 and ~7700 files]

I then repeated this run with a new kernel and cluster but removed the del fileobj, data and gc.collect() lines, and the total and unmanaged memory look the same (in fact a little lower at ~7700 files):

[screenshots: worker memory panel without gc.collect(), after ~2600 and ~7700 files]

So I'm not sure those lines are doing anything, and I imagine that for my more complex case, where the files are much larger, the unmanaged memory simply grows too high.

DeanHenze avatar Apr 02 '24 22:04 DeanHenze

I'll try to take a look this week; we are of course entering dask/xarray territory. I think having this workflow figured out will benefit all earthaccess users. FWIW, unmanaged memory is not a solved issue with Dask, and manual garbage collection only covers what Python can see. @DeanHenze
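For what it's worth, the Dask documentation on worker memory discusses exactly this kind of steadily growing "unmanaged memory" on Linux and suggests, among other mitigations, lowering glibc malloc's trim threshold so freed memory is returned to the OS sooner. A sketch of that mitigation (worth verifying against the current Dask docs for your setup):

```shell
# Set before starting the scheduler/workers (or the notebook kernel):
# glibc will then return freed memory to the OS more aggressively,
# which can reduce the "unmanaged memory" reported by the dashboard.
export MALLOC_TRIM_THRESHOLD_=0
```

Note the trailing underscore in the variable name; it is part of glibc's spelling.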

betolink avatar Apr 11 '24 04:04 betolink

@betolink, totally understood and agreed; this seems far outside earthaccess's field of view, and I'm not expecting any solutions from you! I just figured I'd follow up since you posted the bit about garbage collection earlier in the conversation.

DeanHenze avatar Apr 12 '24 18:04 DeanHenze