pangeo-forge-recipes
Is it possible to specify more than one MergeDim to a recipe file pattern?
Hi,
I'm looking at creating a recipe for CMEMS ASCAT wind data (following the FTP approach in https://github.com/pangeo-forge/staged-recipes/pull/163). ASCAT products are available for all combinations of Metop A/B (REP) and Metop A/B/C (NRT) satellites and ascending (ASC) and descending (DES) passes. I've started with the NRT products, where I was hoping to use one MergeDim each for the satellite and pass options, something like:
```python
def make_url(satellite, satellite_pass, time):
    year = time.strftime("%Y")
    month = time.strftime("%m")
    day = time.strftime("%d")
    url = f"ftp://nrt.cmems-du.eu/WIND_GLO_WIND_L3_NRT_OBSERVATIONS_012_002/KNMI-GLO-WIND_L3-OBS_METOP-{satellite}_ASCAT_12_{satellite_pass}_V2/{year}/{month}/GLO-WIND_L3-OBS_METOP-{satellite}_ASCAT_12_{satellite_pass}_{year}{month}{day}.nc"
    return url


satellite_merge_dim = MergeDim("satellite", ["A", "B", "C"])
satellite_pass_merge_dim = MergeDim("satellite_pass", ["ASC", "DES"])

pattern = FilePattern(
    make_url,
    satellite_merge_dim,
    satellite_pass_merge_dim,
    ConcatDim(name="time", keys=dates),
)
```
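For reference, here is a plain-Python sketch (not pangeo-forge-recipes itself) of the combinatorics such a pattern implies: every `(satellite, satellite_pass, time)` combination yields one file. The `make_url` body mirrors the function above; the three-day `dates` list is a hypothetical stand-in.

```python
# Illustration only: how two MergeDims and one ConcatDim enumerate
# every (satellite, pass, date) combination of input files.
from datetime import date, timedelta
from itertools import product


def make_url(satellite, satellite_pass, time):
    year = time.strftime("%Y")
    month = time.strftime("%m")
    day = time.strftime("%d")
    return (
        f"ftp://nrt.cmems-du.eu/WIND_GLO_WIND_L3_NRT_OBSERVATIONS_012_002/"
        f"KNMI-GLO-WIND_L3-OBS_METOP-{satellite}_ASCAT_12_{satellite_pass}_V2/"
        f"{year}/{month}/GLO-WIND_L3-OBS_METOP-{satellite}_ASCAT_12_"
        f"{satellite_pass}_{year}{month}{day}.nc"
    )


satellites = ["A", "B", "C"]  # MergeDim("satellite", ...)
passes = ["ASC", "DES"]       # MergeDim("satellite_pass", ...)
dates = [date(2022, 1, 1) + timedelta(days=i) for i in range(3)]  # ConcatDim

urls = [make_url(s, p, t) for s, p, t in product(satellites, passes, dates)]
print(len(urls))  # 3 satellites x 2 passes x 3 dates = 18 URLs
```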
However, when creating the recipe, I get the following error in the sandbox:
```pytb
---------------------------------------------------------------------------
NotImplementedError                       Traceback (most recent call last)
Input In [6], in <cell line: 140>()
    131 pattern = FilePattern(
    132     make_url,
    133     satellite_merge_dim,
    134     satellite_pass_merge_dim,
    135     ConcatDim(name="time", keys=dates),
    136 )
    138 target_chunks = {"time": 1500, "latitude": -1, "longitude": -1}
--> 140 recipe = XarrayZarrRecipe(
    141     file_pattern=pattern,
    142     target_chunks=target_chunks,
    143     process_input=ics_measurement_time,
    144 )

File <string>:21, in __init__(self, file_pattern, storage_config, inputs_per_chunk, target_chunks, cache_inputs, copy_input_to_local_file, consolidate_zarr, consolidate_dimension_coordinates, xarray_open_kwargs, xarray_concat_kwargs, delete_input_encoding, process_input, process_chunk, lock_timeout, subset_inputs, open_input_with_kerchunk)

File /srv/conda/envs/notebook/lib/python3.9/site-packages/pangeo_forge_recipes/recipes/xarray_zarr.py:783, in XarrayZarrRecipe.__post_init__(self)
    782 def __post_init__(self):
--> 783     self._validate_file_pattern()
    785     # from here on we know there is at most one merge dim and one concat dim
    786     self.concat_dim = self.file_pattern.concat_dims[0]

File /srv/conda/envs/notebook/lib/python3.9/site-packages/pangeo_forge_recipes/recipes/xarray_zarr.py:827, in XarrayZarrRecipe._validate_file_pattern(self)
    825 def _validate_file_pattern(self):
    826     if len(self.file_pattern.merge_dims) > 1:
--> 827         raise NotImplementedError("This Recipe class can't handle more than one merge dim.")
    828     if len(self.file_pattern.concat_dims) > 1:
    829         raise NotImplementedError("This Recipe class can't handle more than one concat dim.")

NotImplementedError: This Recipe class can't handle more than one merge dim.
```
I wanted to check whether the restriction to a single MergeDim is in place for a particular reason, or whether it could be relaxed to accommodate this scenario?
Thanks, Derek
Thanks for the useful question Derek!
Once https://github.com/pangeo-forge/pangeo-forge-recipes/issues/376 is complete, it will be possible to specify arbitrary numbers of concat and merge dims.
In the meantime, the suggested workaround is to create a separate recipe for each value of the second merge dim.
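One way to sketch that workaround, assuming the `make_url` from the question above: fix the second merge dim (`satellite_pass`) with `functools.partial`, so each resulting URL function needs only the single `satellite` MergeDim plus the time ConcatDim, and each backs its own FilePattern/recipe (the FilePattern and recipe construction is omitted here).

```python
# Sketch: one URL function per pass, each suitable for its own recipe
# with a single MergeDim("satellite", ...) and ConcatDim("time", ...).
from datetime import date
from functools import partial


def make_url(satellite, satellite_pass, time):
    year = time.strftime("%Y")
    month = time.strftime("%m")
    day = time.strftime("%d")
    return (
        f"ftp://nrt.cmems-du.eu/WIND_GLO_WIND_L3_NRT_OBSERVATIONS_012_002/"
        f"KNMI-GLO-WIND_L3-OBS_METOP-{satellite}_ASCAT_12_{satellite_pass}_V2/"
        f"{year}/{month}/GLO-WIND_L3-OBS_METOP-{satellite}_ASCAT_12_"
        f"{satellite_pass}_{year}{month}{day}.nc"
    )


# satellite_pass is pre-bound, leaving only (satellite, time) free.
url_funcs = {p: partial(make_url, satellite_pass=p) for p in ["ASC", "DES"]}

# Note: `time` must be passed by keyword, since satellite_pass is bound
# by keyword in the partial.
print(url_funcs["ASC"]("A", time=date(2022, 1, 1)))
```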
Thanks Ryan, I'll do that and also watch out for any updates to https://github.com/pangeo-forge/pangeo-forge-recipes/issues/376
@derekocallaghan were you ever able to confirm that this does work in 0.10.0? If so, perhaps it will make a good tutorial.
Hey @cisaacstern, I tried 0.10.0 with multiple MergeDims. It breaks the pipeline.
```python
@dataclass
class DetermineSchema(beam.PTransform):
    """Combine many Datasets into a single schema along multiple dimensions.
    This is a reduction that produces a singleton PCollection.

    :param combine_dims: The dimensions to combine
    """

    combine_dims: List[Dimension]

    def expand(self, pcoll: beam.PCollection) -> beam.PCollection:
        schemas = pcoll | beam.Map(_add_keys(dataset_to_schema))
        cdims = self.combine_dims.copy()
        while len(cdims) > 0:
            last_dim = cdims.pop()
            if len(cdims) == 0:
                # at this point, we should have a 1D index as our key
                schemas = schemas | beam.CombineGlobally(CombineXarraySchemas(last_dim))
            else:
                schemas = (
                    schemas
                    | _NestDim(last_dim)
                    | beam.CombinePerKey(CombineXarraySchemas(last_dim))
                )
        return schemas
```
If we run the above snippet with 3 combine dimensions (1 ConcatDim and 2 MergeDims, in my case), it fails with the following error:
```
RuntimeError: A transform with label "DetermineSchema/_NestDim" already exists in the pipeline. To apply a transform with a specified label write pvalue | "label" >> transform
```
@DarshanSP19, thanks for this very helpful report! I think the following might fix this, as the reported error appears to be a case in which Beam doesn't know how to generate a unique name for the unlabeled `_NestDim` stage:
```diff
 if len(cdims) == 0:
     # at this point, we should have a 1D index as our key
     schemas = schemas | beam.CombineGlobally(CombineXarraySchemas(last_dim))
 else:
     schemas = (
         schemas
-        | _NestDim(last_dim)
-        | beam.CombinePerKey(CombineXarraySchemas(last_dim))
+        | f"Nest {last_dim.name}" >> _NestDim(last_dim)
+        | f"Combine {last_dim.name}" >> beam.CombinePerKey(CombineXarraySchemas(last_dim))
     )
```
Could I entice you to try that fix, and if it works, submit it as a PR? 🙏
Edit: Added a label to the combine stage as well, as my guess is it will have the same issue once the `_NestDim` labeling is resolved. Note also that another problem may surface once we're past this error, but this looks like the right place to start.
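To illustrate the failure mode, here is a toy stand-in (not Beam itself) for Beam's uniqueness rule: stage labels must be unique within a pipeline, and an unlabeled transform defaults to its class name, so the second loop iteration tries to re-register the same name. The `ToyPipeline` class is purely hypothetical.

```python
# Toy model of Beam's unique-label requirement; not the Beam API.
class ToyPipeline:
    def __init__(self):
        self.labels = set()

    def apply(self, label):
        # Beam raises a similar RuntimeError on duplicate transform labels.
        if label in self.labels:
            raise RuntimeError(f'A transform with label "{label}" already exists')
        self.labels.add(label)


dims = ["satellite", "satellite_pass"]

# Unlabeled: every iteration reuses the default class-name label,
# so the second iteration collides.
p = ToyPipeline()
failed = False
try:
    for d in dims:
        p.apply("_NestDim")
except RuntimeError:
    failed = True

# Labeled per dimension, as in the suggested fix: labels stay unique.
p2 = ToyPipeline()
for d in dims:
    p2.apply(f"Nest {d}")

print(failed, sorted(p2.labels))
```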
> @derekocallaghan were you ever able to confirm that this does work in 0.10.0? If so, perhaps it will make a good tutorial.
Sorry @cisaacstern, I didn't notice your question until today. It's been a while since I looked at the original recipe which required multiple MergeDims; IIRC I had a workaround that was probably a better approach. I wanted to port this recipe to Beam, so if it's still relevant I'll try your above suggestion if a PR hasn't been created in the meantime.
> @DarshanSP19, thanks for this very helpful report! I think the following might fix this, as the reported error appears to be a case in which Beam doesn't know how to generate a unique name for the unlabeled `_NestDim` stage: [...]
Hey @cisaacstern, it worked for me. Happy to do a PR.