
Race condition while caching for resampling

Open · adybbroe opened this issue · 1 comment

Describe the bug
I think there could be an issue with the caching for resampling when two similar jobs run in parallel from a worker pool. See further below.

To Reproduce

I have a process running that listens for new NWCSAF/Geo cloud products and starts processing when either a Cloud Type or a CTTH product is ready, using the code shown below. The code launches a separate job for each product with the apply_async method of a multiprocessing Pool. As the Cloud Type and CTTH products are in practice ready at almost the same time, the two "workers" below end up running in parallel. This happens when using:

pool = Pool(processes=6, maxtasksperchild=1)

If I lower the processes keyword argument to 1, i.e. pool = Pool(processes=1, maxtasksperchild=1), the race condition disappears!

            # Dispatch one worker per NWCSAF product via the pool
            if product == "CT":
                LOG.debug("Product is CT")
                pool.apply_async(ctype_composite_worker,
                                 (scene,
                                  jobs_dict[
                                      keyname],
                                  publisher_q,
                                  config_options))

            elif product == "CTTH":
                LOG.debug("Product is CTTH")
                pool.apply_async(ctth_composite_worker,
                                 (scene,
                                  jobs_dict[
                                      keyname],
                                  publisher_q,
                                  config_options))

The two workers above are rather similar: each builds a composite scene (cloud type or CTTH) using Satpy's MultiScene blend functionality and finally publishes the result by sending a Posttroll message. In addition, the blended scene is written to disk, a quicklook image is generated and saved, and from the blended netCDF result a set of super-observations is created and written to an ASCII file after the netCDF result has been published.
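
For reference, each worker follows roughly this pattern (a minimal sketch only, not the actual code; the reader name, dataset name, area id, file list and output filename are assumptions/placeholders):

from satpy import MultiScene


def ctype_composite_worker_sketch(nwcsaf_files, area_id, cache_dir):
    # Build a MultiScene from the individual NWCSAF/Geo granules
    mscn = MultiScene.from_files(nwcsaf_files, reader="nwcsaf-geo")
    mscn.load(["ct"])

    # Both workers resample to the same area with the same cache_dir;
    # this is where the cached neighbour-info zarr stores collide.
    resampled = mscn.resample(area_id, resampler="nearest", cache_dir=cache_dir)

    # Blend the resampled scenes and write the composite to disk
    blended = resampled.blend()
    blended.save_datasets(writer="cf", filename="ct_blend.nc")
    # ... then a quicklook image is saved and the result is published
    # with a Posttroll message (omitted here).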

Expected behavior
I would have expected it to be possible to run this processing in parallel without hitting the race condition described above.

As mentioned above, if I force the processing to be sequential the problem disappears.

Actual results

[2023-11-13 07:30:40,020 ERROR    __main__] Failed in ctth_composite_worker...
Traceback (most recent call last):
  File "/san1/opt/mesan-composite-runner/releases/mesan-composite-runner-0.1.15/lib/python3.11/site-packages/satpy/resample.py", line 589, in load_neighbour_info
    fid = zarr.open(filename, 'r')
          ^^^^^^^^^^^^^^^^^^^^^^^^
  File "/san1/opt/mesan-composite-runner/releases/mesan-composite-runner-0.1.15/lib/python3.11/site-packages/zarr/convenience.py", line 130, in open
    raise PathNotFoundError(path)
zarr.errors.PathNotFoundError: nothing found at path ''

During handling of the above exception, another exception occurred:

Traceback (most recent call last):
  File "/san1/opt/mesan-composite-runner/releases/mesan-composite-runner-0.1.15/lib/python3.11/site-packages/satpy/resample.py", line 510, in precompute
    self.load_neighbour_info(cache_dir, mask=mask, **kwargs)
  File "/san1/opt/mesan-composite-runner/releases/mesan-composite-runner-0.1.15/lib/python3.11/site-packages/satpy/resample.py", line 595, in load_neighbour_info
    raise IOError
OSError

During handling of the above exception, another exception occurred:

Traceback (most recent call last):
  File "/san1/opt/mesan-composite-runner/releases/mesan-composite-runner-0.1.15/bin/mesan_composite_runner.py", line 390, in ctth_composite_worker
    result_file = do_ctth_composite(time_of_analysis, delta_t, mesan_area_id, config_options)
                  ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/san1/opt/mesan-composite-runner/releases/mesan-composite-runner-0.1.15/bin/mesan_composite_runner.py", line 570, in do_ctth_composite
    ctcomp.blend_cloud_products()
  File "/san1/opt/mesan-composite-runner/releases/mesan-composite-runner-0.1.15/lib/python3.11/site-packages/mesan_compositer/make_ct_composite.py", line 336, in blend_cloud_products
    self.blended_scene, group_name = blend_cloud_products(satpy_composite_name,
                                     ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/san1/opt/mesan-composite-runner/releases/mesan-composite-runner-0.1.15/lib/python3.11/site-packages/mesan_compositer/load_cloud_products.py", line 207, in blend_cloud_products
    for _i, scene in enumerate(resampled.scenes):
                               ^^^^^^^^^^^^^^^^
  File "/san1/opt/mesan-composite-runner/releases/mesan-composite-runner-0.1.15/lib/python3.11/site-packages/satpy/multiscene/_multiscene.py", line 255, in scenes
    self._scenes = list(self._scenes)
                   ^^^^^^^^^^^^^^^^^^
  File "/san1/opt/mesan-composite-runner/releases/mesan-composite-runner-0.1.15/lib/python3.11/site-packages/satpy/multiscene/_multiscene.py", line 145, in __iter__
    scn = next(self._self_iter)
          ^^^^^^^^^^^^^^^^^^^^^
  File "/san1/opt/mesan-composite-runner/releases/mesan-composite-runner-0.1.15/lib/python3.11/site-packages/satpy/multiscene/_multiscene.py", line 135, in _create_cached_iter
    for scn in self._scene_gen:
  File "/san1/opt/mesan-composite-runner/releases/mesan-composite-runner-0.1.15/lib/python3.11/site-packages/satpy/multiscene/_multiscene.py", line 297, in _call_scene_func
    new_scn = getattr(scn, func_name)(*args, **kwargs)
              ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/san1/opt/mesan-composite-runner/releases/mesan-composite-runner-0.1.15/lib/python3.11/site-packages/satpy/scene.py", line 977, in resample
    self._resampled_scene(new_scn, destination, resampler=resampler,
  File "/san1/opt/mesan-composite-runner/releases/mesan-composite-runner-0.1.15/lib/python3.11/site-packages/satpy/scene.py", line 893, in _resampled_scene
    res = resample_dataset(dataset, destination_area, **kwargs)
          ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/san1/opt/mesan-composite-runner/releases/mesan-composite-runner-0.1.15/lib/python3.11/site-packages/satpy/resample.py", line 1454, in resample_dataset
    new_data = resample(source_area, dataset, destination_area, fill_value=fill_value, **kwargs)
               ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/san1/opt/mesan-composite-runner/releases/mesan-composite-runner-0.1.15/lib/python3.11/site-packages/satpy/resample.py", line 1417, in resample
    res = resampler_instance.resample(data, **kwargs)
          ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/san1/opt/mesan-composite-runner/releases/mesan-composite-runner-0.1.15/lib/python3.11/site-packages/satpy/resample.py", line 441, in resample
    cache_id = self.precompute(cache_dir=cache_dir, **kwargs)
               ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/san1/opt/mesan-composite-runner/releases/mesan-composite-runner-0.1.15/lib/python3.11/site-packages/satpy/resample.py", line 515, in precompute
    self.save_neighbour_info(cache_dir, mask=mask, **kwargs)
  File "/san1/opt/mesan-composite-runner/releases/mesan-composite-runner-0.1.15/lib/python3.11/site-packages/satpy/resample.py", line 619, in save_neighbour_info
    zarr_out.to_zarr(filename)
  File "/san1/opt/mesan-composite-runner/releases/mesan-composite-runner-0.1.15/lib/python3.11/site-packages/xarray/core/dataset.py", line 2490, in to_zarr
    return to_zarr(  # type: ignore[call-overload,misc]
           ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/san1/opt/mesan-composite-runner/releases/mesan-composite-runner-0.1.15/lib/python3.11/site-packages/xarray/backends/api.py", line 1666, in to_zarr
    zstore = backends.ZarrStore.open_group(
             ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/san1/opt/mesan-composite-runner/releases/mesan-composite-runner-0.1.15/lib/python3.11/site-packages/xarray/backends/zarr.py", line 456, in open_group
    zarr_group = zarr.open_group(store, **open_kwargs)
                 ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/san1/opt/mesan-composite-runner/releases/mesan-composite-runner-0.1.15/lib/python3.11/site-packages/zarr/hierarchy.py", line 1547, in open_group
    raise ContainsGroupError(path)
zarr.errors.ContainsGroupError: path '' contains a group
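
For what it is worth, the failure looks like a plain check-then-write race on the cache path: both workers find the cache missing and both then try to create the same zarr store. A stripped-down, hypothetical reproduction of that pattern outside Satpy (assumes zarr 2.x, where opening a missing path in read mode raises PathNotFoundError, as in the traceback):

from multiprocessing import Pool

import xarray as xr
import zarr
from zarr.errors import PathNotFoundError


def fake_precompute(cache_file):
    # Mimics the precompute() flow in the traceback: try to read the cache,
    # and if it is missing, compute and write it.
    try:
        zarr.open(cache_file, "r")          # load_neighbour_info step
    except PathNotFoundError:
        ds = xr.Dataset({"valid_input_index": ("y", [1, 2, 3])})
        ds.to_zarr(cache_file)              # save_neighbour_info step


if __name__ == "__main__":
    # Two workers racing on the same path: both may see the cache as missing,
    # and the second write can then fail with ContainsGroupError.
    with Pool(processes=2) as pool:
        pool.map(fake_precompute, ["/tmp/resample_cache.zarr"] * 2)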


Environment Info:

  • OS: Linux
  • Satpy Version: 0.44.0
  • PyResample Version: 1.27.1
  • Python 3.11.6 | packaged by conda-forge | (main, Oct 3 2023, 10:40:35) [GCC 12.3.0] on linux


adybbroe · Nov 14 '23 07:11

Satpy's Scene object is not intended to be thread- or multiprocess-safe, but for basic usage like this I would have expected it to work. Maybe it would work if the parallelism were done with dask, but I'm still not sure. However, the resampling cache is not intended to work with multiple writers. The suggestion has always been to pre-seed the cache by running the processing once, and only then do any parallel processing.
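
Something along these lines would be to warm the cache once before starting the pool, e.g. (a sketch only; reader, dataset, area and path names are placeholders):

from satpy import Scene


def preseed_resample_cache(sample_file, area_id, cache_dir):
    # One synchronous resample writes the neighbour-info zarr cache,
    # so the parallel workers that follow only ever read it.
    scn = Scene(filenames=[sample_file], reader="nwcsaf-geo")
    scn.load(["ct"])
    scn.resample(area_id, resampler="nearest", cache_dir=cache_dir)


# preseed_resample_cache("some_nwcsaf_ct_file.nc", "mesan_area", "/path/to/cache")
# ...then create the Pool and apply_async the workers as before.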

Is the scene here a MultiScene or a regular Scene?

If it is a regular Scene, then I'm not sure I understand the benefit of splitting the work at this stage (before resampling). Aren't you using the same resampled output in both cases? Why not resample the Scene once and then pass the resampled Scene to the remaining processing?
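
That is, something like this (a sketch of the idea only; the post-processing worker names are hypothetical stand-ins for the non-resampling part of your current workers):

# Resample once in the main process, so the cache only ever has a single writer
local_scn = scene.resample("mesan_area", resampler="nearest", cache_dir=cache_dir)

# Only the post-resampling work (blend/save/quicklook/publish) goes to the pool
pool.apply_async(ctype_postprocessing_worker, (local_scn, publisher_q, config_options))
pool.apply_async(ctth_postprocessing_worker, (local_scn, publisher_q, config_options))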

If it is a MultiScene, then I think the better long-term solution would be to make the Scene iteration run in parallel inside Satpy (with dask, for consistency). Basically, move the parallel portions inside the library code rather than making them something the user has to worry about.
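
In the meantime, staying in a single process and letting dask provide the parallelism would also avoid multiple cache writers (the scheduler and worker count below are just examples):

import dask

# One process, several dask worker threads: the resample cache then has a single writer
dask.config.set(scheduler="threads", num_workers=6)

# ...then build the (Multi)Scene, resample with cache_dir, blend and save as before.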

djhoese · Nov 14 '23 14:11