
Read_parquet is slower than expected with S3

Open mrocklin opened this issue 3 years ago • 51 comments

I was looking at a read_parquet profile with @th3ed @ncclementi and @gjoseph92

Looking at this performance report: https://raw.githubusercontent.com/coiled/h2o-benchmarks/main/performance-reports-pyarr_str-50GB/q1_50GB_pyarr.html I see the following analysis (two minute video): https://www.loom.com/share/4c8ad1c5251a4e658c1c47ee2113f34a

We're spending only about 20-25% of our time reading from S3, and about 5% of our time converting data to Pandas. We're spending a lot of our time doing something else.

@gjoseph92 took a look at this with pyspy and generated reports like the following: tls-10_0_0_177-42425.json

I'm copying a note from him below:

What you'll see from this is that pyarrow isn't doing the actual reads. Because dask uses s3fs, the C++ arrow code has to call back into Python for each read. Ultimately, the reads are actually happening on the fsspec event loop (see the fsspecIO thread in profiles). If we look there, about 40% of CPU time is spent waiting for something (aka data from S3, good), but 60% is spent doing stuff in Python (which I'd consider overhead, to some degree).

We can also see that 30% of the total time is spent blocking on Python's GIL (all the pthread_cond_timedwaits) (look at the functions calling into this and the corresponding lines in the Python source if you don't believe me; they're all Py_END_ALLOW_THREADS). This is an issue known as the convoy effect: https://bugs.python.org/issue7946, https://github.com/dask/distributed/issues/6325.

My takeaway is that using fsspec means dask is using Python for reads, which might be adding significant overhead / reducing parallelism due to the GIL.

I'd be interested in doing a comparison by hacking together a version that bypasses fsspec, and uses pyarrow's native S3FileSystem directly. Before that though, it might be good to get some baseline numbers on how fast we can pull the raw data from S3 (just as bytes), to understand what performance we can expect.
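A minimal sketch of what that baseline measurement could look like (not a proper benchmark; it assumes the same public taxi file used later in this thread and that the bucket lives in us-east-2, and real numbers should come from a machine next to the data):

import time

import fsspec
import pyarrow.fs as pa_fs

path = "ursa-labs-taxi-data/2009/01/data.parquet"

# raw bytes through s3fs/fsspec
fs = fsspec.filesystem("s3", anon=True)
t0 = time.perf_counter()
nbytes = len(fs.cat_file(path))
print(f"s3fs:    {nbytes / 2**20 / (time.perf_counter() - t0):.0f} MiB/s")

# raw bytes through pyarrow's native S3 filesystem (no Python in the read path)
pafs = pa_fs.S3FileSystem(anonymous=True, region="us-east-2")  # assumed bucket region
t0 = time.perf_counter()
with pafs.open_input_file(path) as f:
    nbytes = len(f.read())
print(f"pyarrow: {nbytes / 2**20 / (time.perf_counter() - t0):.0f} MiB/s")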

FYI I also tried https://developer.nvidia.com/blog/optimizing-access-to-parquet-data-with-fsspec/, but it was ~2x slower. Haven't tried repeating that though, so not sure if it's a real result.

One other thing I find surprising is that polars appears to be using fsspec for reads as well, rather than the native S3FileSystem or GCSFileSystem: https://github.com/pola-rs/polars/blob/445c550e8f965d9e8f2da1cb2d01b6c15874f6c8/py-polars/polars/io.py#L949-L956 https://github.com/pola-rs/polars/blob/445c550e8f965d9e8f2da1cb2d01b6c15874f6c8/py-polars/polars/internals/io.py#L114-L121

I would have expected polars and dask read performance to be closer in this case. We should probably confirm for ourselves that they're not.
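A quick way to confirm would be to time the same file through polars via the same fsspec machinery (a sketch; this is roughly what the linked polars code ends up doing for s3:// paths):

import fsspec
import polars as pl

path = "s3://ursa-labs-taxi-data/2009/01/data.parquet"

# read through an fsspec file object, which is what polars' io helpers
# fall back to for remote paths
with fsspec.open(path, mode="rb", anon=True) as f:
    df = pl.read_parquet(f)

print(df.shape)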

It looks like we could make things a lot faster. I'm curious about the right steps to isolate the problem further.

cc'ing @martindurant @rjzamora @ritchie46 @fjetter

mrocklin avatar Nov 03 '22 13:11 mrocklin

I'd be interested in doing a comparison by hacking together a version that bypasses fsspec, and uses pyarrow's native S3FileSystem directly. Before that though, it might be good to get some baseline numbers on how fast we can pull the raw data from S3 (just as bytes), to understand what performance we can expect.

Note that you should already be able to do this by passing open_file_options={"open_file_func": <pyarrow-file-open-func>} to dd.read_parquet. For example:

import dask.dataframe as dd
import pyarrow.fs as pa_fs

path = "s3://ursa-labs-taxi-data/2009/01/data.parquet"
fs = pa_fs.S3FileSystem(anonymous=True)

ddf = dd.read_parquet(
    path,
    engine="pyarrow",
    storage_options={"anon": True},  # fsspec options, still used for graph construction
    open_file_options={
        # use pyarrow's native S3 reader at data-reading time
        "open_file_func": fs.open_input_file,
    },
)

ddf.partitions[0].compute()

Using fs.open_input_file does cut my wall time by ~50% for this simple example.

rjzamora avatar Nov 03 '22 15:11 rjzamora

If we are talking about IO latency problems, then obviously the chunksize and caching strategy in the fsspec file-like object will be very important, since pyarrow treats it like a regular file and issues many small reads. This is why we created fsspec.parquet: to preemptively fetch all the bytes you will be needing before handing them to arrow.
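For reference, the prefetching path looks like this (a sketch; the column name is just an example):

import fsspec.parquet
import pyarrow.parquet as pq

path = "s3://ursa-labs-taxi-data/2009/01/data.parquet"

# open_parquet_file reads the footer, works out which byte ranges the requested
# columns/row-groups need, and fetches them up front before arrow sees the file
with fsspec.parquet.open_parquet_file(
    path,
    engine="pyarrow",
    columns=["passenger_count"],  # example column
    storage_options={"anon": True},
) as f:
    tbl = pq.read_table(f, columns=["passenger_count"])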

martindurant avatar Nov 03 '22 15:11 martindurant

pyarrow treats it like a regular file with many small reads

Pyarrow actually doesn't do this anymore. They now expose a pre_buffer option in read_table (and other read functions) to do something similar to what fsspec.parquet does, so the performance difference may really be Python-overhead related.
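Something like this exercises that path (a sketch; region is the assumed location of this public bucket):

import pyarrow.fs as pa_fs
import pyarrow.parquet as pq

fs = pa_fs.S3FileSystem(anonymous=True, region="us-east-2")  # assumed bucket region

# pre_buffer coalesces and prefetches the column-chunk byte ranges inside arrow,
# instead of issuing many small reads against the file object
tbl = pq.read_table(
    "ursa-labs-taxi-data/2009/01/data.parquet",
    filesystem=fs,
    pre_buffer=True,
)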

rjzamora avatar Nov 03 '22 15:11 rjzamora

@rjzamora - worth testing! I wonder if they read the buffers concurrently.

martindurant avatar Nov 03 '22 15:11 martindurant

Using fs.open_input_file does cut my wall time by ~50% for this simple example

I'll be dumb for a moment. If there aren't any backend options specified, and if arrow is present, then should we switch to using Arrow by default for things like this?

mrocklin avatar Nov 03 '22 21:11 mrocklin

If there aren't any backend options specified, and if arrow is present, then should we switch to using Arrow by default for things like this?

The issue is that Dask has adopted fsspec as its standard filesystem interface, and the fsspec API is not always aligned with the pyarrow.fs API. Therefore, the user would still need to pass in fsspec-based storage_options to read_parquet, and those options may be slightly different from the options needed to initialize a pyarrow S3FileSystem instance. This is why the code I shared above requires the user to create the pyarrow filesystem themselves.

Although, I guess you are explicitly asking "If there aren't any backend options specified". So, I suppose that case may work.

rjzamora avatar Nov 03 '22 21:11 rjzamora

If storage_options was empty could we safely do this?

mrocklin avatar Nov 03 '22 21:11 mrocklin

If storage_options was empty could we safely do this?

Yes. Sorry - I hit enter too early. We would still need to use fsspec for graph construction, but would be able to use pyarrow to open the file at data-reading time if storage_options was empty anyway.

Note that we could certainly add the option to use a pyarrow filesystem throughout the read_parquet code (I've implemented this a few different times), but it's not a trivial change.

rjzamora avatar Nov 03 '22 22:11 rjzamora

A 50% speedup seems worth a non-trivial change to me?

@rjzamora thanks for sharing the snippet to do this. We should benchmark this on Coiled as well to get some more data points.

I wonder if there's some subset of commonly-used fsspec options that we could easily translate into arrow?
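Something like this is the kind of mapping I have in mind (a hypothetical helper; only a handful of common s3fs options have direct S3FileSystem equivalents):

import pyarrow.fs as pa_fs

def s3fs_options_to_pyarrow(storage_options: dict) -> pa_fs.S3FileSystem:
    # hypothetical translation of common s3fs/fsspec options to pyarrow's S3FileSystem
    kwargs = {}
    if storage_options.get("anon"):
        kwargs["anonymous"] = True
    if "key" in storage_options:
        kwargs["access_key"] = storage_options["key"]
    if "secret" in storage_options:
        kwargs["secret_key"] = storage_options["secret"]
    if "token" in storage_options:
        kwargs["session_token"] = storage_options["token"]
    client_kwargs = storage_options.get("client_kwargs", {})
    if "region_name" in client_kwargs:
        kwargs["region"] = client_kwargs["region_name"]
    if "endpoint_url" in client_kwargs:
        kwargs["endpoint_override"] = client_kwargs["endpoint_url"]
    return pa_fs.S3FileSystem(**kwargs)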

gjoseph92 avatar Nov 04 '22 00:11 gjoseph92

It's worth turning on logging

import fsspec.utils

fsspec.utils.setup_logging(logger_name="s3fs")

creating dataframe:

2022-11-03 20:47:02,664 - s3fs - DEBUG - _lsdir -- Get directory listing page for ursa-labs-taxi-data/2009/01/data.parquet
2022-11-03 20:47:02,937 - s3fs - DEBUG - _call_s3 -- CALL: head_object - ({},) - {'Bucket': 'ursa-labs-taxi-data', 'Key': '2009/01/data.parquet'}
2022-11-03 20:47:02,993 - s3fs - DEBUG - _call_s3 -- CALL: head_object - ({},) - {'Bucket': 'ursa-labs-taxi-data', 'Key': '2009/01/data.parquet'}
2022-11-03 20:47:03,047 - s3fs - DEBUG - _fetch_range -- Fetch: ursa-labs-taxi-data/2009/01/data.parquet, 461900991-461966527
2022-11-03 20:47:03,048 - s3fs - DEBUG - _call_s3 -- CALL: get_object - () - {'Bucket': 'ursa-labs-taxi-data', 'Key': '2009/01/data.parquet', 'Range': 'bytes=461900991-461966526', 'IfMatch': '"880538d41446f7b8083573b44f0b8b37-27"'}
2022-11-03 20:47:03,162 - s3fs - DEBUG - _fetch_range -- Fetch: ursa-labs-taxi-data/2009/01/data.parquet, 461642441-461900991
2022-11-03 20:47:03,162 - s3fs - DEBUG - _call_s3 -- CALL: get_object - () - {'Bucket': 'ursa-labs-taxi-data', 'Key': '2009/01/data.parquet', 'Range': 'bytes=461642441-461900990', 'IfMatch': '"880538d41446f7b8083573b44f0b8b37-27"'}
2022-11-03 20:47:03,324 - s3fs - DEBUG - _call_s3 -- CALL: head_object - ({},) - {'Bucket': 'ursa-labs-taxi-data', 'Key': '2009/01/data.parquet'}
2022-11-03 20:47:03,384 - s3fs - DEBUG - _call_s3 -- CALL: head_object - ({},) - {'Bucket': 'ursa-labs-taxi-data', 'Key': '2009/01/data.parquet'}
2022-11-03 20:47:03,454 - s3fs - DEBUG - _fetch_range -- Fetch: ursa-labs-taxi-data/2009/01/data.parquet, 461900991-461966527
2022-11-03 20:47:03,455 - s3fs - DEBUG - _call_s3 -- CALL: get_object - () - {'Bucket': 'ursa-labs-taxi-data', 'Key': '2009/01/data.parquet', 'Range': 'bytes=461900991-461966526', 'IfMatch': '"880538d41446f7b8083573b44f0b8b37-27"'}
2022-11-03 20:47:03,524 - s3fs - DEBUG - _fetch_range -- Fetch: ursa-labs-taxi-data/2009/01/data.parquet, 461642441-461900991
2022-11-03 20:47:03,525 - s3fs - DEBUG - _call_s3 -- CALL: get_object - () - {'Bucket': 'ursa-labs-taxi-data', 'Key': '2009/01/data.parquet', 'Range': 'bytes=461642441-461900990', 'IfMatch': '"880538d41446f7b8083573b44f0b8b37-27"'}

(4 head calls, 4 get calls, all the same range)

and read

2022-11-03 20:48:11,558 - s3fs - DEBUG - _call_s3 -- CALL: head_object - ({},) - {'Bucket': 'ursa-labs-taxi-data', 'Key': '2009/01/data.parquet'}
2022-11-03 20:48:11,816 - s3fs - DEBUG - _fetch_range -- Fetch: ursa-labs-taxi-data/2009/01/data.parquet, 461900991-461966527
2022-11-03 20:48:11,817 - s3fs - DEBUG - _call_s3 -- CALL: get_object - () - {'Bucket': 'ursa-labs-taxi-data', 'Key': '2009/01/data.parquet', 'Range': 'bytes=461900991-461966526', 'IfMatch': '"880538d41446f7b8083573b44f0b8b37-27"'}
2022-11-03 20:48:11,922 - s3fs - DEBUG - _fetch_range -- Fetch: ursa-labs-taxi-data/2009/01/data.parquet, 461642441-461966519
2022-11-03 20:48:11,922 - s3fs - DEBUG - _call_s3 -- CALL: get_object - () - {'Bucket': 'ursa-labs-taxi-data', 'Key': '2009/01/data.parquet', 'Range': 'bytes=461642441-461966518', 'IfMatch': '"880538d41446f7b8083573b44f0b8b37-27"'}
2022-11-03 20:48:12,089 - s3fs - DEBUG - _fetch_range -- Fetch: ursa-labs-taxi-data/2009/01/data.parquet, 4-33347731
2022-11-03 20:48:12,090 - s3fs - DEBUG - _call_s3 -- CALL: get_object - () - {'Bucket': 'ursa-labs-taxi-data', 'Key': '2009/01/data.parquet', 'Range': 'bytes=4-33347730', 'IfMatch': '"880538d41446f7b8083573b44f0b8b37-27"'}
2022-11-03 20:48:14,017 - s3fs - DEBUG - _fetch_range -- Fetch: ursa-labs-taxi-data/2009/01/data.parquet, 266106048-299556487
2022-11-03 20:48:14,017 - s3fs - DEBUG - _call_s3 -- CALL: get_object - () - {'Bucket': 'ursa-labs-taxi-data', 'Key': '2009/01/data.parquet', 'Range': 'bytes=266106048-299556486', 'IfMatch': '"880538d41446f7b8083573b44f0b8b37-27"'}
2022-11-03 20:48:15,890 - s3fs - DEBUG - _fetch_range -- Fetch: ursa-labs-taxi-data/2009/01/data.parquet, 299556576-332631576
2022-11-03 20:48:15,891 - s3fs - DEBUG - _call_s3 -- CALL: get_object - () - {'Bucket': 'ursa-labs-taxi-data', 'Key': '2009/01/data.parquet', 'Range': 'bytes=299556576-332631575', 'IfMatch': '"880538d41446f7b8083573b44f0b8b37-27"'}
2022-11-03 20:48:17,532 - s3fs - DEBUG - _fetch_range -- Fetch: ursa-labs-taxi-data/2009/01/data.parquet, 332631643-366119909
2022-11-03 20:48:17,533 - s3fs - DEBUG - _call_s3 -- CALL: get_object - () - {'Bucket': 'ursa-labs-taxi-data', 'Key': '2009/01/data.parquet', 'Range': 'bytes=332631643-366119908', 'IfMatch': '"880538d41446f7b8083573b44f0b8b37-27"'}
2022-11-03 20:48:19,814 - s3fs - DEBUG - _fetch_range -- Fetch: ursa-labs-taxi-data/2009/01/data.parquet, 366119998-399192441
2022-11-03 20:48:19,815 - s3fs - DEBUG - _call_s3 -- CALL: get_object - () - {'Bucket': 'ursa-labs-taxi-data', 'Key': '2009/01/data.parquet', 'Range': 'bytes=366119998-399192440', 'IfMatch': '"880538d41446f7b8083573b44f0b8b37-27"'}
2022-11-03 20:48:21,581 - s3fs - DEBUG - _fetch_range -- Fetch: ursa-labs-taxi-data/2009/01/data.parquet, 399192508-432738114
2022-11-03 20:48:21,582 - s3fs - DEBUG - _call_s3 -- CALL: get_object - () - {'Bucket': 'ursa-labs-taxi-data', 'Key': '2009/01/data.parquet', 'Range': 'bytes=399192508-432738113', 'IfMatch': '"880538d41446f7b8083573b44f0b8b37-27"'}
2022-11-03 20:48:23,680 - s3fs - DEBUG - _fetch_range -- Fetch: ursa-labs-taxi-data/2009/01/data.parquet, 432738203-461642359
2022-11-03 20:48:23,680 - s3fs - DEBUG - _call_s3 -- CALL: get_object - () - {'Bucket': 'ursa-labs-taxi-data', 'Key': '2009/01/data.parquet', 'Range': 'bytes=432738203-461642358', 'IfMatch': '"880538d41446f7b8083573b44f0b8b37-27"'}
2022-11-03 20:48:25,287 - s3fs - DEBUG - _fetch_range -- Fetch: ursa-labs-taxi-data/2009/01/data.parquet, 166582521-199685105
2022-11-03 20:48:25,288 - s3fs - DEBUG - _call_s3 -- CALL: get_object - () - {'Bucket': 'ursa-labs-taxi-data', 'Key': '2009/01/data.parquet', 'Range': 'bytes=166582521-199685104', 'IfMatch': '"880538d41446f7b8083573b44f0b8b37-27"'}
2022-11-03 20:48:27,760 - s3fs - DEBUG - _fetch_range -- Fetch: ursa-labs-taxi-data/2009/01/data.parquet, 33347817-66443866
2022-11-03 20:48:27,762 - s3fs - DEBUG - _call_s3 -- CALL: get_object - () - {'Bucket': 'ursa-labs-taxi-data', 'Key': '2009/01/data.parquet', 'Range': 'bytes=33347817-66443865', 'IfMatch': '"880538d41446f7b8083573b44f0b8b37-27"'}
2022-11-03 20:48:29,462 - s3fs - DEBUG - _fetch_range -- Fetch: ursa-labs-taxi-data/2009/01/data.parquet, 99952366-133117918
2022-11-03 20:48:29,462 - s3fs - DEBUG - _call_s3 -- CALL: get_object - () - {'Bucket': 'ursa-labs-taxi-data', 'Key': '2009/01/data.parquet', 'Range': 'bytes=99952366-133117917', 'IfMatch': '"880538d41446f7b8083573b44f0b8b37-27"'}
2022-11-03 20:48:31,172 - s3fs - DEBUG - _fetch_range -- Fetch: ursa-labs-taxi-data/2009/01/data.parquet, 199685171-233041894
2022-11-03 20:48:31,173 - s3fs - DEBUG - _call_s3 -- CALL: get_object - () - {'Bucket': 'ursa-labs-taxi-data', 'Key': '2009/01/data.parquet', 'Range': 'bytes=199685171-233041893', 'IfMatch': '"880538d41446f7b8083573b44f0b8b37-27"'}
2022-11-03 20:48:32,837 - s3fs - DEBUG - _fetch_range -- Fetch: ursa-labs-taxi-data/2009/01/data.parquet, 66443929-99952280
2022-11-03 20:48:32,838 - s3fs - DEBUG - _call_s3 -- CALL: get_object - () - {'Bucket': 'ursa-labs-taxi-data', 'Key': '2009/01/data.parquet', 'Range': 'bytes=66443929-99952279', 'IfMatch': '"880538d41446f7b8083573b44f0b8b37-27"'}
2022-11-03 20:48:34,515 - s3fs - DEBUG - _fetch_range -- Fetch: ursa-labs-taxi-data/2009/01/data.parquet, 133117983-166582432
2022-11-03 20:48:34,517 - s3fs - DEBUG - _call_s3 -- CALL: get_object - () - {'Bucket': 'ursa-labs-taxi-data', 'Key': '2009/01/data.parquet', 'Range': 'bytes=133117983-166582431', 'IfMatch': '"880538d41446f7b8083573b44f0b8b37-27"'}
2022-11-03 20:48:36,159 - s3fs - DEBUG - _fetch_range -- Fetch: ursa-labs-taxi-data/2009/01/data.parquet, 233041983-266105981
2022-11-03 20:48:36,170 - s3fs - DEBUG - _call_s3 -- CALL: get_object - () - {'Bucket': 'ursa-labs-taxi-data', 'Key': '2009/01/data.parquet', 'Range': 'bytes=233041983-266105980', 'IfMatch': '"880538d41446f7b8083573b44f0b8b37-27"'}

(fetched 266MB in 16 serial but unordered calls over 30s, including parse time, which is near my available bandwidth)

When loading with pyarrow's FS using the snippet above, I get

File ~/conda/envs/py39/lib/python3.9/site-packages/pyarrow/_fs.pyx:763, in pyarrow._fs.FileSystem.open_input_file()

File ~/conda/envs/py39/lib/python3.9/site-packages/pyarrow/error.pxi:144, in pyarrow.lib.pyarrow_internal_check_status()

File ~/conda/envs/py39/lib/python3.9/site-packages/pyarrow/error.pxi:115, in pyarrow.lib.check_status()

OSError: When reading information for key '2009/01/data.parquet' in bucket 'ursa-labs-taxi-data': AWS Error [code 100]: No response body.

The time to download the whole file with s3fs in a single continuous call is 27.7s.

fs = fsspec.filesystem("s3", anon=True)
%time fs.cat(path)
fsspec.parquet.open_parquet_file(path, storage_options={"anon": True}, engine="pyarrow")

takes 29s with s3fs, with two concurrent reads (35s if actually parsing the data)

Please run these on your machines closer to the data! Note that Rick's blog ( https://developer.nvidia.com/blog/optimizing-access-to-parquet-data-with-fsspec/ ) specifically measured s3fs with various caching versus arrow's FS versus fsspec.parquet. I would appreciate some deeper benchmarking and consideration before jumping to conclusions.

martindurant avatar Nov 04 '22 01:11 martindurant

Totally agreed that folks shouldn't jump to conclusions.

@martindurant have you had a chance to look at this video? I would be curious about your opinion on what is going on there. Also, it would be great if you could look at @gjoseph92's py-spy profiles (I click "Left Heavy" and find that helps with interpretability).

mrocklin avatar Nov 04 '22 01:11 mrocklin

If I had to guess (and I only have like 20% confidence here) it would be that while fsspec is good on its own and dask is good on its own there is some negative interaction when having both event loops doing their thing at once, and for some reason things get sticky. Just a total guess though. If that guess is anything near to correct though then I think we would want to evaluate things in the wild, using fsspec/arrow + dask + s3, ideally on cloud machines.

If you'd like your very own cloud machines I'm happy to set you up with a Coiled account. It's a pretty smooth experience today. I'm also happy to pay if folks don't already have AWS credentials through work.

mrocklin avatar Nov 04 '22 01:11 mrocklin

NB CLI curl took 22s to fetch the file from the HTTP endpoint.

martindurant avatar Nov 04 '22 02:11 martindurant

I will also do my own measurements on AWS, but certainly not today. I can't say there's too much rush. It may be worthwhile for someone to check out the tricks in https://boto3.amazonaws.com/v1/documentation/api/latest/guide/s3.html#using-the-transfer-manager to make things faster (although that may be for whole files only).
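For whole-file downloads, that would look roughly like this (a sketch; the chunk size and concurrency numbers are placeholders):

import boto3
from boto3.s3.transfer import TransferConfig
from botocore import UNSIGNED
from botocore.config import Config

s3 = boto3.client("s3", config=Config(signature_version=UNSIGNED))

# the transfer manager splits the GET into ranged parts and fetches them concurrently
config = TransferConfig(
    multipart_threshold=8 * 2**20,
    multipart_chunksize=32 * 2**20,
    max_concurrency=8,
)
s3.download_file(
    "ursa-labs-taxi-data", "2009/01/data.parquet", "data.parquet", Config=config
)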

I would prefer if we can improve s3fs rather than splitting the FS backends.

martindurant avatar Nov 04 '22 13:11 martindurant

Feel free to see if this makes any difference

--- a/s3fs/core.py
+++ b/s3fs/core.py
@@ -2231,7 +2231,11 @@ def _fetch_range(fs, bucket, key, version_id, start, end, req_kw=None):
         )
         return b""
     logger.debug("Fetch: %s/%s, %s-%s", bucket, key, start, end)
-    resp = fs.call_s3(
+    return sync(fs.loop, _inner_fetch, fs, bucket, key, version_id, start, end, req_kw)
+
+
+async def _inner_fetch(fs, bucket, key, version_id, start, end, req_kw=None):
+    resp = await fs._call_s3(
         "get_object",
         Bucket=bucket,
         Key=key,
@@ -2239,4 +2243,4 @@ def _fetch_range(fs, bucket, key, version_id, start, end, req_kw=None):
         **version_id_kw(version_id),
         **req_kw,
     )
-    return sync(fs.loop, resp["Body"].read)
+    return await resp["Body"].read()

martindurant avatar Nov 04 '22 13:11 martindurant

I would prefer if we can improve s3fs rather than splitting the FS backends

Can you motivate this a bit more?

What are some of the downsides of using Arrow filesystems in specialized cases like above?

mrocklin avatar Nov 04 '22 13:11 mrocklin

To dask: some additional complexity and development, as well as possible confusion for users when multiple backends are used in conjunction. To the community: s3fs is used an awful lot without arrow, so improvements will be far-reaching (arrow's install size on, for instance, AWS Lambda, is one of the main reasons for the continued existence of fastparquet); any specific features that might need development are much easier to achieve in Python than in arrow's C++ monolith.

martindurant avatar Nov 04 '22 13:11 martindurant

Sure, and I'm not currently planning to invest any development resources in improving arrow. However, if today Arrow's reading is better than s3fs's reading in a particular case, and if we know that we are in a situation where we can swap the two safely, then it seems to me like we should do so.

To be clear, I'm not saying "let's rip out S3" I'm saying "In this restricted but common case where we know it's safe, maybe we can automatically swap things and get a speed boost"

mrocklin avatar Nov 04 '22 13:11 mrocklin

You might also be underestimating the importance of this speed boost. We've just discovered a way for Dask to go 2x faster on very common workloads. This is HUGE for a lot of users.

mrocklin avatar Nov 04 '22 13:11 mrocklin

(if indeed, this speedup exists (which currently our benchmarking shows that it does in the wild, but we should do more homework))

mrocklin avatar Nov 04 '22 13:11 mrocklin

I would prefer if we can improve s3fs rather than splitting the FS backends.

We built everything on fsspec for good reason. It is flexible, effective, and has excellent API coverage. Therefore, fsspec/s3fs will clearly remain the primary code path in Dask-IO in the future. With that said, I do feel that the user should be allowed to set the global filesystem to an S3FileSystem object for the ”pyarrow” engine. There have been issues related to pyarrow users who want to pass the same arguments to Dask that they are already passing to pyarrow.dataset. Also, if we ultimately find that S3FileSystem has a consistent performance advantage (one that cannot be fixed), it would be pretty silly not to tweak default behavior.
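For context, this is roughly what those pyarrow users are already writing, with a filesystem object they would like to hand straight to read_parquet (a sketch; the region and column name are just examples):

import pyarrow.dataset as ds
import pyarrow.fs as pa_fs

fs = pa_fs.S3FileSystem(anonymous=True, region="us-east-2")  # assumed bucket region

dataset = ds.dataset(
    "ursa-labs-taxi-data/2009/01/data.parquet",
    format="parquet",
    filesystem=fs,
)
tbl = dataset.to_table(columns=["passenger_count"])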

rjzamora avatar Nov 04 '22 14:11 rjzamora

On ec2:

%time fs.get("s3://ursa-labs-taxi-data/2009/01/data.parquet", "data.parquet")
CPU times: user 1.16 s, sys: 641 ms, total: 1.8 s
Wall time: 4.67 s

> wget https://ursa-labs-taxi-data.s3.amazonaws.com/2009/01/data.parquet
461,966,527 96.0MB/s   in 4.6s

%time f = fsspec.parquet.open_parquet_file("s3://ursa-labs-taxi-data/2009/01/data.parquet", storage_options={"anon": True})
CPU times: user 1.36 s, sys: 833 ms, total: 2.2 s
Wall time: 3.46 s

%%time
with fs.open_input_file("ursa-labs-taxi-data/2009/01/data.parquet") as f:  # fs = pyarrow S3FileSystem here
    data = True
    while data:
        data = f.read(2**10)
AWS Error [code 100]: No response body  # also fails via dd.read_parquet as in Rick's snippet

%%time  # serial
with fsspec.open("s3://ursa-labs-taxi-data/2009/01/data.parquet", mode="rb", anon=True) as f:
    data = True
    while data:
        data = f.read(2**10)
CPU times: user 2.87 s, sys: 530 ms, total: 3.4 s
Wall time: 13.9 s

%%time  # serial, bigger blocks
with fsspec.open("s3://ursa-labs-taxi-data/2009/01/data.parquet", mode="rb", anon=True, default_block_size=50*2**20) as f:
    data = True
    while data:
        data = f.read(2**10)
CPU times: user 3.09 s, sys: 1.13 s, total: 4.22 s
Wall time: 7.63 s

> curl https://ursa-labs-taxi-data.s3.amazonaws.com/2009/01/data.parquet -o temp
4s

So s3fs matches wget and curl for throughput.

With dask in the loop, exactly the same

import dask
import fsspec
fs = fsspec.filesystem("s3", anon=True)

@dask.delayed
def f():
    fs.get("s3://ursa-labs-taxi-data/2009/01/data.parquet", "data.parquet")
    return True

d = f()
%timeit dask.compute(d)
4.64 s ± 3.14 ms per loop

So all saturate at about 100MB/s, as expected for a .medium EC2 instance. Do I need something bigger? Why does pyarrow consistently fail?

pyarrow 8.0.0; fsspec and s3fs 2022.11.0; dask 2022.10.2 (fresh conda environment from conda-forge, no particular care taken)

martindurant avatar Nov 11 '22 21:11 martindurant

Why does pyarrow consistently fail?

I haven't seen any failures on pyarrow-9.0.0 (using S3FileSystem(anonymous=True)).

So all saturate at about 100MB/s, as expected for a .medium EC2 instance. Do I need something bigger?

My network is too noisy right now to run any useful experiments. However, it may be the case that the data-transfer time is indeed optimal (or close to it) for fsspec.parquet.open_parquet_file. If there is actually a consistent discrepancy in the read_parquet wall time (still not sure if there is), it may come from something other than data transfer.

rjzamora avatar Nov 14 '22 16:11 rjzamora

I haven't seen any failures on pyarrow-9.0.0

The env was created like

conda create -n new python=3.9 pyarrow dask s3fs ipython pandas -c conda-forge

under AWS linux from fresh miniconda, nothing special.

martindurant avatar Nov 14 '22 16:11 martindurant

My network is too noisy right now to run any useful experiments

@rjzamora sorry to pitch a for-profit thing, but I notice that you have a Coiled account. If you do the following you should get a Dask cluster with your local setup.

pip install coiled --upgrade # or conda, if you prefer

import coiled

cluster = coiled.Cluster(
    account="dask",   # use an AWS account that we've set up for development
    package_sync=True,  # copy versions of dask, arrow, etc. whatever you have locally
)

from dask.distributed import Client
client = Client(cluster)

No pressure of course, but I'd like to remove any hardware bottleneck you may have to do experimentation.

@martindurant , same offer to you if you're interested.

mrocklin avatar Nov 14 '22 16:11 mrocklin

I'm not sure it helps me, since I was doing interactive timings mostly without dask; and then with dask-threads. It's really a remote client I need, not workers.

Does anyone suspect that the effect might only happen on dask workers? That would be interesting.

martindurant avatar Nov 14 '22 17:11 martindurant

It certainly happens on Dask Workers. I don't know where the boundary is though. It's easy enough to experiment up there that it seems useful to have the ability.

If I wanted a remote machine I would probably spin up a single worker cluster and then start using afar.

Regardless though, the thing that I want to optimize isn't s3fs, it's s3fs+dask+cloud-machines. If I were trying to optimize this I would start with the full system, verify the slowdown, and then start pulling away pieces and seeing what happens.

mrocklin avatar Nov 14 '22 18:11 mrocklin

However, it may be the case that the data-transfer time is indeed optimal (or close to it) for fsspec.read_parquet_file. If there is actually a consistent discrepancy in the read_parquet wall time (still not sure if there is), it may be related to something unrelated to data transfer.

I'd expect plain read_parquet wall time to be pretty much optimal.

I think the problem here is about the interaction—particularly regarding concurrency—between s3fs and dask (and pyarrow?), as Matt is saying. s3fs seems perfectly fast on its own, but when you have multiple Arrow threads calling into single-threaded, GIL-locked Python to do their reads, which then sync into a single-threaded event loop, which then runs the s3fs coroutines, is that optimal?

As an example, please take a look at the py-spy profile I recorded of multi-threaded dask workers on a real cluster loading Parquet data: tls-10_0_0_177-42425.json

If you look at the fsspecIO thread (which is a different thread from the worker event loop MainThread, which itself is quite concerning—this implies there are two separate event loops running in separate threads), you can see that 30% of the time, the event loop is blocked acquiring the Python GIL:

[Screenshot: py-spy profile of the fsspecIO thread, with roughly 30% of samples in GIL acquisition]

This is the convoy effect https://bugs.python.org/issue7946; you can read more here https://github.com/dask/distributed/issues/6325. It means that async networking performance will be significantly degraded if you're doing stuff in other threads that hold the GIL. In Dask, we're using async networking to load parquet, and we have other worker threads doing stuff that's likely to hold the GIL (plenty of pandas operations hold the GIL at least some of the time).

This is just an inherent limitation of Python right now, there's not much fsspec can do about it.
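A toy demonstration of the effect, outside dask/fsspec entirely (a sketch; the numbers depend heavily on the machine and on sys.setswitchinterval):

import asyncio
import threading
import time

def gil_hog(stop: threading.Event) -> None:
    # pure-Python busy work in a second thread, holding the GIL most of the time
    while not stop.is_set():
        sum(i * i for i in range(10_000))

async def measure(label: str) -> None:
    # how late do 1 ms sleeps fire on the event loop?
    worst = 0.0
    for _ in range(200):
        t0 = time.perf_counter()
        await asyncio.sleep(0.001)
        worst = max(worst, time.perf_counter() - t0 - 0.001)
    print(f"{label}: worst wakeup lag {worst * 1000:.1f} ms")

async def main() -> None:
    await measure("idle")
    stop = threading.Event()
    t = threading.Thread(target=gil_hog, args=(stop,))
    t.start()
    await measure("with a GIL-holding thread")
    stop.set()
    t.join()

asyncio.run(main())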

I'm not saying this is the only problem. I also would imagine (unfounded) that arrow could be more memory-efficient and reduce the number of copies if it got to manage its own IO without leaving the C++ world. I'd hope it could read from the network directly into an arrow-managed buffer—ideally zero-copy sendfile, but at least without a copy into non-Arrow userspace that then gets copied again into Arrow-managed userspace.

Broadly, I just find it weird to see 4 Arrow threads all calling into Python to do their IO. I would expect, intuitively, that a performance and memory sensitive C++ library like Arrow can perform better if it gets to manage its own IO, also in C++.

gjoseph92 avatar Nov 14 '22 20:11 gjoseph92

I'm not sure it helps me, since I was doing interactive timings mostly without dask;

Yeah, just to be clear, what I'm interested in here isn't "the relative performance of s3fs vs pyarrow s3" it's the "relative performance of these libraries in the context of Dask".

What I'm seeing here (thanks @gjoseph92 for the profiling) is that s3fs is 2x slower in the context of Dask in a specific but very common use case. Unless we see an easy development path towards making this much faster (and @gjoseph92 's comment leads me to believe that such a path will be hard to find), I think that we should probably move in the direction of swapping things out when we know that it is safe to do so.

mrocklin avatar Nov 15 '22 14:11 mrocklin

quick addendum: "*at least in the short term"

mrocklin avatar Nov 15 '22 15:11 mrocklin