community
community copied to clipboard
Bodo vs Dask comparison
I noticed that the folks over at Bodo published a blog post comparing the performance of Bodo, Dask, Spark and Modin + Ray.
They published the benchmark code here, it would be interesting to try to reproduce and verify their results.
The Dask example uses dask-cloudprovider. I wonder what it would be like to use Coiled instead.
Their instance type selection is our biggest foe here. Dask doesn't perform very well on these large instances. using more smaller instances with the same number of cores in aggregate would most likely perform a lot better
So I ran this on Coiled and it's a lot faster with proper instances but the main problem is that their parquet files are not suited for distributed processing. They are huge and they don't really use row-groups. Splitting the row groups creates only 38 read tasks and some of them produce partitions with multiple GBs.
Thanks for running this @phofl, I was just doing the same but you got there first.
I also tried their standard pandas version, but I'm getting pyarrow.lib.ArrowNotImplementedError: Unsupported cast from string to null using function cast_null when trying to load the parquet files.
Any chance you have an older arrow version installed?
I just created a fresh RAPIDS environment so I have arrow==1.3.0 and pyarrow==17.0.0.
Ok, that's odd then...
I ran this with the dataset that we are hosting in a coiled s3 bucket, i.e.
dataset = "s3://coiled-datasets/uber-lyft-tlc/"
and that one finished the query in 27 seconds (minus the download of the computed results, I interrupted that because my wifi isn't that fast). They are running the query on a cloud machine, so it shouldn't matter much
parquet performance will also depend on the backend used. the pyarrow backend is / should be faster but it has still a lot of sharp edges and isn't the default
parquet performance will also depend on the backend used. the pyarrow backend is / should be faster but it has still a lot of sharp edges and isn't the default
it technically is, but our fusing is pretty aggressive which means we only end up with 80 partitions since we only need 4 columns from the dataset when using the pyarrow backend but they are running on 128 cores... The cluster is overprovisioned by quite a bit
Well, they are running on a r6i.16xlarge machine which has 64 cores. This can't work 😂 The GIL will kill everything. We have moderate if not severe GIL issues already at 8 cores. 64 is impossible to run anything meaningfully. Even IO will be impossibly slow because it is using fastparquet by default, i.e. another python library that requires the GIL.
They are submitting a task that is internally calling compute. This will be using a worker client, i.e. the driver code is also a worker task. I'm not entirely sure how that'll impact performance but it is uncommon.
I just noticed that we do not have any good advice on https://docs.dask.org/en/stable/best-practices.html about instance sizes (see https://github.com/dask/dask/pull/11705). There is a comment about avoiding thread counts above 10 but I think the recommendation should be lower. This docs section is not truly dealing with a distributed cluster.
fastparquet by default, i.e. another python library that requires the GIL.
fastparquet releases the GIL in its core decoding algorithms FWIW
I would suggest that the get_monthly_travels_weather function is pretty terribly written, you could do much better with some map_partitions. Yes, the operations should be fused, but there are LOADS of pandas temporaries in there. You could probably do it all in a little numba func, if anyone wants to try. I say this, because the main selling point on bodo is its JIT, apparently.
Hello! I wrote this code and wanted to provide some additional context that might be helpful to this discussion. First of all thank you for taking the time to check out our benchmark and reproduce the results! This investigation was really informative and I find myself learning a lot from reading it.
Firstly, the goal of this benchmark was to demonstrate how a typical Pandas user might write a simple workflow and then try to scale it (and doing so without drastically rewriting their code).
Regarding the parquet files, they were downloaded directly from here without any modification. It's important to note here that each system had to deal with the same large parquet files.
Regarding the instance size, this was primarily done to increase the amount of available memory since smaller instance sizes ran out of memory fairly quickly. I wasn't aware of Dask issues with large instances. Looking at the Dask distributed dashboard, looks like there is a task that requires all data on a single worker. Any ideas?
Looking at the Dask distributed dashboard, looks like there is a task that requires all data on a single worker. Any ideas?
It looks like you call .compute() on the final result, whereas other libraries write the results to S3 in Parquet format. Calling .compute() has multiple issues that will affect runtime and the memory footprint:
- It will concatenate the entire result into a single partition, which might be what you see with the single task requiring all data.
- It will then transfer the entire result back to the client, which can take a while, depending on your connection and location.
I'd recommend writing to S3 to ensure this benchmark does not compare apples to oranges.
t will concatenate the entire result into a single partition
This is not generally true! The concatenation should happen in the client.
This is not generally true! The concatenation should happen in the client.
It's true for dataframes, which is the API that is used here. See https://github.com/dask/dask-expr/pull/1138 which introduces a flag to disable that behavior.
It's true for dataframes
I am truly astonished.
Unfortunately, the PR has no discussion or documentation on when the user should use the new optional flag.
Thanks. I opened a PR to update the code to write to S3 instead of compute here. I didn't notice a significant difference in this case, does the compute=True flag in to_parquet also transfer some result to the client?
does the compute=True flag in to_parquet also transfer some result to the client
No. You did the default and correct thing. The difference is, whether the operation waits for completion, or returns background-monitored futures.