stackstac.stack to support one time coordinate per unique datetime?
Please see below an example running on the Planetary Computer using Esri 10m Land Cover data, where each STAC item is derived from a mosaic of many images. The output is a 4D cube with the time dimension is 4 and the time coordinates are just 2020-06-01T00:00:00 repeated.
The documentation clearly stated that ``time`` will be equal in length to the number of items you pass in, and indexed by STAC Item datetime. But in a more natural way, it's expected that the dataarray should have one time coordinate per unique datetime in the STAC items. Would stackstac.stack support this feature?
import numpy as np
import planetary_computer as pc
import pystac_client
import stackstac
catalog = pystac_client.Client.open(
"https://planetarycomputer.microsoft.com/api/stac/v1/"
)
point = {"type": "Point", "coordinates": [-97.807733, 33.2133019]}
io_lulc_search = catalog.search(collections=["io-lulc"], intersects=point)
io_lulc_items = [pc.sign(item).to_dict() for item in io_lulc_search.get_items()]
data = stackstac.stack(io_lulc_items, assets=["data"], epsg=3857)
data.shape, np.unique(data.time)
Output: ((4, 1, 185098, 134598), array(['2020-06-01T00:00:00.000000000'], dtype='datetime64[ns]'))
As a bit of context, the STAC items in this global IO-LULC dataset all have the same timestamp. AFAICT, if you wanted to load 100 of these items with stackstac, memory usage would be ~100x larger than necessary, since the array would be (100, 1, width, height), rather than (1, 1, width, height).
What about stackstac.mosaic(data)? It would certainly give you the output you're looking for. But it would probably use more memory than necessary, since you'd briefly create 100 layers of unnecessary NaNs before flattening them (as Tom said above). However, these NaNs would maybe not actually take 100x the memory, since we use this broadcast trick optimization for the out-of-bounds chunks:
https://github.com/gjoseph92/stackstac/blob/2c799e3f0075942cbb76a99f96517d4fe15bc98a/stackstac/to_dask.py#L176-L178
I haven't tested the memory usage; I'd be curious what actually happens.
it's expected that the dataarray should have one time coordinate per unique datetime in the STAC items. Would stackstac.stack support this feature?
We could think about it, but it might be tricky to implement during the stacking itself. If a chunk of the array could be sourced from multiple GDAL datasets, that would require rethinking a lot of the dask graph generation logic.
I might prefer to implement this through optimized mosaic logic instead. The thing we want to skip is generating that all-NaN array for out-of-bounds chunks when we know it will immediately get dropped by a mosaic. There's already logic that avoids even opening an asset for spatial chunks that don't overlap with that asset:
https://github.com/gjoseph92/stackstac/blob/2c799e3f0075942cbb76a99f96517d4fe15bc98a/stackstac/prepare.py#L233-L238
If we did #13, I have a feeling mosaic could look at each spatial chunk, pluck out only the assets that overlap with that spatial chunk, and mosaic only those together.
However, I'd want to test the current mosaic performance, because between the broadcast trick and short-circuiting the dataset open, performance might already be pretty good.
So if mosaic performance is good (or can be good), I might be in favor of a flatten_dates kwarg to stack, or even a separate function, which is just a convenience for
arr.groupby("time").apply(stackstac.mosaic) or something like that.
So if mosaic performance is good (or can be good), I might be in favor of a flatten_dates kwarg to stack, or even a separate function, which is just a convenience for arr.groupby("time").apply(stackstac.mosaic) or something like that.
👍 either of those sound perfect to me.
@TomAugspurger or @thuydotm, any interest in trying out mosaic on these cases and seeing how performance is right now?
I think we can conclude that stackstac(...).pipe(stackstac.mosaic) works well enough. I was able to build a DataArray with this property (all the same datetime) whose size in memory would be 860.26 GiB. I ran it through stackstac.mosaic, which cut things down to 215.06 GiB and then persisted the result on a cluster with ~256GB of memory. Here's a performance report.
https://gistcdn.githack.com/TomAugspurger/587bd0e00860b52be9c3830e3bbca35c/raw/17323077f808cfae02ef7dad79d1648cd8591499/stackstac-mosaic.html
There is a decent amount of communication, but I'm not sure how concerned to be about that... IIUC, mosaic might need to transfer chunks from one worker to another. I do have a theoretical (unconfirmed) concern that the broadcasting trick with np.nan doesn't play well distributed's worker assignment. When distributed is in decide_worker, do we know what it thinks the size of the all-NaN chunk is? Hopefully it's the reduced size, and not the size of the full array.
Here's the code I ran
import stackstac
from pystac_client import Client
import planetary_computer as pc
bounds = (120.9, 14.8, 120.91, 14.81)
catalog = Client.open("https://planetarycomputer.microsoft.com/api/stac/v1")
search = catalog.search(collections=["io-lulc"], bbox=bounds)
# Check how many items were returned
items = list(search.get_items())
print(f"Returned {len(items)} Items")
signed_items = [pc.sign(item).to_dict() for item in items]
ds = stackstac.stack(signed_items, epsg=32650, chunksize=4096)
# cluster
from dask_gateway import GatewayCluster
cluster = GatewayCluster()
cluster.scale(28)
client = cluster.get_client()
ds2 = stackstac.mosaic(ds).persist()
Nice to not see any spilling to disk in that performance report!
IIUC,
mosaicmight need to transfer chunks from one worker to another
It definitely would, just like any other reduction (mean, sum, etc.). I haven't looked at the graphs it makes yet, but they may be a little weird, since it's not a tree-reduce graph, but conceptually more like functools.reduce (since a mosaic is order-dependent, you have to have the mosaic of the previous layers in order to do the next one). There might be a way to improve the parallelism by redesigning it to be tree-like, but preserving order. This isn't memory-related, just another optimization idea. (Not that it looks like that task stream needs any more parallelism?)
I do have a theoretical (unconfirmed) concern that the broadcasting trick with
np.nandoesn't play well distributed's worker assignment
Seems like it does!
In [1]: import numpy as np
In [2]: from dask.sizeof import sizeof
In [3]: trick = np.broadcast_to(np.nan, (2048, 2048))
In [4]: sizeof(trick)
Out[4]: 8
In [5]: full = np.full_like(trick, np.nan)
In [6]: sizeof(full)
Out[6]: 33554432
So hopefully decide_worker would schedule the where on the worker that already had one of the real input layers, instead of the one where the all-NaN chunk lived.
One interesting thing to look at would be how the initial chunks of the dataset get prioritized by dask.order. With https://github.com/dask/distributed/pull/4967, if we can get chunks that are spatially next to each other to also have similar priorities, they'll mostly get scheduled to the same worker, which will reduce the transfer necessary for the mosaic. This could actually be an argument for an optimization that skips the all-NaN chunks when building the mosaic graph, since I think dask.order would end up better prioritizing that graph since those irrelevant tasks won't take up space.
Thanks! I had been looking at that landcover dataset a few weeks ago and wondering about an approach.
Following up; is there anything we want to do here?
Do we want to add any convenience functionality:
So if mosaic performance is good (or can be good), I might be in favor of a
flatten_dateskwarg tostack, or even a separate function, which is just a convenience forarr.groupby("time").apply(stackstac.mosaic)or something like that.
Or maybe some documentation about this?
Or just close?
Convenience functions are good I think Gabe.
+1 for a flatten_dates arg to stackstac.stack, though it will be important to make sure the the raster nodata and stackstac.stack fill_value parameters are being used correctly.
I'm leaning towards that too. I think that with https://github.com/gjoseph92/stackstac/pull/116, some of the hard part is already done. The process would basically be:
- Calculate the grouping based on STAC metadata here. You want to end up with something that could be a
chunksargument for the time dimension (like(3, 1, 1, 2, 5, 4, ...)where each number is the number of subsequent items that belong to the same group, like a run-length encoding) - Pass that into
items_to_daskasflatten_time= - Use it as the normalized time chunks. There will need to be some validation that you can't specify chunking for the time dimension if you also specify
flatten_dates - If
flatten_timeis passed, before thefetch_raster_windowblockwise part, replacechunkswith a copy of the tuple where the first element is(1,) * len(chunks[0])(assumingchunks[0] == flatten_timecurrently). This will track the fact that unlike the asset table, where each chunk contains N entries along the time dimension, every chunk in the final array will have 1 entry along time (because they'll all be flattened) - Pass a
flatten=bool(flatten_time)argument into thefetch_raster_windowblockwise generation. Also make sure to include it in the tokenization! - Inside
fetch_raster_windowwill be the biggest changes. It may make sense to have two different functions, or at least two different helper functions that are called within the loop. You'll need to accept theflatten: boolargument, and handle it. Rather than allocating the full-size array, it'll obviously only be 1-length. And when looping for assets, you'll keep mosaicing the new result with the previous, and obviously short-circuiting and not loading any more assets as soon as every pixel is filled in.