Poor performance of async IO in a multithreaded env

Open ravwojdyla opened this issue 4 years ago • 10 comments

Thanks for the great work! I experience poor performance of IO bound tasks in a multithreaded environment.

Dask based example:

import fsspec

import dask.array as da
import xarray as xr
from dask.distributed import Client

def test(path):
    fs_map = fsspec.get_mapper(path)

    ar_dsk = da.random.random(size=(200_000, 200_000), chunks=(1_000, 1_000))
    ar_xr = xr.Dataset(data_vars=(dict(foo=(("x", "y"), ar_dsk))))
    ar_xr.to_zarr(fs_map, mode="w")

if __name__ == "__main__":
    # this saturates just one core ("IO loop")
    c = Client(processes=False)
    test("gs://foo/bar")

afaiu the current async design, some methods are coroutines. Take cat as an example: _cat is the underlying coroutine, there might be multiple paths to cat, and those are fetched concurrently. There is a single thread/loop per AsyncFileSystem (let's call it the "IO loop": loop thread + default executor) that runs all coroutines for an instance of AsyncFileSystem (since GCS instances are cached that's usually more than a single instance). If a regular thread calls fs.cat, that runs maybe_sync, which effectively hands the _cat coroutine over to the IO loop, and the calling thread waits for the result in the sync method. This works OK-ish in a single-threaded environment, but not so well in a multithreaded environment like Dask, where many threads can oversaturate the "IO loop", effectively single-threading the IO (with a slight improvement from the async logic).
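The hand-off described above can be sketched with the standard library alone. This is a minimal illustration, not fsspec's actual implementation: the names `_cat`, `cat`, and `io-loop` are stand-ins, the network read is simulated with `asyncio.sleep`, and `asyncio.run_coroutine_threadsafe` is assumed as the hand-off mechanism.

```python
import asyncio
import threading
from concurrent.futures import ThreadPoolExecutor

# One event loop running forever in a dedicated daemon thread (the "IO loop").
loop = asyncio.new_event_loop()
loop_thread = threading.Thread(target=loop.run_forever, name="io-loop", daemon=True)
loop_thread.start()


async def _cat(path):
    # Stand-in for a real network read; records which thread runs the coroutine.
    await asyncio.sleep(0.01)
    return path, threading.current_thread().name


def cat(path):
    # Blocking wrapper: submit the coroutine to the IO loop, then wait for the result.
    future = asyncio.run_coroutine_threadsafe(_cat(path), loop)
    return future.result()


# 16 worker threads, mimicking a Dask thread pool.
with ThreadPoolExecutor(max_workers=16) as pool:
    results = list(pool.map(cat, [f"chunk-{i}" for i in range(64)]))

# Every coroutine body ran on the single loop thread, whichever worker called cat.
threads_used = {thread_name for _, thread_name in results}
print(threads_used)

# Shut the loop down cleanly.
loop.call_soon_threadsafe(loop.stop)
loop_thread.join()
loop.close()
```

However many worker threads call `cat`, any Python-level work inside the coroutines (decoding, parsing) is executed by that one loop thread, which is the bottleneck being reported here.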

The issue seems to be that in the code above Dask uses a thread pool for tasks (in my case 16 threads) and each task is mostly IO bound, so most of the work is actually done by fsspec's single "IO loop", whilst most of the worker threads just wait. The CPU saturation is very poor.

To validate this theory, if I use a process pool for dask cluster (each process with 1 worker thread) (via say: Client(processes=True, threads_per_worker=1)), the CPU saturation is much better since each worker process/thread has its own "IO loop" (separate processes and separate instances of FS).

This issue should be reproducible with multiprocessing as well.

Thinking about solutions: the loop thread could hand over the IO work to an executor (via run_in_executor), or worker threads could handle their own loop (if possible). Please let me know what you think.
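A rough sketch of the first option, assuming the expensive step is decoding a response body. The names `parse`, `_cat_file`, and `parse_pool` are hypothetical and do not reflect gcsfs internals; the network wait is simulated.

```python
import asyncio
import json
import threading
from concurrent.futures import ThreadPoolExecutor

# Hypothetical pool dedicated to post-processing of responses.
parse_pool = ThreadPoolExecutor(max_workers=4, thread_name_prefix="parse")


def parse(payload):
    # Decoding runs on a pool thread, not on the loop thread.
    return json.loads(payload), threading.current_thread().name


async def _cat_file(payload):
    await asyncio.sleep(0.01)  # stand-in for the network wait
    loop = asyncio.get_running_loop()
    # Hand the decoding step to the executor so the loop can service other tasks.
    return await loop.run_in_executor(parse_pool, parse, payload)


async def main():
    loop_thread_name = threading.current_thread().name
    payloads = [json.dumps({"chunk": i}) for i in range(8)]
    results = await asyncio.gather(*(_cat_file(p) for p in payloads))
    return loop_thread_name, results


loop_thread_name, results = asyncio.run(main())
parse_threads = {name for _, name in results}
parse_pool.shutdown()
```

Note that pool threads still share the GIL with the loop thread, so this only helps to the extent that the offloaded work releases the GIL or the loop thread is otherwise starved of time to schedule IO.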

See an example of partial stacktrace dump:

  • Thread-1 is the loop thread (performing IO)
  • ThreadPoolExecutor-0_0 afaiu is the default executor (doing nothing)
  • Dask-Worker-Threads'-6291-0 is one of many Dask threads (waiting on results from Thread-1)
Thread 6672 (active): "Thread-1"
    loads (json/__init__.py:343)
    _call (gcsfs/core.py:503)
    _cat_file (gcsfs/core.py:826)
    _run (asyncio/events.py:81)
    _run_once (asyncio/base_events.py:1859)
    run_forever (asyncio/base_events.py:570)
    run (threading.py:870)
    _bootstrap_inner (threading.py:932)
    _bootstrap (threading.py:890)
Thread 6343 (idle): "ThreadPoolExecutor-0_0"
    _worker (concurrent/futures/thread.py:78)
    run (threading.py:870)
    _bootstrap_inner (threading.py:932)
    _bootstrap (threading.py:890)
Thread 6350 (idle): "Dask-Worker-Threads'-6291-0"
    wait (threading.py:306)
    wait (threading.py:558)
    sync (fsspec/asyn.py:68)
    maybe_sync (fsspec/asyn.py:100)
    cat (fsspec/asyn.py:226)
    getitems (fsspec/mapping.py:89)
    _chunk_getitems (zarr/core.py:1666)
    _get_selection (zarr/core.py:1033)
    _get_basic_selection_nd (zarr/core.py:739)
    get_basic_selection (zarr/core.py:696)
    __getitem__ (zarr/core.py:571)
    __getitem__ (xarray/backends/zarr.py:56)
    __array__ (xarray/core/indexing.py:560)
    asarray (numpy/core/_asarray.py:83)
    __array__ (xarray/core/indexing.py:495)
    asarray (numpy/core/_asarray.py:83)
    getter (dask/array/core.py:102)
    apply_function (distributed/worker.py:3411)
    run (distributed/_concurrent_futures_thread.py:65)
    _worker (distributed/threadpoolexecutor.py:55)
    run (threading.py:870)
    _bootstrap_inner (threading.py:932)
    _bootstrap (threading.py:890)

ravwojdyla • Nov 24 '20 19:11

I can think of a few ways to handle this, but I am surprised that this doesn't seem to be a problem elsewhere. To be sure: you are right, and threads will be calling the same instance of the file-system class, sharing any connection pool and ioloop.

However, the point is that IO operations mostly involve waiting on or reading from a socket, and the overhead of running tasks on an ioloop is very small. Only one thing can be reading from the network at a time, after all.

For example, the dask worker allocates threads for CPU-bound work, but network operations (talking to the scheduler, etc) happen on a single ioloop.
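The argument that pure waiting is cheap on a single loop can be illustrated with a small sketch. Here `fake_read` is a hypothetical stand-in for a socket wait and performs no CPU work, which is exactly the assumption under dispute in this thread.

```python
import asyncio
import time


async def fake_read(delay):
    # Stand-in for waiting on a socket; yields control to the loop while waiting.
    await asyncio.sleep(delay)
    return delay


async def main():
    start = time.perf_counter()
    # 100 concurrent waits of 0.05 s each, all on one loop in one thread.
    await asyncio.gather(*(fake_read(0.05) for _ in range(100)))
    return time.perf_counter() - start


elapsed = asyncio.run(main())
print(f"100 waits of 0.05 s took {elapsed:.2f} s in total")
```

The waits overlap, so the total is far below the 5 s a sequential run would need. The picture changes once each task also performs noticeable Python-level work on the loop thread, since that work cannot overlap.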

> process pool for dask cluster, the CPU saturation is much better

So what are the processes doing? Are you sure you didn't just increase the amount of CPU work to do per task for copies and serialisation, without actually improving the total data throughput?

@mrocklin and maybe @quasiben would have intuition here

martindurant • Nov 24 '20 20:11

@martindurant

> and the overhead for running tasks on an ioloop is very small.

Looking at the stacktrace of the "loop thread":

Thread 6672 (active): "Thread-1"
    loads (json/__init__.py:343)
    _call (gcsfs/core.py:503)
    _cat_file (gcsfs/core.py:826)
    _run (asyncio/events.py:81)
    _run_once (asyncio/base_events.py:1859)
    run_forever (asyncio/base_events.py:570)
    run (threading.py:870)
    _bootstrap_inner (threading.py:932)
    _bootstrap (threading.py:890)

This is a random snapshot of the "loop thread": it is occupied with reading, parsing, etc. the data for all 16 dask threads, which just wait for cat to return. If all 16 dask threads are IO bound, they will just hammer the single "loop thread" with work.

> so what are the processes doing? Are you sure you didn't just increase the amount of CPU work to do per task for copies and serialisation, without actually improving the total data throughput?

It's the same code, just a different pool (thread vs process). In the thread pool, I believe dask threads oversaturate the single "loop thread" with work, and mostly just wait. In the process pool, each worker has its own "loop thread", thus can't easily oversaturate it. Does this make sense?

Edit: GIL isn't an issue in process pool, that might have an impact too.
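For comparison, the "each worker has its own loop" arrangement can be imitated with threads, which corresponds to the second option floated in the original post. This is only a sketch: `cat_with_own_loop` is a hypothetical name, the read is simulated, and unlike separate processes these threads still share one GIL.

```python
import asyncio
import threading
from concurrent.futures import ThreadPoolExecutor


async def _cat(path):
    await asyncio.sleep(0.01)  # stand-in for a network read
    return path, threading.current_thread().name


def cat_with_own_loop(path):
    # asyncio.run creates a fresh event loop in the calling thread and closes it afterwards.
    return asyncio.run(_cat(path))


with ThreadPoolExecutor(max_workers=4, thread_name_prefix="worker") as pool:
    results = list(pool.map(cat_with_own_loop, [f"chunk-{i}" for i in range(16)]))

# Coroutines now run on the worker threads themselves rather than on one shared loop thread.
threads_used = {name for _, name in results}
print(sorted(threads_used))
```

Here no single loop thread sits between the workers and the IO, but any state tied to one event loop would need to be created per thread.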

ravwojdyla • Nov 24 '20 20:11

> It's the same code, just a different pool (thread vs process). In the thread pool, I believe dask threads oversaturate the single "loop thread" with work, and mostly just wait. In the process pool, each worker has its own "loop thread", thus can't easily oversaturate it. Does this make sense?

It does not make sense in my mind for io-bound work, which is what the loop thread is for. It is OK that the other threads are waiting, they would be waiting regardless.

So, is your process-based workflow actually getting through work faster? The dask dashboard would be able to tell you what things the (apparently extra) CPU time is being spent on.

martindurant • Nov 24 '20 21:11

See reports for the same logic to see the difference:

I have also separately measured the GIL on the thread-based cluster, and there is a somewhat significant GIL hold on the IO loop thread (which makes sense). Closing this issue since I can use a process-based cluster, and some part of the difference can definitely be due to the GIL. Thanks.

ravwojdyla • Nov 24 '20 23:11

Let's keep this open, I may have time to look into it - and thank you for the profile report. What versions of zarr, xarray, fsspec, gcsfs and dask/distributed are you using? Is the data public, so that I could run the exact same tests?

martindurant • Nov 25 '20 15:11

@martindurant versions:

xarray==0.16.1
zarr==2.5.0
fsspec==0.8.4
gcsfs==0.7.1
dask==2.30.0
distributed==2.30.0

The test data used was a randomly generated float array via da.random.random.

ravwojdyla • Nov 30 '20 11:11

Oh, in that case I would appreciate it if you just posted the whole code.

martindurant • Nov 30 '20 14:11

@martindurant

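import fsspec

import dask.array as da
import xarray as xr
from dask.distributed import performance_report, progress

def write(path):
    fs_map = fsspec.get_mapper(path)

    # 100_000 x 100_000 random floats, split into 1_000 x 1_000 chunks
    ar_dsk = da.random.random(size=(100_000, 100_000), chunks=(1_000, 1_000))
    ar_xr = xr.Dataset(data_vars=(dict(foo=(("x", "y"), ar_dsk))))
    ar_xr.to_zarr(fs_map, mode="w")

def read(path):
    fs_map = fsspec.get_mapper(path)

    ar_xr = xr.open_zarr(fs_map)
    # record a Dask performance report while the chunks are read and scaled
    with performance_report(filename="dask-report.html"):
        r = (ar_xr.foo * 2.0).persist()
        progress(r)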

ravwojdyla • Nov 30 '20 21:11

Would you like to run your benchmarks with the latest versions?

martindurant • Apr 13 '21 15:04

We always run into this and try to circumvent it by using processes (even with the latest gcsfs).

However, it would be nice if it got fixed. Processes are sometimes too heavy, and with PyTorch's DataLoaders we try to use multiple threads (we don't want to create processes from processes) to read concurrently, and we hit this bottleneck.

@martindurant I can also generate some simpler benchmarks if it helps, without xarray and using zarr directly.

tasansal • Dec 12 '23 20:12