stackstac icon indicating copy to clipboard operation
stackstac copied to clipboard

stackstac.stack to support one time coordinate per unique datetime?

Open thuydotm opened this issue 4 years ago • 11 comments

Please see below an example running on the Planetary Computer using Esri 10m Land Cover data, where each STAC item is derived from a mosaic of many images. The output is a 4D cube with the time dimension is 4 and the time coordinates are just 2020-06-01T00:00:00 repeated.

The documentation clearly stated that ``time`` will be equal in length to the number of items you pass in, and indexed by STAC Item datetime. But in a more natural way, it's expected that the dataarray should have one time coordinate per unique datetime in the STAC items. Would stackstac.stack support this feature?

import numpy as np
import planetary_computer as pc
import pystac_client
import stackstac

catalog = pystac_client.Client.open(
    "https://planetarycomputer.microsoft.com/api/stac/v1/"
)
point = {"type": "Point", "coordinates": [-97.807733, 33.2133019]}

io_lulc_search = catalog.search(collections=["io-lulc"], intersects=point)
io_lulc_items = [pc.sign(item).to_dict() for item in io_lulc_search.get_items()]
data = stackstac.stack(io_lulc_items, assets=["data"], epsg=3857)

data.shape, np.unique(data.time)

Output: ((4, 1, 185098, 134598), array(['2020-06-01T00:00:00.000000000'], dtype='datetime64[ns]'))

thuydotm avatar Aug 03 '21 10:08 thuydotm

As a bit of context, the STAC items in this global IO-LULC dataset all have the same timestamp. AFAICT, if you wanted to load 100 of these items with stackstac, memory usage would be ~100x larger than necessary, since the array would be (100, 1, width, height), rather than (1, 1, width, height).

TomAugspurger avatar Aug 03 '21 11:08 TomAugspurger

What about stackstac.mosaic(data)? It would certainly give you the output you're looking for. But it would probably use more memory than necessary, since you'd briefly create 100 layers of unnecessary NaNs before flattening them (as Tom said above). However, these NaNs would maybe not actually take 100x the memory, since we use this broadcast trick optimization for the out-of-bounds chunks: https://github.com/gjoseph92/stackstac/blob/2c799e3f0075942cbb76a99f96517d4fe15bc98a/stackstac/to_dask.py#L176-L178

I haven't tested the memory usage; I'd be curious what actually happens.

it's expected that the dataarray should have one time coordinate per unique datetime in the STAC items. Would stackstac.stack support this feature?

We could think about it, but it might be tricky to implement during the stacking itself. If a chunk of the array could be sourced from multiple GDAL datasets, that would require rethinking a lot of the dask graph generation logic.

I might prefer to implement this through optimized mosaic logic instead. The thing we want to skip is generating that all-NaN array for out-of-bounds chunks when we know it will immediately get dropped by a mosaic. There's already logic that avoids even opening an asset for spatial chunks that don't overlap with that asset: https://github.com/gjoseph92/stackstac/blob/2c799e3f0075942cbb76a99f96517d4fe15bc98a/stackstac/prepare.py#L233-L238 If we did #13, I have a feeling mosaic could look at each spatial chunk, pluck out only the assets that overlap with that spatial chunk, and mosaic only those together. However, I'd want to test the current mosaic performance, because between the broadcast trick and short-circuiting the dataset open, performance might already be pretty good.


So if mosaic performance is good (or can be good), I might be in favor of a flatten_dates kwarg to stack, or even a separate function, which is just a convenience for arr.groupby("time").apply(stackstac.mosaic) or something like that.

gjoseph92 avatar Aug 09 '21 18:08 gjoseph92

So if mosaic performance is good (or can be good), I might be in favor of a flatten_dates kwarg to stack, or even a separate function, which is just a convenience for arr.groupby("time").apply(stackstac.mosaic) or something like that.

👍 either of those sound perfect to me.

TomAugspurger avatar Aug 13 '21 18:08 TomAugspurger

@TomAugspurger or @thuydotm, any interest in trying out mosaic on these cases and seeing how performance is right now?

gjoseph92 avatar Aug 13 '21 18:08 gjoseph92

I think we can conclude that stackstac(...).pipe(stackstac.mosaic) works well enough. I was able to build a DataArray with this property (all the same datetime) whose size in memory would be 860.26 GiB. I ran it through stackstac.mosaic, which cut things down to 215.06 GiB and then persisted the result on a cluster with ~256GB of memory. Here's a performance report.

https://gistcdn.githack.com/TomAugspurger/587bd0e00860b52be9c3830e3bbca35c/raw/17323077f808cfae02ef7dad79d1648cd8591499/stackstac-mosaic.html

There is a decent amount of communication, but I'm not sure how concerned to be about that... IIUC, mosaic might need to transfer chunks from one worker to another. I do have a theoretical (unconfirmed) concern that the broadcasting trick with np.nan doesn't play well distributed's worker assignment. When distributed is in decide_worker, do we know what it thinks the size of the all-NaN chunk is? Hopefully it's the reduced size, and not the size of the full array.

Here's the code I ran

import stackstac
from pystac_client import Client
import planetary_computer as pc

bounds = (120.9, 14.8, 120.91, 14.81)

catalog = Client.open("https://planetarycomputer.microsoft.com/api/stac/v1")

search = catalog.search(collections=["io-lulc"], bbox=bounds)

# Check how many items were returned
items = list(search.get_items())
print(f"Returned {len(items)} Items")

signed_items = [pc.sign(item).to_dict() for item in items]

ds = stackstac.stack(signed_items, epsg=32650, chunksize=4096)

# cluster
from dask_gateway import GatewayCluster

cluster = GatewayCluster()
cluster.scale(28)
client = cluster.get_client()

ds2 = stackstac.mosaic(ds).persist()

TomAugspurger avatar Aug 16 '21 17:08 TomAugspurger

Nice to not see any spilling to disk in that performance report!

IIUC, mosaic might need to transfer chunks from one worker to another

It definitely would, just like any other reduction (mean, sum, etc.). I haven't looked at the graphs it makes yet, but they may be a little weird, since it's not a tree-reduce graph, but conceptually more like functools.reduce (since a mosaic is order-dependent, you have to have the mosaic of the previous layers in order to do the next one). There might be a way to improve the parallelism by redesigning it to be tree-like, but preserving order. This isn't memory-related, just another optimization idea. (Not that it looks like that task stream needs any more parallelism?)

I do have a theoretical (unconfirmed) concern that the broadcasting trick with np.nan doesn't play well distributed's worker assignment

Seems like it does!

In [1]: import numpy as np

In [2]: from dask.sizeof import sizeof

In [3]: trick = np.broadcast_to(np.nan, (2048, 2048))

In [4]: sizeof(trick)
Out[4]: 8

In [5]: full = np.full_like(trick, np.nan)

In [6]: sizeof(full)
Out[6]: 33554432

So hopefully decide_worker would schedule the where on the worker that already had one of the real input layers, instead of the one where the all-NaN chunk lived.

One interesting thing to look at would be how the initial chunks of the dataset get prioritized by dask.order. With https://github.com/dask/distributed/pull/4967, if we can get chunks that are spatially next to each other to also have similar priorities, they'll mostly get scheduled to the same worker, which will reduce the transfer necessary for the mosaic. This could actually be an argument for an optimization that skips the all-NaN chunks when building the mosaic graph, since I think dask.order would end up better prioritizing that graph since those irrelevant tasks won't take up space.

gjoseph92 avatar Aug 16 '21 21:08 gjoseph92

Thanks! I had been looking at that landcover dataset a few weeks ago and wondering about an approach.

RichardScottOZ avatar Aug 16 '21 22:08 RichardScottOZ

Following up; is there anything we want to do here?

Do we want to add any convenience functionality:

So if mosaic performance is good (or can be good), I might be in favor of a flatten_dates kwarg to stack, or even a separate function, which is just a convenience for arr.groupby("time").apply(stackstac.mosaic) or something like that.

Or maybe some documentation about this?

Or just close?

gjoseph92 avatar Dec 03 '21 18:12 gjoseph92

Convenience functions are good I think Gabe.

RichardScottOZ avatar Dec 04 '21 01:12 RichardScottOZ

+1 for a flatten_dates arg to stackstac.stack, though it will be important to make sure the the raster nodata and stackstac.stack fill_value parameters are being used correctly.

hrodmn avatar Jun 10 '22 18:06 hrodmn

I'm leaning towards that too. I think that with https://github.com/gjoseph92/stackstac/pull/116, some of the hard part is already done. The process would basically be:

  • Calculate the grouping based on STAC metadata here. You want to end up with something that could be a chunks argument for the time dimension (like (3, 1, 1, 2, 5, 4, ...) where each number is the number of subsequent items that belong to the same group, like a run-length encoding)
  • Pass that into items_to_dask as flatten_time=
  • Use it as the normalized time chunks. There will need to be some validation that you can't specify chunking for the time dimension if you also specify flatten_dates
  • If flatten_time is passed, before the fetch_raster_window blockwise part, replace chunks with a copy of the tuple where the first element is (1,) * len(chunks[0]) (assuming chunks[0] == flatten_time currently). This will track the fact that unlike the asset table, where each chunk contains N entries along the time dimension, every chunk in the final array will have 1 entry along time (because they'll all be flattened)
  • Pass a flatten=bool(flatten_time) argument into the fetch_raster_window blockwise generation. Also make sure to include it in the tokenization!
  • Inside fetch_raster_window will be the biggest changes. It may make sense to have two different functions, or at least two different helper functions that are called within the loop. You'll need to accept the flatten: bool argument, and handle it. Rather than allocating the full-size array, it'll obviously only be 1-length. And when looping for assets, you'll keep mosaicing the new result with the previous, and obviously short-circuiting and not loading any more assets as soon as every pixel is filled in.

gjoseph92 avatar Jun 10 '22 19:06 gjoseph92