Blog post about parquet vs custom file formats
Is your feature request related to a problem or challenge?
https://x.com/andrewlamb1111/status/1925537738360504663
ClickBench keeps me convinced that Parquet can be quite fast. There is only a 2.3x performance difference between @duckdb 's own format and unoptimized Parquet: https://tinyurl.com/5aexvsfw. I am surprised that the (closed source) Umbra reports being only 3.3x faster than DuckDB on Parquet
Describe the solution you'd like
I would love to make a blog post about how much faster/slower custom file formats are compared to parquet. I am filing this ticket while it is on my mind so I don't forget it.
The basic thesis is that
- Custom file formats only get you XX% more performance than parquet
- Many of the historic performance differences are due to engineering investment rather than format
- Parquet has many other benefits (like a very large ecosystem)
==> therefore parquet is the format that really matters
Describe alternatives you've considered
The core of the post would be to compare
- A proprietary format (like duckdb/umbra)
- normal parquet
- "optimized parquet"
I think we could basically use the https://github.com/ClickHouse/ClickBench dataset and queries (and results from the proprietary systems)
What is needed is to generate "optimized parquet" numbers.
The partitioned parquet files from ClickBench are not optimized. Specifically they:
- Are not sorted in any way
- Do not have a page index (Offset index)
- Use snappy compression
A fun experiment might be to "fix" the clickbench partitioned dataset by
- resorting and writing with page indexes (could use a bunch of DataFusion COPY commands pretty easily to do this). The sort order should be some subset of the predicate columns. Perhaps EventTime and then maybe SearchPhrase / URL.
- disabling compression
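For illustration, a rough datafusion-cli sketch of that rewrite (untested; the file names and the exact sort columns are placeholders):

-- Enable page-level statistics (page index) and disable compression
SET datafusion.execution.parquet.statistics_enabled = 'page';
SET datafusion.execution.parquet.compression = 'uncompressed';
-- Register one partition of the ClickBench dataset
CREATE EXTERNAL TABLE hits STORED AS PARQUET LOCATION 'hits_1.parquet';
-- Rewrite it resorted by (a subset of) the predicate columns
COPY (SELECT * FROM hits ORDER BY "EventTime", "SearchPhrase")
TO 'hits_sorted.parquet' STORED AS PARQUET;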
Interestingly, ClickBench being quite a bit faster again for 1.3 (https://github.com/ClickHouse/ClickBench/pull/376) seems mostly related to using predicate pushdown more effectively during Parquet decoding (which they might already have implemented for their own format).
Indeed -- unsurprisingly, the more effort that is put into parquet readers, the faster they go 😆 and the open nature / widespread adoption of the format makes it easier to gather that required effort.
BTW, I am working on the same for DataFusion with @zhuqi-lucas in https://github.com/apache/arrow-rs/issues/7456
I hope we will have some major improvements to share in another week or two
A fun experiment might be to "fix" the clickbench partitioned dataset by
- resorting and writing with page indexes (could use a bunch of DataFusion COPY commands pretty easily to do this). The sort order should be some subset of the predicate columns. Perhaps EventTime and then maybe SearchPhrase / URL.
- disabling compression
This is very interesting; maybe we can also do this for the arrow-rs clickbench benchmark to see the result.
cc @alamb @Dandandan I ran some experiments; here are the results. Most of the benefit comes from the page index combined with uncompressed parquet data:
critcmp mock_customer_format main
group main mock_customer_format
----- ---- --------------------
arrow_reader_clickbench/async/Q1 1.23 1025.0±96.12µs ? ?/sec 1.00 834.2±66.60µs ? ?/sec
arrow_reader_clickbench/async/Q10 1.00 20.5±5.75ms ? ?/sec 1.25 25.7±6.92ms ? ?/sec
arrow_reader_clickbench/async/Q11 1.00 22.8±4.31ms ? ?/sec 1.03 23.6±7.31ms ? ?/sec
arrow_reader_clickbench/async/Q12 1.86 29.7±2.88ms ? ?/sec 1.00 16.0±2.67ms ? ?/sec
arrow_reader_clickbench/async/Q13 1.74 36.9±2.59ms ? ?/sec 1.00 21.2±2.74ms ? ?/sec
arrow_reader_clickbench/async/Q14 1.33 37.2±1.88ms ? ?/sec 1.00 27.9±3.90ms ? ?/sec
arrow_reader_clickbench/async/Q19 1.00 3.9±0.45ms ? ?/sec 1.21 4.7±1.11ms ? ?/sec
arrow_reader_clickbench/async/Q20 2.59 90.6±3.54ms ? ?/sec 1.00 34.9±2.57ms ? ?/sec
arrow_reader_clickbench/async/Q21 2.95 117.2±5.50ms ? ?/sec 1.00 39.7±4.13ms ? ?/sec
arrow_reader_clickbench/async/Q22 3.88 219.8±3.14ms ? ?/sec 1.00 56.6±10.14ms ? ?/sec
arrow_reader_clickbench/async/Q23 1.64 215.7±3.89ms ? ?/sec 1.00 131.7±7.86ms ? ?/sec
arrow_reader_clickbench/async/Q24 1.80 40.7±3.22ms ? ?/sec 1.00 22.6±3.77ms ? ?/sec
arrow_reader_clickbench/async/Q27 2.82 98.8±4.16ms ? ?/sec 1.00 35.1±6.48ms ? ?/sec
arrow_reader_clickbench/async/Q28 2.37 95.6±3.30ms ? ?/sec 1.00 40.3±7.00ms ? ?/sec
arrow_reader_clickbench/async/Q30 1.28 43.8±4.08ms ? ?/sec 1.00 34.3±3.55ms ? ?/sec
arrow_reader_clickbench/async/Q36 2.35 96.6±4.54ms ? ?/sec 1.00 41.1±6.47ms ? ?/sec
arrow_reader_clickbench/async/Q37 1.43 58.4±8.72ms ? ?/sec 1.00 40.9±8.38ms ? ?/sec
arrow_reader_clickbench/async/Q38 1.02 22.6±1.55ms ? ?/sec 1.00 22.3±1.38ms ? ?/sec
arrow_reader_clickbench/async/Q39 1.11 24.7±0.66ms ? ?/sec 1.00 22.2±2.25ms ? ?/sec
arrow_reader_clickbench/async/Q40 1.22 28.7±1.41ms ? ?/sec 1.00 23.6±1.69ms ? ?/sec
arrow_reader_clickbench/async/Q41 1.03 21.9±1.36ms ? ?/sec 1.00 21.3±3.00ms ? ?/sec
arrow_reader_clickbench/async/Q42 1.00 12.3±1.00ms ? ?/sec 1.48 18.2±2.33ms ? ?/sec
arrow_reader_clickbench/sync/Q1 1.00 736.6±8.42µs ? ?/sec 1.46 1077.0±53.33µs ? ?/sec
arrow_reader_clickbench/sync/Q10 1.00 7.2±0.13ms ? ?/sec 1.06 7.7±0.24ms ? ?/sec
arrow_reader_clickbench/sync/Q11 1.00 8.3±0.12ms ? ?/sec 1.01 8.4±0.38ms ? ?/sec
arrow_reader_clickbench/sync/Q12 1.86 20.1±0.16ms ? ?/sec 1.00 10.9±0.10ms ? ?/sec
arrow_reader_clickbench/sync/Q13 1.61 25.3±0.25ms ? ?/sec 1.00 15.7±0.12ms ? ?/sec
arrow_reader_clickbench/sync/Q14 1.59 25.3±0.71ms ? ?/sec 1.00 15.8±0.22ms ? ?/sec
arrow_reader_clickbench/sync/Q19 1.00 1725.1±50.96µs ? ?/sec 1.02 1763.7±81.18µs ? ?/sec
arrow_reader_clickbench/sync/Q20 3.15 91.9±5.97ms ? ?/sec 1.00 29.2±0.84ms ? ?/sec
arrow_reader_clickbench/sync/Q21 3.09 120.7±7.03ms ? ?/sec 1.00 39.0±0.54ms ? ?/sec
arrow_reader_clickbench/sync/Q22 3.21 241.2±6.63ms ? ?/sec 1.00 75.1±1.85ms ? ?/sec
arrow_reader_clickbench/sync/Q23 2.53 227.6±9.16ms ? ?/sec 1.00 90.1±3.34ms ? ?/sec
arrow_reader_clickbench/sync/Q24 1.59 31.4±1.91ms ? ?/sec 1.00 19.8±1.51ms ? ?/sec
arrow_reader_clickbench/sync/Q27 3.35 98.0±3.44ms ? ?/sec 1.00 29.3±1.16ms ? ?/sec
arrow_reader_clickbench/sync/Q28 3.04 94.8±4.70ms ? ?/sec 1.00 31.2±1.53ms ? ?/sec
arrow_reader_clickbench/sync/Q30 1.20 34.8±2.27ms ? ?/sec 1.00 29.0±2.65ms ? ?/sec
arrow_reader_clickbench/sync/Q36 2.13 94.6±3.37ms ? ?/sec 1.00 44.4±2.13ms ? ?/sec
arrow_reader_clickbench/sync/Q37 1.28 53.9±2.37ms ? ?/sec 1.00 42.1±3.03ms ? ?/sec
arrow_reader_clickbench/sync/Q38 1.00 21.8±2.11ms ? ?/sec 1.10 23.9±0.40ms ? ?/sec
arrow_reader_clickbench/sync/Q39 1.17 21.5±2.43ms ? ?/sec 1.00 18.3±3.15ms ? ?/sec
arrow_reader_clickbench/sync/Q40 1.10 24.7±0.63ms ? ?/sec 1.00 22.5±0.75ms ? ?/sec
arrow_reader_clickbench/sync/Q41 1.08 19.9±1.21ms ? ?/sec 1.00 18.5±0.80ms ? ?/sec
arrow_reader_clickbench/sync/Q42 1.00 11.1±0.95ms ? ?/sec 1.57 17.5±0.50ms ? ?/sec
-- Enable the page index (page-level statistics) and disable compression
SET datafusion.execution.parquet.statistics_enabled = 'page';
SET datafusion.execution.parquet.compression = 'uncompressed';
-- Register the original ClickBench partition
CREATE EXTERNAL TABLE hits1
STORED AS PARQUET
LOCATION 'hits_1.parquet';
-- Rewrite it with the settings above
COPY (
SELECT *
FROM hits1
)
TO 'hits_output.parquet'
STORED AS PARQUET;
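As a sanity check that the rewrite took effect, datafusion-cli also has a parquet_metadata table function that exposes per-column-chunk metadata (a rough sketch; the exact column names may vary by version):

SELECT row_group_id, path_in_schema, compression
FROM parquet_metadata('hits_output.parquet')
LIMIT 5;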
I made some changes to the arrow-rs clickbench benchmark:
diff --git a/parquet/benches/arrow_reader_clickbench.rs b/parquet/benches/arrow_reader_clickbench.rs
index 69f3e4c1a9..5f9ae76ba2 100644
--- a/parquet/benches/arrow_reader_clickbench.rs
+++ b/parquet/benches/arrow_reader_clickbench.rs
@@ -584,17 +584,16 @@ fn hits_1() -> &'static Path {
current_dir
);
- let Some(hits_1_path) = find_file_if_exists(current_dir.clone(), "hits_1.parquet") else {
+ let Some(hits_output) = find_file_if_exists(current_dir.clone(), "hits_output.parquet") else {
eprintln!(
"Could not find hits_1.parquet in directory or parents: {:?}. Download it via",
current_dir
);
eprintln!();
- eprintln!("wget --continue https://datasets.clickhouse.com/hits_compatible/athena_partitioned/hits_1.parquet");
+ //eprintln!("wget --continue https://datasets.clickhouse.com/hits_compatible/athena_partitioned/hits_1.parquet");
panic!("Stopping");
};
-
- hits_1_path
+ hits_output
})
}
@@ -863,7 +862,7 @@ fn load_metadata(path: &Path) -> ArrowReaderMetadata {
//
// The clickbench_partitioned dataset has textual fields listed as
// binary for some historical reason so translate Binary to Utf8View
- if matches!(f.data_type(), DataType::Utf8 | DataType::Binary) {
+ if matches!(f.data_type(), DataType::Utf8 | DataType::Binary | DataType::BinaryView) {
let new_field = f.as_ref().clone().with_data_type(DataType::Utf8View);
Arc::new(new_field)
} else {
3x faster for Q21 is pretty neat to see
I made some changes to the arrow-rs clickbench benchmark:
Do I understand correctly that the changes you report are due to simply rewriting the parquet files to have a page index and be uncompressed?
Yeah @alamb, this is the reason. Thanks.
I posted about this on twitter too in case anyone is interested: https://x.com/andrewlamb1111/status/1929852296323547273
Thank you @alamb!
Here is a related blog about doing this on clickhouse:
- https://altinity.com/blog/the-future-has-arrived-parquet-on-iceberg-finally-outperforms-mergetree
Hi @alamb @zhuqi-lucas, I recently read this issue and found it very interesting. Thanks.
I am also curious: Why would uncompressed Parquet be considered an optimization over Snappy-compressed Parquet? Is the decompression overhead of Snappy significant enough to slow down read performance?
[Update] I recall Figure 1 (S3) and Figure 8 (in-memory) of the BtrBlocks paper showing that uncompressed Parquet is always faster to load on the CPU: https://www.cs.cit.tum.de/fileadmin/w00cfj/dis/papers/btrblocks.pdf
I am also curious: Why would uncompressed Parquet be considered an optimization over Snappy-compressed Parquet? Is the decompression overhead of Snappy significant enough to slow down read performance?
Yes, exactly this -- once you have the data locally, the speed of block decompression (like snappy) often dominates query performance. Of course, using no compression comes with a tradeoff: larger files and more network transfer required.
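As a rough illustration of that tradeoff, one could rewrite the same table with different codecs and compare file size against scan time (a hedged sketch for datafusion-cli; the hits1 table and output paths are placeholders, and 'zstd(3)' assumes a DataFusion version that accepts a zstd level):

-- Write the same data with three codecs (paths are placeholders)
SET datafusion.execution.parquet.compression = 'uncompressed';
COPY (SELECT * FROM hits1) TO 'hits_uncompressed.parquet' STORED AS PARQUET;
SET datafusion.execution.parquet.compression = 'snappy';
COPY (SELECT * FROM hits1) TO 'hits_snappy.parquet' STORED AS PARQUET;
SET datafusion.execution.parquet.compression = 'zstd(3)';
COPY (SELECT * FROM hits1) TO 'hits_zstd.parquet' STORED AS PARQUET;
-- Compare on-disk sizes, then time a representative scan against each file
SELECT COUNT(*) FROM 'hits_uncompressed.parquet' WHERE "URL" LIKE '%google%';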