Race condition while caching for resampling
Describe the bug
I think there is a race condition in the caching for resampling when two similar worker processes run in parallel. See details below.
To Reproduce
I have a process running that listens for new NWCSAF/Geo cloud products. It starts processing when either a Cloud Type or a CTTH product is ready, with the code shown below. The code launches the processing in a separate task using the `apply_async` method of a multiprocessing `Pool`. As both the Cloud Type and CTTH products are in practice ready almost at the same time, this code ends up running both of the "workers" below in parallel, when having:

```python
pool = Pool(processes=6, maxtasksperchild=1)
```

If I lower the `processes` keyword to 1, i.e.

```python
pool = Pool(processes=1, maxtasksperchild=1)
```

the race condition issue disappears!
```python
if product == "CT":
    LOG.debug("Product is CT")
    pool.apply_async(ctype_composite_worker,
                     (scene, jobs_dict[keyname], publisher_q, config_options))
elif product == "CTTH":
    LOG.debug("Product is CTTH")
    pool.apply_async(ctth_composite_worker,
                     (scene, jobs_dict[keyname], publisher_q, config_options))
```
The two workers above are rather similar: each makes a composite scene (Cloud Type or CTTH) using Satpy's MultiScene blend functionality and finally publishes the result by sending a Posttroll message. In addition, the blended scene is written to disk and a quicklook image is generated and saved; from the blended netCDF result a set of super-observations is created, and an ASCII file is written to disk after the netCDF result has been published.
Expected behavior
I would have expected it to be possible to do this processing in parallel without running into the race condition described above. As noted, if I force the processing to be sequential, the problem disappears.
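For context, the failure mode looks like a classic check-then-create race on the on-disk cache: both workers find no cached neighbour info and both try to write it. Below is a minimal, stdlib-only sketch of a user-side guard around a shared cache file; `load_or_create_cache` is a hypothetical helper (not part of the Satpy API), and threads stand in for the pool workers here, but the `O_CREAT | O_EXCL` lock file works across processes on a shared filesystem too:

```python
import os
import time
from concurrent.futures import ThreadPoolExecutor

def load_or_create_cache(cache_file, compute, timeout=5.0):
    """Return the cached bytes, computing and writing them at most once
    even when several workers race for the same cache file."""
    lock_file = cache_file + ".lock"
    try:
        # O_CREAT | O_EXCL is atomic: exactly one worker becomes the writer.
        fd = os.open(lock_file, os.O_CREAT | os.O_EXCL | os.O_WRONLY)
    except FileExistsError:
        # Another worker is (or was) the writer: wait for the finished cache.
        deadline = time.monotonic() + timeout
        while not os.path.exists(cache_file):
            if time.monotonic() > deadline:
                raise TimeoutError("cache writer did not finish in time")
            time.sleep(0.05)
        with open(cache_file, "rb") as f:
            return f.read()
    try:
        data = compute()
        tmp = cache_file + ".tmp"
        with open(tmp, "wb") as f:
            f.write(data)
        # Atomic publish: readers never observe a partially written cache.
        os.replace(tmp, cache_file)
        return data
    finally:
        os.close(fd)
```

A crashed writer would leave the lock file behind and make later callers time out, so a real implementation would need stale-lock handling; the sketch only illustrates how a single atomic "publish" step avoids both the `PathNotFoundError` (reading a half-written cache) and the `ContainsGroupError` (two writers) seen in the traceback.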
Actual results
```
[2023-11-13 07:30:40,020 ERROR __main__] Failed in ctth_composite_worker...
Traceback (most recent call last):
  File "/san1/opt/mesan-composite-runner/releases/mesan-composite-runner-0.1.15/lib/python3.11/site-packages/satpy/resample.py", line 589, in load_neighbour_info
    fid = zarr.open(filename, 'r')
          ^^^^^^^^^^^^^^^^^^^^^^^^
  File "/san1/opt/mesan-composite-runner/releases/mesan-composite-runner-0.1.15/lib/python3.11/site-packages/zarr/convenience.py", line 130, in open
    raise PathNotFoundError(path)
zarr.errors.PathNotFoundError: nothing found at path ''

During handling of the above exception, another exception occurred:

Traceback (most recent call last):
  File "/san1/opt/mesan-composite-runner/releases/mesan-composite-runner-0.1.15/lib/python3.11/site-packages/satpy/resample.py", line 510, in precompute
    self.load_neighbour_info(cache_dir, mask=mask, **kwargs)
  File "/san1/opt/mesan-composite-runner/releases/mesan-composite-runner-0.1.15/lib/python3.11/site-packages/satpy/resample.py", line 595, in load_neighbour_info
    raise IOError
OSError

During handling of the above exception, another exception occurred:

Traceback (most recent call last):
  File "/san1/opt/mesan-composite-runner/releases/mesan-composite-runner-0.1.15/bin/mesan_composite_runner.py", line 390, in ctth_composite_worker
    result_file = do_ctth_composite(time_of_analysis, delta_t, mesan_area_id, config_options)
                  ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/san1/opt/mesan-composite-runner/releases/mesan-composite-runner-0.1.15/bin/mesan_composite_runner.py", line 570, in do_ctth_composite
    ctcomp.blend_cloud_products()
  File "/san1/opt/mesan-composite-runner/releases/mesan-composite-runner-0.1.15/lib/python3.11/site-packages/mesan_compositer/make_ct_composite.py", line 336, in blend_cloud_products
    self.blended_scene, group_name = blend_cloud_products(satpy_composite_name,
                                     ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/san1/opt/mesan-composite-runner/releases/mesan-composite-runner-0.1.15/lib/python3.11/site-packages/mesan_compositer/load_cloud_products.py", line 207, in blend_cloud_products
    for _i, scene in enumerate(resampled.scenes):
                     ^^^^^^^^^^^^^^^^
  File "/san1/opt/mesan-composite-runner/releases/mesan-composite-runner-0.1.15/lib/python3.11/site-packages/satpy/multiscene/_multiscene.py", line 255, in scenes
    self._scenes = list(self._scenes)
                   ^^^^^^^^^^^^^^^^^^
  File "/san1/opt/mesan-composite-runner/releases/mesan-composite-runner-0.1.15/lib/python3.11/site-packages/satpy/multiscene/_multiscene.py", line 145, in __iter__
    scn = next(self._self_iter)
          ^^^^^^^^^^^^^^^^^^^^^
  File "/san1/opt/mesan-composite-runner/releases/mesan-composite-runner-0.1.15/lib/python3.11/site-packages/satpy/multiscene/_multiscene.py", line 135, in _create_cached_iter
    for scn in self._scene_gen:
  File "/san1/opt/mesan-composite-runner/releases/mesan-composite-runner-0.1.15/lib/python3.11/site-packages/satpy/multiscene/_multiscene.py", line 297, in _call_scene_func
    new_scn = getattr(scn, func_name)(*args, **kwargs)
              ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/san1/opt/mesan-composite-runner/releases/mesan-composite-runner-0.1.15/lib/python3.11/site-packages/satpy/scene.py", line 977, in resample
    self._resampled_scene(new_scn, destination, resampler=resampler,
  File "/san1/opt/mesan-composite-runner/releases/mesan-composite-runner-0.1.15/lib/python3.11/site-packages/satpy/scene.py", line 893, in _resampled_scene
    res = resample_dataset(dataset, destination_area, **kwargs)
          ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/san1/opt/mesan-composite-runner/releases/mesan-composite-runner-0.1.15/lib/python3.11/site-packages/satpy/resample.py", line 1454, in resample_dataset
    new_data = resample(source_area, dataset, destination_area, fill_value=fill_value, **kwargs)
               ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/san1/opt/mesan-composite-runner/releases/mesan-composite-runner-0.1.15/lib/python3.11/site-packages/satpy/resample.py", line 1417, in resample
    res = resampler_instance.resample(data, **kwargs)
          ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/san1/opt/mesan-composite-runner/releases/mesan-composite-runner-0.1.15/lib/python3.11/site-packages/satpy/resample.py", line 441, in resample
    cache_id = self.precompute(cache_dir=cache_dir, **kwargs)
               ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/san1/opt/mesan-composite-runner/releases/mesan-composite-runner-0.1.15/lib/python3.11/site-packages/satpy/resample.py", line 515, in precompute
    self.save_neighbour_info(cache_dir, mask=mask, **kwargs)
  File "/san1/opt/mesan-composite-runner/releases/mesan-composite-runner-0.1.15/lib/python3.11/site-packages/satpy/resample.py", line 619, in save_neighbour_info
    zarr_out.to_zarr(filename)
  File "/san1/opt/mesan-composite-runner/releases/mesan-composite-runner-0.1.15/lib/python3.11/site-packages/xarray/core/dataset.py", line 2490, in to_zarr
    return to_zarr(  # type: ignore[call-overload,misc]
           ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/san1/opt/mesan-composite-runner/releases/mesan-composite-runner-0.1.15/lib/python3.11/site-packages/xarray/backends/api.py", line 1666, in to_zarr
    zstore = backends.ZarrStore.open_group(
             ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/san1/opt/mesan-composite-runner/releases/mesan-composite-runner-0.1.15/lib/python3.11/site-packages/xarray/backends/zarr.py", line 456, in open_group
    zarr_group = zarr.open_group(store, **open_kwargs)
                 ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/san1/opt/mesan-composite-runner/releases/mesan-composite-runner-0.1.15/lib/python3.11/site-packages/zarr/hierarchy.py", line 1547, in open_group
    raise ContainsGroupError(path)
zarr.errors.ContainsGroupError: path '' contains a group
```
Screenshots
N/A
Environment Info:
- OS: Linux
- Satpy Version: 0.44.0
- PyResample Version: 1.27.1
- Python Version: 3.11.6 | packaged by conda-forge | (main, Oct 3 2023, 10:40:35) [GCC 12.3.0] on linux
Additional context
Satpy's Scene object was never intended to be thread- or multiprocess-safe, but for basic usage like this I would have expected it to work. Maybe if the multiprocessing were done with dask it might work, but I'm still not sure. However, the resampling cache is definitely not designed for multiple concurrent writers. The suggestion has always been to pre-seed the cache by running the processing once, and only then do any parallel processing.
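That pre-seeding pattern can be sketched like this (stdlib-only stand-ins: `process_product` and the cache file are hypothetical, and a thread pool stands in for the multiprocessing Pool, but the sequencing is the point):

```python
import os
import tempfile
from concurrent.futures import ThreadPoolExecutor

CACHE_WRITES = []  # records which calls actually wrote the cache

def process_product(product, cache_dir):
    """Hypothetical stand-in for ctype_composite_worker / ctth_composite_worker:
    the first call populates the resampling cache, later calls only read it."""
    cache_file = os.path.join(cache_dir, "neighbour_info.zarr")
    if not os.path.exists(cache_file):
        # Naive check-then-write: only safe here because one sequential
        # run pre-seeds the cache before any parallelism starts.
        CACHE_WRITES.append(product)
        with open(cache_file, "wb") as f:
            f.write(b"neighbour-info")
    with open(cache_file, "rb") as f:
        return (product, f.read())

cache_dir = tempfile.mkdtemp()

# 1) Pre-seed: run ONE job to completion, sequentially.
process_product("CT", cache_dir)

# 2) The cache now exists, so the parallel workers are pure readers
#    and can no longer race on the cache write.
with ThreadPoolExecutor(max_workers=2) as pool:
    results = list(pool.map(lambda p: process_product(p, cache_dir),
                            ["CT", "CTTH"]))
```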
Is the `scene` here a MultiScene or a regular Scene?

If a `Scene`, then I'm not sure I understand the benefit of splitting the work at this stage in the processing (before resampling). Aren't you using the same resampled output in both cases? Why not resample the Scene once and then pass that resampled Scene to the remaining processing?
If a `MultiScene`, then I think the better long-term solution would be to make the Scene iterations run in parallel inside the MultiScene code (with dask, for consistency). Basically, move the parallel portions into the library so they are not something the user has to worry about.
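For the plain-Scene case, the resample-once restructuring could look roughly like this (hypothetical names; a thread pool and a dummy shared step stand in for the real pool and `Scene.resample`, the point being that the expensive cached step runs once in the parent and only the independent post-processing is parallelized):

```python
from concurrent.futures import ThreadPoolExecutor

CALLS = []  # records how many times the shared step runs

def resample_once(scene):
    """Stand-in for the expensive shared step (resampling with caching)."""
    CALLS.append("resample")
    return {"resampled": scene}

def ctype_worker(resampled):
    # Stand-in for the Cloud Type post-processing.
    return ("CT", resampled["resampled"])

def ctth_worker(resampled):
    # Stand-in for the CTTH post-processing.
    return ("CTTH", resampled["resampled"])

# Resample once in the parent, then fan the result out to both workers;
# the workers never touch the resampling cache concurrently.
resampled = resample_once("my_scene")
with ThreadPoolExecutor(max_workers=2) as pool:
    futures = [pool.submit(w, resampled) for w in (ctype_worker, ctth_worker)]
    results = [f.result() for f in futures]
```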