satpy icon indicating copy to clipboard operation
satpy copied to clipboard

scn.save_dataset() slows down gradually during a loop

Open yukaribbba opened this issue 1 year ago • 30 comments

Describe the bug The results I need are some raw bands in float32 geotiffs, without any corrections or enhancements. During the process, cpu/memory usage and system temps remained a resonable level. I got what I want and no errors poped up. Just the processing time is longer and longer. The more datasets and products involved, the more significant this becomes. And it happens on several readers like ami, abi or fci, not just ahi (I haven't tested others yet). image

To Reproduce

import glob
import os
import time as time_calc
import warnings

import numpy as np
import gc


os.environ.setdefault("DASK_ARRAY__CHUNK_SIZE", "16MiB")
os.environ.setdefault("DASK_NUM_WORKERS", "12")
os.environ.setdefault("OMP_NUM_THREADS", "8")
os.environ.setdefault("PSP_CONFIG_FILE", "D:/satpy_config/pyspectral/pyspectral.yaml")
warnings.simplefilter(action="ignore", category=FutureWarning)
warnings.simplefilter(action="ignore", category=UserWarning)
warnings.simplefilter(action="ignore", category=RuntimeWarning)
from satpy import Scene, find_files_and_readers, config, utils

utils.debug_on()

config.set(config_path=["D:/satpy_config"])

def satpy_sat_to_float32(folder, sat_name, scan_area, reader_name, band_image_list):
    tmp_folder = f"{folder}/work_tmp"

    start_load = time_calc.perf_counter()
    files = find_files_and_readers(base_dir=folder, reader=reader_name)
    scn = Scene(filenames=files)
    scn.load(band_image_list)
    end_load = time_calc.perf_counter()
    print(f"Load sat dataset: {(end_load - start_load): .6f}")

    start_save = time_calc.perf_counter()
    for band_image in band_image_list:
        output = (f"{sat_name}_{scn[band_image].attrs["sensor"]}_{band_image}_"
                  f"{scn.start_time.strftime("%Y%m%d%H%M%S")}_{scan_area}")
        output_filename = f"{output}.tif"

        scn.save_dataset(band_image, filename=output_filename, writer="geotiff",
                         base_dir=tmp_folder, num_threads=8, compress=None, enhance=False, dtype=np.float32,
                         fill_value=np.nan)

    end_save = time_calc.perf_counter()
    print(f"Save to float32 geotiff: {(end_save - start_save): .6f}")
    del scn
    gc.collect()

folders = glob.glob("C:/Users/45107/Downloads/Sat/Geo/H09*FLDK")
for folder in folders:
    print(folder)
    satpy_sat_to_float32(folder, "H09", "FLDK", "ahi_hsd", ["Visible004_1000", "Visible005_1000",
                                                            "Visible006_500", "Visible008_1000",
                                                            "NearInfraRed016_2000", "NearInfraRed022_2000",
                                                            "InfraRed038_2000", "InfraRed063_2000",
                                                            "InfraRed069_2000", "InfraRed073_2000", "InfraRed087_2000",
                                                            "InfraRed096_2000", "InfraRed105_2000", "InfraRed112_2000",
                                                            "InfraRed123_2000", "InfraRed133_2000"])


Expected behavior Time consumption should be more stable.

Actual results A part of the debug output: debug.txt

Environment Info:

  • OS: Windows 11
  • Satpy Version: 0.52.1
  • Dask: 2024.10.0
  • rasterio: 1.4.2

yukaribbba avatar Nov 03 '24 15:11 yukaribbba

P.S. I also tried compute_writer_result() to save them in one go but still got the similar result.

yukaribbba avatar Nov 03 '24 15:11 yukaribbba

Without testing anything, one thing that should help at least a bit is to add tiled=True, blockxsize=512, blockysize=512 to the save_dataset() call. Then the data can actually be written in parallel.

Also the compute=False and compute_writer_results() that you've tried should help because then the navigation data should be accessed/computed only once.

I'm not sure (again, without testing) I'm not sure how the compress and num_threads kwargs are working the way you have them now. Setting GDAL_NUM_THREADS=6 environment variable will tell GDAL to use six cores. Playing with DASK_NUM_WORKERS and DASK_ARRAY__CHUNK_SIZE might also help. These should be set before import Scene.

I don't know what is causing the increase in processing time though. I remember we had some memory accumulation happening in Trollflow/Trollflow2 before we switched to separate processes for consecutive runs.

pnuu avatar Nov 03 '24 16:11 pnuu

Thanks @pnuu! I tried these options and they do help a lot to improve the baseline, but can't stop it coming down little by little (not that much as previous one though). Here's the result of my 142 datasets. If the data list is long enough this could still be a problem. I wonder if there're other things wrong here but can't figure them out...

import glob
import logging
import os
import time as time_calc
import warnings

import numpy as np
import gc

logging.basicConfig(
    level=logging.DEBUG,
    format='%(asctime)s - %(levelname)s - %(message)s',
    handlers=[
        logging.FileHandler("output.log"),
        logging.StreamHandler()
    ]
)

os.environ.setdefault("DASK_ARRAY__CHUNK_SIZE", "16MiB")
os.environ.setdefault("DASK_NUM_WORKERS", "12")
os.environ.setdefault("OMP_NUM_THREADS", "8")
os.environ.setdefault("GDAL_NUM_THREADS", "8")
os.environ.setdefault("PSP_CONFIG_FILE", "D:/satpy_config/pyspectral/pyspectral.yaml")
warnings.simplefilter(action="ignore", category=FutureWarning)
warnings.simplefilter(action="ignore", category=UserWarning)
warnings.simplefilter(action="ignore", category=RuntimeWarning)
from satpy import Scene, find_files_and_readers, config, utils
from satpy.writers import compute_writer_results

utils.debug_on()

config.set(config_path=["D:/satpy_config"])


def satpy_sat_to_float32(folder, sat_name, scan_area, reader_name, band_image_list):
    tmp_folder = f"{folder}/work_tmp"

    start_load = time_calc.perf_counter()
    files = find_files_and_readers(base_dir=folder, reader=reader_name)
    scn = Scene(filenames=files)
    scn.load(band_image_list)
    end_load = time_calc.perf_counter()
    print(f"Load sat dataset: {(end_load - start_load): .6f}")

    start_save = time_calc.perf_counter()
    res_list = []
    for band_image in band_image_list:
        output = (f"{sat_name}_{scn[band_image].attrs["sensor"]}_{band_image}_"
                  f"{scn.start_time.strftime("%Y%m%d%H%M%S")}_{scan_area}")
        output_filename = f"{output}.tif"

        res = scn.save_dataset(band_image, filename=output_filename, writer="geotiff", tiled=True, blockxsize=512, blockysize=512,
                         base_dir=tmp_folder, num_threads=8, compress=None, enhance=False, dtype=np.float32, compute=False,
                         fill_value=np.nan)
        res_list.append(res)

    compute_writer_results(res_list)
    end_save = time_calc.perf_counter()
    print(f"Save to float32 geotiff: {(end_save - start_save): .6f}")
    del scn
    gc.collect()

folders = glob.glob("C:/Users/45107/Downloads/Sat/Geo/H09*FLDK")
for folder in folders:
    print(folder)
    satpy_sat_to_float32(folder, "H09", "FLDK", "ahi_hsd", ["Visible004_1000", "Visible005_1000",
                                                            "Visible006_500", "Visible008_1000",
                                                            "NearInfraRed016_2000", "NearInfraRed022_2000",
                                                            "InfraRed038_2000", "InfraRed063_2000",
                                                            "InfraRed069_2000", "InfraRed073_2000", "InfraRed087_2000",
                                                            "InfraRed096_2000", "InfraRed105_2000", "InfraRed112_2000",
                                                            "InfraRed123_2000", "InfraRed133_2000"])

image

Debug output of the first and last dataset: debug.txt

yukaribbba avatar Nov 04 '24 08:11 yukaribbba

@yukaribbba do you see the same trend in memory usage? A while ago, when working on a batch-processing lib called trollflow2, we realized something was leaking memory very slowly. We could never pinpoint it exactly, but we suspected it was something in dask at the time. We didn't investigate much after that, and started running every slot in individual processes instead. That seems to clean up the memory at least. Maybe something to try here too?

mraspaud avatar Nov 04 '24 10:11 mraspaud

@mraspaud I'm also guessing this is about memory, garbage collection or things like that. My memory remained normal during the workflow. Actually this is a part of a QThread but I do them in batch mode. Looks like I need to split all those data folders one by one like you said. Start the thread, do the job, quit the thread, move to next folder and start it again. Maybe this can avoid the problem? I'll give it a try.

yukaribbba avatar Nov 04 '24 10:11 yukaribbba

Threading might work, but I would recommend going all the way and using multiprocessing instead. Memory can be shared in threads, but in principle not with separate processes.

mraspaud avatar Nov 04 '24 11:11 mraspaud

multiprocessing did it, nice and clean. @mraspaud Appreciate that 😄 But I still want to keep this open to see if we can find what's truly behind it and fix it if possible. image

yukaribbba avatar Nov 04 '24 14:11 yukaribbba

At the time, the hardest part was to find a minimal example to reproduce the error... It would be great if that could be done, so we can investigate further!

mraspaud avatar Nov 04 '24 14:11 mraspaud

I don't recall the outcome of everything we tried last time this came up, but there are few things I think we could try:

  1. As Martin said, getting a minimal example would be amazing.

  2. Towards a minimal example and to simplify your code, unless I'm mistaken there is nothing in your filename that couldn't be formatted by the Scene/Writer itself. You should be able to pass filename as a format string, call Scene.save_datasets instead of .save_dataset, and have the Scene do the filename formatting for you. That way you don't have to do any of this compute_writer_results stuff. So something like this if I'm not mistaken:

    filename = "{platform_name}_{sensor}_{name}_{start_time:%Y%m%d%H%M%S}_{area.area_id}.tif"
    

    But if .area_id is not accurate you could hardcode that before hand since it doesn't change per-dataset (ex. + f"{scan_area}.tif" or use double {{ }} to allow you to do formatting).

  3. What if you use the PNG writer? It will take longer to process but since it uses a completely different backend library it might behave differently memory/cache-wise.

djhoese avatar Nov 04 '24 16:11 djhoese

Ok I have two minimal examples here, one for png:

import glob
import os
import time as time_calc
import warnings
import numpy as np

os.environ.setdefault("DASK_ARRAY__CHUNK_SIZE", "16MiB")
os.environ.setdefault("DASK_NUM_WORKERS", "12")
os.environ.setdefault("OMP_NUM_THREADS", "8")
os.environ.setdefault("GDAL_NUM_THREADS", "8")
os.environ.setdefault("PSP_CONFIG_FILE", "D:/satpy_config/pyspectral/pyspectral.yaml")
warnings.simplefilter(action="ignore", category=FutureWarning)
warnings.simplefilter(action="ignore", category=UserWarning)
warnings.simplefilter(action="ignore", category=RuntimeWarning)
from satpy import Scene, find_files_and_readers, config

config.set(config_path=["D:/satpy_config"])


def satpy_sat_to_float32(folder, reader_name, band_image_list):
    tmp_folder = f"{folder}/work_tmp"

    start_load = time_calc.perf_counter()
    files = find_files_and_readers(base_dir=folder, reader=reader_name)
    scn = Scene(filenames=files)
    scn.load(band_image_list)
    end_load = time_calc.perf_counter()
    print(f"Load sat dataset: {(end_load - start_load): .6f}")

    start_save = time_calc.perf_counter()
    scn.save_datasets(writer="simple_image",
                      filename="{platform_name}_{sensor}_{name}_{start_time:%Y%m%d%H%M%S}_{area.area_id}.png",
                      base_dir=tmp_folder, enhance=False, compress_level=0)
    end_save = time_calc.perf_counter()
    print(f"Save to int8 png: {(end_save - start_save): .6f}")
    del scn

folders = glob.glob("C:/Users/45107/Downloads/Sat/Geo/H09*FLDK")
results = []
for folder in folders:
    print(folder)
    satpy_sat_to_float32(folder, "ahi_hsd",
                         [
                             "Visible004_1000", "Visible005_1000", "Visible006_500", "Visible008_1000",
                             "NearInfraRed016_2000", "NearInfraRed022_2000", "InfraRed038_2000", "InfraRed063_2000",
                             "InfraRed069_2000", "InfraRed073_2000", "InfraRed087_2000", "InfraRed096_2000",
                             "InfraRed105_2000", "InfraRed112_2000", "InfraRed123_2000", "InfraRed133_2000"
                         ]
                         )

another for geotiff:

......
scn.save_datasets(writer="geotiff",
                      filename="{platform_name}_{sensor}_{name}_{start_time:%Y%m%d%H%M%S}_{area.area_id}.tif",
                      base_dir=tmp_folder, blockxsize=512, blockysize=512, num_threads=8, compress=None,
                      enhance=False, dtype=np.float32, fill_value=np.nan)
......

Both of them showed sign of downgrade: image image

yukaribbba avatar Nov 05 '24 10:11 yukaribbba

Ok, new thing to try: Remove the scn.save_datasets call completely and replace with scn.compute(). This should compute the dask arrays inside the DataArrays and then immediately drop them (since the return value isn't saved to a variable name) and garbage collection should clean it up. You could even add a collect call to the end of the loop to force it to be garbage collected:

import gc
print("Unreachable objects: ", gc.collect())

djhoese avatar Nov 05 '24 21:11 djhoese

Sorry I'm not available till saturday. I'll check that once free again.

yukaribbba avatar Nov 06 '24 04:11 yukaribbba

This time I got nearly 300 datasets for scn.compute(). And if you ignore the very start of the test, they just look normal in general. image For the gc.collect(). At first it jumps between 0 and 36991 and then rises to 38642 later. image

yukaribbba avatar Nov 10 '24 03:11 yukaribbba

As for geotiff, I don't have much space on SSD to hold the output of 284 datasets so I did it on a hard drive and the whole progress became slow, but you can still see the trend clearly. @djhoese image

yukaribbba avatar Nov 12 '24 06:11 yukaribbba

@yukaribbba that's really interesting! so that means that the image saving is the leaking?

mraspaud avatar Nov 12 '24 08:11 mraspaud

@mraspaud It looks like that. I feel like something isn't cleaned up in the writer part so they got accumulated little by little.

yukaribbba avatar Nov 12 '24 08:11 yukaribbba

Have you tried the garbage collection collect after the save_datasets? Does that change this timing result?

Edit: Oops, fat fingered the close button instead of comment.

djhoese avatar Nov 13 '24 17:11 djhoese

I did but it's not helping. But I found some other things. scn.save_datasets() is using compute_writer_results. In compute_writer_results I see two parts: one for dask compute, another for file writing. So I add two timers for each.

def compute_writer_results(results):
    """Compute all the given dask graphs `results` so that the files are saved.

    Args:
        results (iterable): Iterable of dask graphs resulting from calls to
                            `scn.save_datasets(..., compute=False)`
    """
    if not results:
        return

    sources, targets, delayeds = split_results(results)

    # one or more writers have targets that we need to close in the future
    if targets:
        delayeds.append(da.store(sources, targets, compute=False))

    import time as time_calc
    start_compute = time_calc.perf_counter()
    if delayeds:
        da.compute(delayeds)
    end_compute = time_calc.perf_counter()
    print("da.compute: ", end_compute - start_compute)

    start_close = time_calc.perf_counter()
    if targets:
        for target in targets:
            if hasattr(target, "close"):
                target.close()
    end_close = time_calc.perf_counter()
    print("target close: ", end_close - start_close)

image image scn.save_datasets should be done after target.close(). But actually it's not, an unknown part exists and is slowing down. image Since I put gc.collect() out side the timer, it must be other things but I don't know what that is.

start_save = time_calc.perf_counter()
scn.save_datasets(writer="geotiff",
                      filename="{platform_name}_{sensor}_{name}_{start_time:%Y%m%d%H%M%S}_{area.area_id}.tif",
                      base_dir=tmp_folder,
                      blockxsize=512, blockysize=512, num_threads=8, compress=None,
                      enhance=False, dtype=np.float32, fill_value=np.nan)
end_save = time_calc.perf_counter()
print(f"Save to float32 geotiff: {(end_save - start_save): .6f}")
gc.collect()
del scn

yukaribbba avatar Nov 14 '24 10:11 yukaribbba

I could imagine that being either:

  1. Dask cleaning things up.
  2. GDAL/rasterio/rioxarray cleaning things up.

After the writer's save_datasets method is called, I believe the Scene throws away (allows it to be garbage collected) the Writer object.

Overall I don't think geotiff should be used for performance testing of this sort because there are too many unknowns in rasterio/gdal's caching to really tie down what's happening. PNG, although slower, should be "dumber" as far as background threads and shared caches.

djhoese avatar Nov 14 '24 15:11 djhoese

Unexpectedly, with gc.collect() png writer looks normal and stable. And that mysterious 'cleaning up' part also disapeared. image

So this is more like a GDAL cache issue I guess? I'll go back to geotiff and make GDAL clear its cache by force in every loop, and see how it goes.

yukaribbba avatar Nov 15 '24 15:11 yukaribbba

How do you force GDAL to clear its cache?

djhoese avatar Nov 15 '24 17:11 djhoese

Could it be similar to this? https://github.com/OSGeo/gdal/issues/7908 Or the need to use tcmalloc? https://gdal.org/en/latest/user/multithreading.html#ram-fragmentation-and-multi-threading

simonrp84 avatar Nov 15 '24 18:11 simonrp84

Oh boy that tcmalloc "solution" is quite the hack. You're basically telling the dynamic library loader to use a different dependency library.

I don't see it in graph form in this thread, but has memory usage been tracked over the execution time? Preferably with some profiling tool. If you lowered the GDAL_CACHEMAX environment variable enough I'd expect you either see the slow execution time sooner/faster or you see it plateau. My thinking is that if the cache max was lowered to use less memory then execution might slow down sooner if GDAL is getting a lot of cache misses. Or execution time would plateau because of all the cache misses. I guess for the same reasons increasing gdal cache max would be good to test. If GDAL is given like 80% of your RAM and we assume no hardware issues (some memory chips on the system are slower/more damaged than others) then you'd think you'd see some kind of flattening of the execution speed as GDAL is able to use more and more cache/RAM space. This assumes that you have enough RAM on your system to fit all/most of the rasters being saved/worked on.

djhoese avatar Nov 15 '24 19:11 djhoese

OK several things newly found:

  1. Just by setting GDAL_CACHEMAX to 4096MB (the default on my machine is 3273MB), it's getting much better now. da.compute is completely stable. I don't know if 800MB will make such a big difference. image

  2. The only lagging part is that "cleaning up", and I was wrong about it. Actually it didn't happen after compute_writer_results, but before it, in these lines: https://github.com/pytroll/satpy/blob/c2149175d2256e09ea0f61505a151e5625a80d87/satpy/writers/init.py#L750C1-L752C75 But I just don't understand why. image

yukaribbba avatar Nov 17 '24 08:11 yukaribbba

Put more timers in trollimage.xrimage and found some 'suspicious' lines: https://github.com/pytroll/trollimage/blob/6dced22bc8e7a36513f77bcbf027314dec0a5a1a/trollimage/xrimage.py#L391-L400 image Maybe this newly opened RIOFile didn't get properly closed? @djhoese

Edit: memeory usage seems to be normal. In every loop its maximum reaches 16-17GB and drops quickly.

yukaribbba avatar Nov 18 '24 06:11 yukaribbba

Sorry for not responding earlier. Out with a sick kid so I'll be a little slow for at least today.

This is expected and is a downside to geotiff writing with rasterio and dask at the time we wrote this. So first, we have to tell dask to use a lock between threads to not write in parallel because rasterio doesn't (didn't?) support parallel geotiff writing. Dask also doesn't have a way to say "this file is related to these dask tasks and should be closed when they are completed". So we keep the file-like object open and have the user close it. I believe rioxarray has better handling of this, but none of the core trollimage developers have had a chance to migrate this code.

djhoese avatar Nov 18 '24 14:11 djhoese

OK I see. So is there any way to close it myself?

yukaribbba avatar Nov 18 '24 14:11 yukaribbba

That's one of the things compute_writer_results does. One complication would be if dask and/or GDAL is doing something like copying some of these objects that cross thread boundaries and is not cleaning them up properly. Not that they're doing anything wrong, but they may not be used to people abusing the system like we are. Or we could be doing something wrong and not cleaning things up properly given the weird circumstance(s).

djhoese avatar Nov 18 '24 15:11 djhoese

I decide to rewrite XRImage.riosave and compute_writer_results. Call GDAL directly instead of rasterio for file handling and use ThreadPoolExecutor for parallel writing. The result isn't that bad and the most important thing is: it's stable. You know I have a very very long list of datasets waiting so I can't stand any turbulence. image

yukaribbba avatar Nov 20 '24 08:11 yukaribbba

very interesting! I have nothing against using GDAL directly as long the functionality is preserved. However I'm wondering if we are doing something wrong in riosave, or if the issue is in the rasterio library... I will try to check today what happens if we use rioxarray for saving instead of rasterio.

mraspaud avatar Nov 20 '24 10:11 mraspaud