parquet-java
PARQUET-1643 Use airlift codecs for LZ4, LZO, GZIP
Neat! I was just poking around the codecs code so this is really interesting and timely.
I'm currently looking at how to run the parquet-benchmarks project... I'll see if I can get clean runs on master and on your branch for LZ4 and GZIP to compare. (It looks like LZO benchmarks are disabled on master.)
Edit: There are no LZ4 benchmarks currently in the parquet-benchmarks module, and it looks like the run scripts need a bit of clean-up and attention! In the meantime, I managed a single, not very clean run of WriteBenchmarks.write1MRowsDefaultBlockAndPageSizeGZIP with and without the change. No improvement or regression noted!
Benchmark results with the patch applied:
Benchmark Mode Cnt Score Error Units
ReadBenchmarks.read1MRowsBS256MPS4MUncompressed thrpt 25 0.947 ± 0.011 ops/s
ReadBenchmarks.read1MRowsBS256MPS8MUncompressed thrpt 25 0.952 ± 0.010 ops/s
ReadBenchmarks.read1MRowsBS512MPS4MUncompressed thrpt 25 0.938 ± 0.015 ops/s
ReadBenchmarks.read1MRowsBS512MPS8MUncompressed thrpt 25 0.960 ± 0.012 ops/s
ReadBenchmarks.read1MRowsDefaultBlockAndPageSizeGZIP thrpt 25 0.725 ± 0.007 ops/s
ReadBenchmarks.read1MRowsDefaultBlockAndPageSizeSNAPPY thrpt 25 0.902 ± 0.005 ops/s
ReadBenchmarks.read1MRowsDefaultBlockAndPageSizeUncompressed thrpt 25 0.940 ± 0.010 ops/s
PageChecksumReadBenchmarks.read100KRowsGzipWithVerification ss 5 0.502 ± 0.169 s/op
PageChecksumReadBenchmarks.read100KRowsGzipWithoutVerification ss 5 0.562 ± 0.299 s/op
PageChecksumReadBenchmarks.read100KRowsSnappyWithVerification ss 5 0.649 ± 0.975 s/op
PageChecksumReadBenchmarks.read100KRowsSnappyWithoutVerification ss 5 0.519 ± 0.095 s/op
PageChecksumReadBenchmarks.read100KRowsUncompressedWithVerification ss 5 0.531 ± 0.205 s/op
PageChecksumReadBenchmarks.read100KRowsUncompressedWithoutVerification ss 5 0.495 ± 0.182 s/op
PageChecksumReadBenchmarks.read10MRowsGzipWithVerification ss 5 13.505 ± 2.291 s/op
PageChecksumReadBenchmarks.read10MRowsGzipWithoutVerification ss 5 13.529 ± 2.485 s/op
PageChecksumReadBenchmarks.read10MRowsSnappyWithVerification ss 5 10.781 ± 1.075 s/op
PageChecksumReadBenchmarks.read10MRowsSnappyWithoutVerification ss 5 10.711 ± 1.377 s/op
PageChecksumReadBenchmarks.read10MRowsUncompressedWithVerification ss 5 10.822 ± 0.898 s/op
PageChecksumReadBenchmarks.read10MRowsUncompressedWithoutVerification ss 5 10.497 ± 0.961 s/op
PageChecksumReadBenchmarks.read1MRowsGzipWithVerification ss 5 1.946 ± 1.070 s/op
PageChecksumReadBenchmarks.read1MRowsGzipWithoutVerification ss 5 1.778 ± 0.684 s/op
PageChecksumReadBenchmarks.read1MRowsSnappyWithVerification ss 5 1.817 ± 1.941 s/op
PageChecksumReadBenchmarks.read1MRowsSnappyWithoutVerification ss 5 1.851 ± 1.808 s/op
PageChecksumReadBenchmarks.read1MRowsUncompressedWithVerification ss 5 1.570 ± 0.242 s/op
PageChecksumReadBenchmarks.read1MRowsUncompressedWithoutVerification ss 5 1.766 ± 1.573 s/op
Benchmark results on master branch:
Benchmark Mode Cnt Score Error Units
ReadBenchmarks.read1MRowsBS256MPS4MUncompressed thrpt 25 0.952 ± 0.008 ops/s
ReadBenchmarks.read1MRowsBS256MPS8MUncompressed thrpt 25 0.947 ± 0.008 ops/s
ReadBenchmarks.read1MRowsBS512MPS4MUncompressed thrpt 25 0.957 ± 0.010 ops/s
ReadBenchmarks.read1MRowsBS512MPS8MUncompressed thrpt 25 0.956 ± 0.009 ops/s
ReadBenchmarks.read1MRowsDefaultBlockAndPageSizeGZIP thrpt 25 0.731 ± 0.007 ops/s
ReadBenchmarks.read1MRowsDefaultBlockAndPageSizeSNAPPY thrpt 25 0.897 ± 0.008 ops/s
ReadBenchmarks.read1MRowsDefaultBlockAndPageSizeUncompressed thrpt 25 0.935 ± 0.013 ops/s
PageChecksumReadBenchmarks.read100KRowsGzipWithVerification ss 5 0.525 ± 0.079 s/op
PageChecksumReadBenchmarks.read100KRowsGzipWithoutVerification ss 5 0.483 ± 0.093 s/op
PageChecksumReadBenchmarks.read100KRowsSnappyWithVerification ss 5 0.545 ± 0.408 s/op
PageChecksumReadBenchmarks.read100KRowsSnappyWithoutVerification ss 5 0.517 ± 0.133 s/op
PageChecksumReadBenchmarks.read100KRowsUncompressedWithVerification ss 5 0.501 ± 0.213 s/op
PageChecksumReadBenchmarks.read100KRowsUncompressedWithoutVerification ss 5 0.506 ± 0.385 s/op
PageChecksumReadBenchmarks.read10MRowsGzipWithVerification ss 5 14.217 ± 10.173 s/op
PageChecksumReadBenchmarks.read10MRowsGzipWithoutVerification ss 5 13.189 ± 1.396 s/op
PageChecksumReadBenchmarks.read10MRowsSnappyWithVerification ss 5 11.369 ± 1.966 s/op
PageChecksumReadBenchmarks.read10MRowsSnappyWithoutVerification ss 5 10.964 ± 3.167 s/op
PageChecksumReadBenchmarks.read10MRowsUncompressedWithVerification ss 5 11.147 ± 2.056 s/op
PageChecksumReadBenchmarks.read10MRowsUncompressedWithoutVerification ss 5 10.554 ± 1.415 s/op
PageChecksumReadBenchmarks.read1MRowsGzipWithVerification ss 5 1.745 ± 0.482 s/op
PageChecksumReadBenchmarks.read1MRowsGzipWithoutVerification ss 5 1.788 ± 0.417 s/op
PageChecksumReadBenchmarks.read1MRowsSnappyWithVerification ss 5 1.935 ± 1.977 s/op
PageChecksumReadBenchmarks.read1MRowsSnappyWithoutVerification ss 5 1.505 ± 0.172 s/op
PageChecksumReadBenchmarks.read1MRowsUncompressedWithVerification ss 5 1.790 ± 1.657 s/op
PageChecksumReadBenchmarks.read1MRowsUncompressedWithoutVerification ss 5 1.751 ± 1.790 s/op
Benchmark Name | Master | Airlift Codecs |
---|---|---|
ReadBenchmarks.read1MRowsBS256MPS4MUncompressed | 0.952 | 0.947 |
ReadBenchmarks.read1MRowsBS256MPS8MUncompressed | 0.947 | 0.952 |
ReadBenchmarks.read1MRowsBS512MPS4MUncompressed | 0.957 | 0.938 |
ReadBenchmarks.read1MRowsBS512MPS8MUncompressed | 0.956 | 0.960 |
ReadBenchmarks.read1MRowsDefaultBlockAndPageSizeGZIP | 0.731 | 0.725 |
PageChecksumReadBenchmarks.read100KRowsGzipWithVerification | 0.525 | 0.502 |
PageChecksumReadBenchmarks.read100KRowsGzipWithoutVerification | 0.483 | 0.562 |
PageChecksumReadBenchmarks.read10MRowsGzipWithVerification | 14.217 | 13.505 |
PageChecksumReadBenchmarks.read10MRowsGzipWithoutVerification | 13.189 | 13.529 |
PageChecksumReadBenchmarks.read1MRowsGzipWithVerification | 1.745 | 1.946 |
PageChecksumReadBenchmarks.read1MRowsGzipWithoutVerification | 1.788 | 1.778 |
Pruned results for comparing GZIP performance. I don't see any significant speedup or regression.
Considering these compressors/decompressors don't use native resources, it would be cheap to create a compressor/decompressor per page. This in turn would allow reading pages concurrently (including implementing pre-fetching), remove the need to pool the de/compressor instances, and make the overall code simpler.
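To illustrate the point about pure-Java codecs being cheap to instantiate per page, here is a hypothetical sketch using the JDK's own `Deflater`/`Inflater` as a stand-in (the Airlift classes follow the same allocate-per-use pattern); the class and method names are illustrative, not the real Parquet API:

```java
import java.util.Arrays;
import java.util.zip.DataFormatException;
import java.util.zip.Deflater;
import java.util.zip.Inflater;

public class PerPageCodecSketch {

    // Hypothetical per-page decompression: a fresh Inflater per page means no
    // shared state, so pages can be decoded concurrently without a codec pool.
    static byte[] decompressPage(byte[] compressed, int uncompressedSize) throws DataFormatException {
        Inflater inflater = new Inflater();
        try {
            inflater.setInput(compressed);
            byte[] out = new byte[uncompressedSize];
            int n = inflater.inflate(out);
            if (n != uncompressedSize) {
                throw new DataFormatException("short page: got " + n + " bytes");
            }
            return out;
        } finally {
            inflater.end();
        }
    }

    // Hypothetical per-page compression with a throwaway Deflater.
    static byte[] compressPage(byte[] raw) {
        Deflater deflater = new Deflater();
        try {
            deflater.setInput(raw);
            deflater.finish();
            byte[] buf = new byte[raw.length + 64];
            int n = deflater.deflate(buf);
            return Arrays.copyOf(buf, n);
        } finally {
            deflater.end();
        }
    }

    public static void main(String[] args) throws Exception {
        byte[] page = "example page payload".getBytes("UTF-8");
        byte[] roundTrip = decompressPage(compressPage(page), page.length);
        System.out.println(Arrays.equals(page, roundTrip));
    }
}
```

Since each codec instance is plain heap state, the JVM reclaims it like any other short-lived object, which is what makes the pooling machinery unnecessary.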
@samarthjain why did you remove Snappy support?
@nandorKollar - it looks like Parquet has its own implementation for Snappy which, from what I can tell, doesn't depend on native libraries. Also, adding Snappy support via the Airlift compressor was causing the Snappy tests to fail, so I dropped support for it. I have updated the PR title to reflect this.
@nandorKollar - I just pushed a commit to address changes you requested. Sorry for the delay. I had to punt working on this for various reasons.
@samarthjain thanks for addressing my comments, and sorry for the late reply. I have two additional questions. I'm wondering if we might want to introduce a new configuration option to turn Airlift codecs on and off, so that if something is wrong with Airlift, clients can still fall back to the original implementation. Not sure if it's worth the effort, @gszadovszky what do you think?
I also noticed that the other codecs use org.apache.hadoop.io.compress.CodecPool; should we consider using it for Airlift compressors too? We can address this in a separate ticket though.
Without reviewing this change and knowing too much about Airlift, I would say the configuration might make sense. Meanwhile, the main purpose of using a pure-Java compression codec over the ones provided by Hadoop is to be independent from Hadoop. However, our code currently relies heavily on Hadoop (the whole read/write path is implemented in parquet-hadoop), while the target is to make parquet-mr work without Hadoop and its dependencies. So, I would suggest introducing new features in a way that either does not depend on Hadoop or makes it easy to remove the Hadoop dependencies later.
@nandorKollar - I am not exactly sure where I can add this configuration, which I was thinking of naming parquet.airlift.compressors.enable. We want both ParquetReadOptions (with the config defined in ParquetInputFormat) and ParquetRecordWriter to be able to use the config for instantiating the correct (de)compressor. Does that mean we need separate compression-related configs for read and write?
For the compressor: in ParquetRecordWriter here:
https://github.com/apache/parquet-mr/blob/master/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ParquetRecordWriter.java#L150
For the decompressor: in ParquetReadOptions here:
https://github.com/apache/parquet-mr/blob/master/parquet-hadoop/src/main/java/org/apache/parquet/ParquetReadOptions.java#L302
so that the correct decompressor can be used by ParquetFileReader over here:
https://github.com/apache/parquet-mr/blob/master/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ParquetFileReader.java#L1036
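A rough shape for the proposed flag might look like the following; the property name parquet.airlift.compressors.enable is the one suggested above, but the factory and codec types here are purely illustrative (the real patch would wire this through ParquetReadOptions and ParquetRecordWriter via the Hadoop Configuration):

```java
import java.util.Map;

// Hypothetical sketch: selecting the gzip codec implementation from a config
// flag, with the Hadoop-based codec as the safe default to fall back on.
public class CodecSelectionSketch {

    interface BytesDecompressor {
        String name();
    }

    static final class HadoopGzipDecompressor implements BytesDecompressor {
        public String name() { return "hadoop-gzip"; }
    }

    static final class AirliftGzipDecompressor implements BytesDecompressor {
        public String name() { return "airlift-gzip"; }
    }

    // conf stands in for a Hadoop Configuration; only the flag lookup matters.
    static BytesDecompressor createGzipDecompressor(Map<String, String> conf) {
        boolean useAirlift = Boolean.parseBoolean(
            conf.getOrDefault("parquet.airlift.compressors.enable", "false"));
        // Defaulting to the Hadoop codec means clients can disable Airlift
        // if anything is wrong with the new implementation.
        return useAirlift ? new AirliftGzipDecompressor() : new HadoopGzipDecompressor();
    }

    public static void main(String[] args) {
        System.out.println(createGzipDecompressor(Map.of()).name());
        System.out.println(createGzipDecompressor(
            Map.of("parquet.airlift.compressors.enable", "true")).name());
    }
}
```

Keeping the selection in one factory method would let both the read and the write path share a single flag rather than needing separate read/write configs.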
@samarthjain thanks for the work. I am looking to deploy zstd Parquet into prod, but that requires a new Hadoop with native library support, which is not practical in many prod use-cases.
Since Airlift is a pure-Java implementation, what are the performance implications for zstd? I saw there is a benchmark for GZIP, but I don't see benchmarks for the other codecs.
Also, have we considered using zstd-jni, a Java library that packages native implementations of zstd for different platforms in a jar?
Force pushed a new commit that makes it configurable whether to use Airlift-based compressors or not. Also added tests and GZIP benchmarks for the Airlift compressors. Benchmark results show no performance improvements or regressions when using Airlift GZIP vs plain GZIP.
Benchmark Cnt Score Error
PageChecksumReadBenchmarks.read10MRowsAirliftGzipWithVerification 3 6.431 ± 0.741
PageChecksumReadBenchmarks.read10MRowsAirliftGzipWithoutVerification 3 6.605 ± 0.709
PageChecksumReadBenchmarks.read10MRowsGzipWithVerification 3 6.468 ± 0.700
PageChecksumReadBenchmarks.read10MRowsGzipWithoutVerification 3 6.583 ± 1.538
PageChecksumWriteBenchmarks.write10MRowsAirliftGzipWithChecksums 3 36.333 ± 0.510
PageChecksumWriteBenchmarks.write10MRowsAirliftGzipWithoutChecksums 3 36.069 ± 1.096
PageChecksumWriteBenchmarks.write10MRowsGzipWithChecksums 3 36.141 ± 1.095
PageChecksumWriteBenchmarks.write10MRowsGzipWithoutChecksums 3 36.174 ± 5.125
ReadBenchmarks.read1MRowsDefaultBlockAndPageSizeAirliftGZIP 3 0.898 ± 1.254
ReadBenchmarks.read1MRowsDefaultBlockAndPageSizeGZIP 3 0.891 ± 1.201
@dbtsai

> Since airlift is pure Java implementation, what's the performance implications for zstd? I saw there is a benchmark for GZIP, but I don't see benchmark for other codecs.

It looks like the zstd Airlift implementation doesn't implement the Hadoop APIs. It could be integrated within Parquet, but that will take some work, definitely worthy of another PR.
@nandorKollar, @rdblue, @danielcweeks - if you have cycles, could you please take a look at this PR?