
Bodo vs Dask comparison

Open jacobtomlinson opened this issue 10 months ago • 18 comments

I noticed that the folks over at Bodo published a blog post comparing the performance of Bodo, Dask, Spark and Modin + Ray.

[Image: benchmark results comparing Bodo, Dask, Spark, and Modin + Ray]

They published the benchmark code here; it would be interesting to try to reproduce and verify their results.

The Dask example uses dask-cloudprovider. I wonder what it would be like to use Coiled instead.

jacobtomlinson avatar Jan 30 '25 11:01 jacobtomlinson

Their instance type selection is our biggest foe here. Dask doesn't perform very well on such large instances; using more, smaller instances with the same number of cores in aggregate would most likely perform a lot better.
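For illustration, a minimal sketch of that kind of layout with Coiled (the instance type and worker count are assumptions for the example, not the benchmark's actual configuration):

```python
import coiled
from dask.distributed import Client

# Hypothetical layout: many modest instances instead of a few huge ones,
# keeping roughly the same number of cores in aggregate.
cluster = coiled.Cluster(
    n_workers=16,
    worker_vm_types=["m6i.xlarge"],  # 4 vCPUs per worker
)
client = Client(cluster)
```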

phofl avatar Jan 30 '25 11:01 phofl

So I ran this on Coiled and it's a lot faster with proper instances, but the main problem is that their Parquet files are not suited for distributed processing. They are huge and they don't really use row groups: splitting on row groups creates only 38 read tasks, and some of them produce partitions of multiple gigabytes.
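For anyone who wants to check the layout themselves, pyarrow exposes the row-group metadata directly (the file name below is a placeholder):

```python
import pyarrow.parquet as pq

# Few, huge row groups mean few read tasks and very large partitions
# when Dask splits the file on row-group boundaries.
meta = pq.ParquetFile("fhvhv_tripdata_2023-01.parquet").metadata
print("row groups:", meta.num_row_groups)
for i in range(meta.num_row_groups):
    rg = meta.row_group(i)
    print(f"row group {i}: {rg.num_rows} rows, {rg.total_byte_size / 1e9:.2f} GB")
```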

phofl avatar Jan 30 '25 12:01 phofl

Thanks for running this @phofl, I was just doing the same but you got there first.

I also tried their standard pandas version, but I'm getting pyarrow.lib.ArrowNotImplementedError: Unsupported cast from string to null using function cast_null when trying to load the parquet files.

jacobtomlinson avatar Jan 30 '25 12:01 jacobtomlinson

Any chance you have an older arrow version installed?

phofl avatar Jan 30 '25 12:01 phofl

I just created a fresh RAPIDS environment so I have arrow==1.3.0 and pyarrow==17.0.0.

jacobtomlinson avatar Jan 30 '25 12:01 jacobtomlinson

Ok, that's odd then...

I ran this with the dataset that we are hosting in a Coiled S3 bucket, i.e.

dataset = "s3://coiled-datasets/uber-lyft-tlc/"

and that one finished the query in 27 seconds (minus the download of the computed results, which I interrupted because my wifi isn't that fast). They are running the query on a cloud machine, so that part shouldn't matter much.

phofl avatar Jan 30 '25 12:01 phofl

Parquet performance will also depend on the backend used. The pyarrow backend is (or should be) faster, but it still has a lot of sharp edges and isn't the default.

fjetter avatar Jan 30 '25 12:01 fjetter

Parquet performance will also depend on the backend used. The pyarrow backend is (or should be) faster, but it still has a lot of sharp edges and isn't the default.

It technically is, but our fusing is pretty aggressive, which means we only end up with 80 partitions (we only need 4 columns from the dataset when using the pyarrow backend), while they are running on 128 cores... The cluster is overprovisioned by quite a bit.
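A rough sketch of the read side, just to illustrate the mechanics (the path and column names are placeholders, and dtype_backend is only relevant if pyarrow-backed dtypes are the backend being discussed):

```python
import dask.dataframe as dd

# Reading only the handful of columns the query needs lets the fused
# read produce comparatively few, large partitions.
df = dd.read_parquet(
    "s3://bucket/tripdata/",  # placeholder location
    columns=["pickup_datetime", "PULocationID", "DOLocationID", "trip_miles"],
    dtype_backend="pyarrow",
)

# Optionally split into more pieces so every core has work to do.
df = df.repartition(npartitions=256)
```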

phofl avatar Jan 30 '25 12:01 phofl

Well, they are running on an r6i.16xlarge machine, which has 64 cores. This can't work 😂 The GIL will kill everything. We have moderate if not severe GIL issues already at 8 cores; with 64 it is impossible to run anything meaningfully. Even IO will be impossibly slow because it is using fastparquet by default, i.e. another Python library that requires the GIL.
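As a rough illustration of the thread-count point (the numbers are just an example, not a recommendation from this thread):

```python
from dask.distributed import LocalCluster, Client

# On a single 64-core box: many worker processes with a few threads each,
# rather than one worker holding 64 GIL-contended threads.
cluster = LocalCluster(n_workers=16, threads_per_worker=4)
client = Client(cluster)
```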

fjetter avatar Jan 30 '25 12:01 fjetter

They are submitting a task that is internally calling compute. This will be using a worker client, i.e. the driver code is also a worker task. I'm not entirely sure how that'll impact performance, but it is uncommon.
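The pattern looks roughly like this (the scheduler address, path, and column name are placeholders):

```python
import dask.dataframe as dd
from dask.distributed import Client

def driver_task(path):
    # Runs as a task on a worker; the inner .compute() is dispatched
    # through a worker client instead of the original client.
    df = dd.read_parquet(path)
    return df.groupby("PULocationID").size().compute()

client = Client("tcp://scheduler:8786")  # placeholder address
result = client.submit(driver_task, "s3://bucket/tripdata/").result()
```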

I just noticed that we do not have any good advice on https://docs.dask.org/en/stable/best-practices.html about instance sizes (see https://github.com/dask/dask/pull/11705). There is a comment about avoiding thread counts above 10, but I think the recommendation should be lower. That docs section also isn't really dealing with a distributed cluster.

fjetter avatar Jan 30 '25 12:01 fjetter

fastparquet by default, i.e. another Python library that requires the GIL.

fastparquet releases the GIL in its core decoding algorithms FWIW

I would suggest that the get_monthly_travels_weather function is pretty terribly written; you could do much better with some map_partitions. Yes, the operations should be fused, but there are LOADS of pandas temporaries in there. You could probably do it all in a little numba func, if anyone wants to try. I say this because the main selling point of Bodo is its JIT, apparently.
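As a generic illustration of the map_partitions pattern (the function body and column names are made up, not the actual get_monthly_travels_weather logic):

```python
import pandas as pd
import dask.dataframe as dd

def per_partition(part: pd.DataFrame) -> pd.DataFrame:
    # All intermediate pandas objects live only inside this call, so the
    # whole chain runs as a single fused task per partition.
    part = part.assign(month=part["pickup_datetime"].dt.month)
    return part.groupby("month", as_index=False)["trip_miles"].sum()

# df = dd.read_parquet(...)                                   # input dataset
# partial = df.map_partitions(per_partition)                  # per-partition work
# result = partial.groupby("month")["trip_miles"].sum().compute()  # final reduce
```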

martindurant avatar Feb 13 '25 20:02 martindurant

Hello! I wrote this code and wanted to provide some additional context that might be helpful to this discussion. First of all, thank you for taking the time to check out our benchmark and reproduce the results! This investigation was really informative and I learned a lot from reading it.

Firstly, the goal of this benchmark was to demonstrate how a typical Pandas user might write a simple workflow and then try to scale it (and do so without drastically rewriting their code).

Regarding the Parquet files, they were downloaded directly from here without any modification. It's important to note that each system had to deal with the same large files.

Regarding the instance size, this was primarily done to increase the amount of available memory, since smaller instances ran out of memory fairly quickly. I wasn't aware of Dask's issues with large instances. Looking at the Dask distributed dashboard, it looks like there is a task that requires all of the data on a single worker. Any ideas?

scott-routledge2 avatar Feb 13 '25 22:02 scott-routledge2

Looking at the Dask distributed dashboard, it looks like there is a task that requires all of the data on a single worker. Any ideas?

It looks like you call .compute() on the final result, whereas other libraries write the results to S3 in Parquet format. Calling .compute() has multiple issues that will affect runtime and the memory footprint:

  1. It will concatenate the entire result into a single partition, which might be what you see with the single task requiring all data.
  2. It will then transfer the entire result back to the client, which can take a while, depending on your connection and location.

I'd recommend writing to S3 to ensure this benchmark does not compare apples to oranges.
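For illustration, the two variants look roughly like this (the bucket path is a placeholder, and result stands for the final Dask DataFrame produced by the query):

```python
# Pulls the whole result into a single partition and ships it to the client.
local_result = result.compute()

# Keeps the output on the cluster and writes it straight to S3 instead.
result.to_parquet("s3://my-bucket/benchmark-output/")
```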

hendrikmakait avatar Feb 19 '25 14:02 hendrikmakait

It will concatenate the entire result into a single partition

This is not generally true! The concatenation should happen in the client.

martindurant avatar Feb 19 '25 14:02 martindurant

This is not generally true! The concatenation should happen in the client.

It's true for dataframes, which is the API that is used here. See https://github.com/dask/dask-expr/pull/1138, which introduces a flag to disable that behavior.

hendrikmakait avatar Feb 19 '25 14:02 hendrikmakait

It's true for dataframes

I am truly astonished.

Unfortunately, the PR has no discussion or documentation on when the user should use the new optional flag.

martindurant avatar Feb 19 '25 14:02 martindurant

Thanks. I opened a PR here to update the code to write to S3 instead of calling compute. I didn't notice a significant difference in this case. Does the compute=True flag in to_parquet also transfer some result to the client?

scott-routledge2 avatar Feb 19 '25 16:02 scott-routledge2

Does the compute=True flag in to_parquet also transfer some result to the client?

No. You did the default and correct thing. The difference is whether the operation waits for completion or returns background-monitored futures.
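Concretely (the output path is a placeholder and df stands for the Dask DataFrame being written):

```python
import dask

# Default (compute=True): block until every partition has been written.
df.to_parquet("s3://my-bucket/output/")

# compute=False: get a lazy handle back and trigger it explicitly later.
write = df.to_parquet("s3://my-bucket/output/", compute=False)
dask.compute(write)
```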

martindurant avatar Feb 20 '25 19:02 martindurant