[WIP] A new parquet reader with filter push-down support, improving total ClickBench time by 40+% compared to the arrow parquet reader
I have seen many PRs optimizing parquet. Here I share a redesigned parquet reader that currently achieves a performance improvement of more than 40% on ClickBench.
There are still some features to complete before it can be used in practice, including support for more types (complex types among them) and more complete expression push-down.
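A minimal usage sketch (hedged: `hits.parquet` and the `EventDate` column are placeholders; the setting name is the one this PR adds, as seen in the reproduction further down in this thread):

```sql
-- Enable this PR's reader with filter push-down for a single query.
SELECT count()
FROM file('hits.parquet')
WHERE EventDate = '2013-07-15'
SETTINGS input_format_parquet_use_native_reader_with_filter_push_down = 1;
```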
Below are the performance test results on my development machine (12900K, 128 GB memory).
Cold run
| query no | tries | arrow | new reader | new reader / arrow |
|---|---|---|---|---|
| 1 | 1 | 0.003 | 0.005 | 1.666666667 |
| 2 | 1 | 0.091 | 0.091 | 1 |
| 3 | 1 | 0.125 | 0.122 | 0.976 |
| 4 | 1 | 0.174 | 0.195 | 1.120689655 |
| 5 | 1 | 0.857 | 0.809 | 0.943990665 |
| 6 | 1 | 0.917 | 0.756 | 0.824427481 |
| 7 | 1 | 0.084 | 0.099 | 1.178571429 |
| 8 | 1 | 0.081 | 0.082 | 1.012345679 |
| 9 | 1 | 1.2 | 1.083 | 0.9025 |
| 10 | 1 | 1.377 | 1.117 | 0.811183733 |
| 11 | 1 | 0.545 | 0.317 | 0.581651376 |
| 12 | 1 | 0.536 | 0.303 | 0.565298507 |
| 13 | 1 | 1.513 | 1.278 | 0.844679445 |
| 14 | 1 | 2.174 | 1.701 | 0.782428703 |
| 15 | 1 | 1.531 | 1.2 | 0.783801437 |
| 16 | 1 | 1.416 | 1.289 | 0.910310734 |
| 17 | 1 | 3.617 | 3.553 | 0.982305778 |
| 18 | 1 | 2.081 | 1.846 | 0.887073522 |
| 19 | 1 | 8.503 | 8.071 | 0.949194402 |
| 20 | 1 | 0.192 | 0.194 | 1.010416667 |
| 21 | 1 | 5.349 | 1.801 | 0.336698448 |
| 22 | 1 | 5.731 | 1.294 | 0.225789566 |
| 23 | 1 | 10.694 | 2.954 | 0.276229661 |
| 24 | 1 | 27.219 | 9.326 | 0.342628311 |
| 25 | 1 | 1.14 | 0.686 | 0.601754386 |
| 26 | 1 | 0.697 | 0.455 | 0.652797704 |
| 27 | 1 | 1.131 | 0.62 | 0.548187445 |
| 28 | 1 | 5.745 | 1.967 | 0.342384682 |
| 29 | 1 | 8.296 | 6.542 | 0.788572806 |
| 30 | 1 | 0.101 | 0.128 | 1.267326733 |
| 31 | 1 | 1.368 | 1.13 | 0.826023392 |
| 32 | 1 | 2.336 | 1.841 | 0.788099315 |
| 33 | 1 | 8.479 | 8.962 | 1.056964265 |
| 34 | 1 | 9.468 | 7.005 | 0.739860583 |
| 35 | 1 | 9.692 | 7.399 | 0.763413124 |
| 36 | 1 | 1.052 | 1.073 | 1.019961977 |
| 37 | 1 | 0.185 | 0.156 | 0.843243243 |
| 38 | 1 | 0.132 | 0.117 | 0.886363636 |
| 39 | 1 | 0.162 | 0.118 | 0.728395062 |
| 40 | 1 | 0.325 | 0.185 | 0.569230769 |
| 41 | 1 | 0.068 | 0.083 | 1.220588235 |
| 42 | 1 | 0.061 | 0.085 | 1.393442623 |
| 43 | 1 | 0.058 | 0.078 | 1.344827586 |
| sum | | 126.5 | 78.1 | 0.617 |
Hot run
| query no | tries | arrow | new reader | new reader / arrow |
|---|---|---|---|---|
| 1 | 2 | 0.003 | 0.005 | 1.666666667 |
| 2 | 2 | 0.08 | 0.085 | 1.0625 |
| 3 | 2 | 0.113 | 0.107 | 0.946902655 |
| 4 | 2 | 0.161 | 0.182 | 1.130434783 |
| 5 | 2 | 0.828 | 0.777 | 0.938405797 |
| 6 | 2 | 0.918 | 0.73 | 0.795206972 |
| 7 | 2 | 0.077 | 0.096 | 1.246753247 |
| 8 | 2 | 0.078 | 0.084 | 1.076923077 |
| 9 | 2 | 1.154 | 0.992 | 0.859618718 |
| 10 | 2 | 1.318 | 1.07 | 0.811836115 |
| 11 | 2 | 0.44 | 0.287 | 0.652272727 |
| 12 | 2 | 0.501 | 0.268 | 0.53493014 |
| 13 | 2 | 1.474 | 1.088 | 0.738127544 |
| 14 | 2 | 2.133 | 1.653 | 0.774964838 |
| 15 | 2 | 1.508 | 1.096 | 0.726790451 |
| 16 | 2 | 1.162 | 1.085 | 0.93373494 |
| 17 | 2 | 3.515 | 3.219 | 0.915789474 |
| 18 | 2 | 2.033 | 1.75 | 0.860796852 |
| 19 | 2 | 6.809 | 6.214 | 0.912615656 |
| 20 | 2 | 0.16 | 0.13 | 0.8125 |
| 21 | 2 | 4.76 | 1.526 | 0.320588235 |
| 22 | 2 | 5.304 | 0.998 | 0.188159879 |
| 23 | 2 | 10.077 | 2.499 | 0.247990473 |
| 24 | 2 | 26.25 | 8.131 | 0.309752381 |
| 25 | 2 | 1.092 | 0.508 | 0.465201465 |
| 26 | 2 | 0.658 | 0.361 | 0.548632219 |
| 27 | 2 | 1.034 | 0.498 | 0.481624758 |
| 28 | 2 | 5.533 | 1.666 | 0.301102476 |
| 29 | 2 | 7.815 | 6.055 | 0.774792067 |
| 30 | 2 | 0.1 | 0.109 | 1.09 |
| 31 | 2 | 1.246 | 0.983 | 0.788924559 |
| 32 | 2 | 2 | 1.501 | 0.7505 |
| 33 | 2 | 8.58 | 8.328 | 0.970629371 |
| 34 | 2 | 9.314 | 6.719 | 0.721387159 |
| 35 | 2 | 9.431 | 7.093 | 0.752094158 |
| 36 | 2 | 0.873 | 0.856 | 0.980526919 |
| 37 | 2 | 0.163 | 0.14 | 0.858895706 |
| 38 | 2 | 0.114 | 0.102 | 0.894736842 |
| 39 | 2 | 0.155 | 0.103 | 0.664516129 |
| 40 | 2 | 0.28 | 0.185 | 0.660714286 |
| 41 | 2 | 0.062 | 0.087 | 1.403225806 |
| 42 | 2 | 0.053 | 0.081 | 1.528301887 |
| 43 | 2 | 0.052 | 0.074 | 1.423076923 |
| sum | | 119.4 | 69.5 | 0.582 |
Changelog category (leave one):
- Experimental Feature
Changelog entry (a user-readable short description of the changes that goes to CHANGELOG.md):
...
Documentation entry for user-facing changes
- [ ] Documentation is written (mandatory for new features)
Information about CI checks: https://clickhouse.com/docs/en/development/continuous-integration/
CI Settings (Only check the boxes if you know what you are doing):
- [ ] Allow: All Required Checks
- [ ] Allow: Stateless tests
- [ ] Allow: Stateful tests
- [ ] Allow: Integration Tests
- [ ] Allow: Performance tests
- [ ] Allow: All Builds
- [ ] Allow: batch 1, 2 for multi-batch jobs
- [ ] Allow: batch 3, 4, 5, 6 for multi-batch jobs
- [ ] Exclude: Style check
- [ ] Exclude: Fast test
- [ ] Exclude: All with ASAN
- [ ] Exclude: All with TSAN, MSAN, UBSAN, Coverage
- [ ] Exclude: All with aarch64, release, debug
- [ ] Run only fuzzers related jobs (libFuzzer fuzzers, AST fuzzers, etc.)
- [ ] Exclude: AST fuzzers
- [ ] Do not test
- [ ] Woolen Wolfdog
- [ ] Upload binaries for special builds
- [ ] Disable merge-commit
- [ ] Disable CI cache
@liuneng1994 I assume this is supposed to replace the existing "native reader implementation". If so, do you plan to incorporate all of its functionality, including features introduced while your PR is in draft / under review? For example: https://github.com/ClickHouse/ClickHouse/pull/70807.
I want to help achieve feature completeness on the native reader, but I am afraid all that work will be thrown away once this PR gets merged. Do you see a way we could collaborate?
I started this work in July and didn't realize there were so many PRs related to parquet; that was beyond my expectation. Of course, I hope that by the time this PR is ready for merging it can include the existing features, but this will add a lot of work.
I am very happy to have someone to work with on a native parquet reader. After studying the parquet readers of other engines (starrocks, velox, duckdb, etc.), I also realized that it is indeed a lot of work to build a production-grade parquet reader.
If you don't mind, we can work together in my fork repository. You can exchange ideas at https://github.com/ClickHouse/ClickHouse/issues/68641, or use other communication tools (Telegram or others). The current code is synchronized in real time in this PR. If you think my implementation is worth collaborating on, I am willing to take the time to write a detailed document explaining my design ideas and future directions.
Working together towards better parquet support sounds like a good idea. Do you mind sharing your Telegram handle? Mine is @arthurpassoscs
My Telegram is @liuneng1994; I have sent you a message. I will take the time to describe the current progress and future plans.
I attempted to test this implementation of the native reader on some of my Parquet files but encountered an issue where the ClickHouse server crashes at some point while reading from the Parquet file. Below is the complete ClickHouse log.
Thank you for testing. If convenient, please provide a way to reproduce it. My previous work mainly verified performance feasibility, so the test cases were limited, focusing mainly on ClickBench correctness. Later, I will cover more test cases from the arrow parquet tests.
Have you compared the performance of your parquet reader vs clickhouse existing native reader?
I haven't compared them; I compared this reader with MergeTree. Since the existing native reader does not support filter push-down, they are not very comparable. Filter push-down is crucial for benchmarks such as ClickBench (see the sketch below).
Off topic: I am still working on supporting nested types and unit-test coverage. The complexity is a bit beyond my expectations. If everything goes well, I should have a version of reasonable quality in a week or two.
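As an illustration of why push-down matters for ClickBench: the biggest wins in the tables above (queries 21-24) come from selective string predicates. A query in that style (approximate, not the exact benchmark text):

```sql
-- The URL predicate is evaluated first; rows and pages that fail it are
-- skipped before the remaining columns are decompressed and decoded.
SELECT SearchPhrase, count() AS c
FROM file('hits.parquet')
WHERE URL LIKE '%google%' AND SearchPhrase <> ''
GROUP BY SearchPhrase
ORDER BY c DESC
LIMIT 10
SETTINGS input_format_parquet_use_native_reader_with_filter_push_down = 1;
```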
Performance comparison of arrow, MergeTree, and the new reader (version 20241223); MergeTree uses data without sorting.
Hot run
| query no | tries | mergetree | arrow | new reader (241223) | 241223 / arrow | 241223 / mergetree |
|---|---|---|---|---|---|---|
| 1 | 2 | 0.003 | 0.003 | 0.021 | 7 | 7 | |
| 2 | 2 | 0.009 | 0.08 | 0.084 | 1.05 | 9.333333333 | |
| 3 | 2 | 0.017 | 0.113 | 0.102 | 0.902654867 | 6 | |
| 4 | 2 | 0.025 | 0.161 | 0.15 | 0.931677019 | 6 | |
| 5 | 2 | 0.594 | 0.828 | 0.661 | 0.798309179 | 1.112794613 | |
| 6 | 2 | 0.497 | 0.918 | 0.645 | 0.702614379 | 1.29778672 | |
| 7 | 2 | 0.012 | 0.077 | 0.108 | 1.402597403 | 9 | |
| 8 | 2 | 0.011 | 0.078 | 0.083 | 1.064102564 | 7.545454545 | |
| 9 | 2 | 0.749 | 1.154 | 0.852 | 0.73830156 | 1.137516689 | |
| 10 | 2 | 0.828 | 1.318 | 0.913 | 0.692716237 | 1.102657005 | |
| 11 | 2 | 0.139 | 0.44 | 0.251 | 0.570454545 | 1.805755396 | |
| 12 | 2 | 0.173 | 0.501 | 0.246 | 0.491017964 | 1.421965318 | |
| 13 | 2 | 0.945 | 1.474 | 0.884 | 0.59972863 | 0.935449735 | |
| 14 | 2 | 1.367 | 2.133 | 1.281 | 0.600562588 | 0.937088515 | |
| 15 | 2 | 0.948 | 1.508 | 0.882 | 0.584880637 | 0.930379747 | |
| 16 | 2 | 0.908 | 1.162 | 0.873 | 0.751290878 | 0.961453744 | |
| 17 | 2 | 2.528 | 3.515 | 2.499 | 0.710953058 | 0.988528481 | |
| 18 | 2 | 1.286 | 2.033 | 1.425 | 0.700934579 | 1.108087092 | |
| 19 | 2 | 4.878 | 6.809 | 5.067 | 0.744162138 | 1.038745387 | |
| 20 | 2 | 0.024 | 0.16 | 0.093 | 0.58125 | 3.875 | |
| 21 | 2 | 0.959 | 4.76 | 2.285 | 0.480042017 | 2.382690302 | |
| 22 | 2 | 1.236 | 5.304 | 0.884 | 0.166666667 | 0.715210356 | |
| 23 | 2 | 1.875 | 10.077 | 2.088 | 0.207204525 | 1.1136 | |
| 24 | 2 | 3.409 | 26.25 | 8.919 | 0.339771429 | 2.616309768 | |
| 25 | 2 | 0.24 | 1.092 | 0.403 | 0.369047619 | 1.679166667 | |
| 26 | 2 | 0.234 | 0.658 | 0.307 | 0.46656535 | 1.311965812 | |
| 27 | 2 | 0.244 | 1.034 | 0.427 | 0.412959381 | 1.75 | |
| 28 | 2 | 1.327 | 5.533 | 1.422 | 0.257003434 | 1.071590053 | |
| 29 | 2 | 4.396 | 7.815 | 4.845 | 0.619961612 | 1.102138308 | |
| 30 | 2 | 0.024 | 0.1 | 0.113 | 1.13 | 4.708333333 | |
| 31 | 2 | 0.707 | 1.246 | 0.743 | 0.596308186 | 1.050919378 | |
| 32 | 2 | 1.088 | 2 | 1.155 | 0.5775 | 1.061580882 | |
| 33 | 2 | 6.285 | 8.58 | 6.527 | 0.760722611 | 1.038504375 | |
| 34 | 2 | 5.146 | 9.314 | 5.326 | 0.571827357 | 1.034978624 | |
| 35 | 2 | 5.161 | 9.431 | 5.453 | 0.578199555 | 1.056578183 | |
| 36 | 2 | 0.694 | 0.873 | 0.686 | 0.785796105 | 0.988472622 | |
| 37 | 2 | 0.074 | 0.163 | 0.109 | 0.668711656 | 1.472972973 | |
| 38 | 2 | 0.049 | 0.114 | 0.082 | 0.719298246 | 1.673469388 | |
| 39 | 2 | 0.058 | 0.155 | 0.08 | 0.516129032 | 1.379310345 | |
| 40 | 2 | 0.126 | 0.28 | 0.145 | 0.517857143 | 1.150793651 | |
| 41 | 2 | 0.025 | 0.062 | 0.058 | 0.935483871 | 2.32 | |
| 42 | 2 | 0.023 | 0.053 | 0.059 | 1.113207547 | 2.565217391 | |
| 43 | 2 | 0.016 | 0.052 | 0.048 | 0.923076923 | 3 |
After fixing some bugs, performance improved slightly; parquet is now faster than MergeTree on low-cardinality string columns.
(Attached plots: totals, time per query, and relative ratio.)
@nickitat Hi, the parquet reader's functionality is now mostly complete: it supports most common types, and on my test data the latest performance is far ahead of arrow. There is still a lot of work to do, including page-index support and other detail optimizations, which is long-term work. Merging into master would give me a more complete CI to ensure quality, and make it easier to cooperate with other developers in the community. What else do I need to do? Since there is already a native reader, my goal is to replace the existing parquet reader, but considering that many people have worked on it (data page v2, bfloat16, etc.), I need help judging this and settling on a suitable merging plan.
Supported types
Primitive types:
- Boolean
- Int8
- Int16
- Int32
- Int64
- UInt8
- UInt16
- UInt32
- UInt64
- Float32
- Float64
- Date32
- Date
- DateTime
- DateTime64
- FixedString
- Decimal32
- Decimal64
- Decimal128
- Decimal256
Nested types:
- ArrayType
- StructType
- TupleType
There are still some filter push-down cases to be added; I expect to complete them in one to two weeks.
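A round-trip sketch over a few of the listed types (hedged: the file name and values are illustrative, not from the PR's tests):

```sql
-- Write a parquet file covering several supported types, then read it back
-- with the new reader.
INSERT INTO FUNCTION file('types_demo.parquet')
SELECT
    toInt32(42)                            AS i32,
    toDateTime64('2024-12-23 00:00:00', 3) AS dt64,
    toDecimal64(3.14, 2)                   AS dec64,
    ['a', 'b']                             AS arr,
    (1, 'x')                               AS tup;

SELECT *
FROM file('types_demo.parquet')
SETTINGS input_format_parquet_use_native_reader_with_filter_push_down = 1;
```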
@nickitat Hi, I have fixed most of the issues, and the reader now fully passes the existing native reader's test cases. I think it is a basically usable version; there will be more details to optimize in different scenarios. I have also introduced new configuration options, such as allowing multiple parquet readers to coexist. I hope you can help review this PR when you have time.
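A sketch of how the coexisting readers could be selected per query (`input_format_parquet_use_native_reader` is the existing native reader's setting; treat the exact combination as an assumption):

```sql
-- Hedged sketch: the same file read with each of the coexisting readers.
SELECT count() FROM file('t.parquet');  -- default arrow-based reader

SELECT count() FROM file('t.parquet')
SETTINGS input_format_parquet_use_native_reader = 1;  -- existing native reader

SELECT count() FROM file('t.parquet')
SETTINGS input_format_parquet_use_native_reader_with_filter_push_down = 1;  -- this PR's reader
```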
@al13n321 Hi, are you interested in reviewing this PR? It seems no one is willing to review it.
@alexey-milovidov @rschu1ze Can you guys add a "can be tested" label to this PR? Thanks.
It breaks with indexHint:
```
% rm t.parquet; ~/ClickHouse/build/programs/clickhouse local --stacktrace -q "insert into function file('t.parquet') select 1 as x; select * from file('t.parquet') where indexHint(x = 1) settings input_format_parquet_use_native_reader_with_filter_push_down=1"
rm: cannot remove 't.parquet': No such file or directory

Code: 59. DB::Exception: Illegal type Const(UInt8) of column for filter. Must be UInt8 or Nullable(UInt8) or Const variants of them.: (in file/uri /home/ubuntu/ClickHouse/t.parquet): While executing ParquetBlockInputFormat: While executing File. (ILLEGAL_TYPE_OF_COLUMN_FOR_FILTER), Stack trace (when copying this message, always include the lines below):

0. ./contrib/llvm-project/libcxx/include/__exception/exception.h:106: Poco::Exception::Exception(String const&, int) @ 0x00000000171baa52
1. ./build/./src/Common/Exception.cpp:108: DB::Exception::Exception(DB::Exception::MessageMasked&&, int, bool) @ 0x000000000d1beb79
2. DB::Exception::Exception(PreformattedMessage&&, int) @ 0x0000000007fbe3ec
3. DB::Exception::Exception<String>(int, FormatStringHelperImpl<std::type_identity<String>::type>, String&&) @ 0x0000000007fbe0eb
4. ./build/./src/Columns/FilterDescription.cpp:98: DB::FilterDescription::FilterDescription(DB::IColumn const&) @ 0x0000000012f73802
5. ./build/./src/Processors/Formats/Impl/Parquet/ColumnFilter.cpp:1107: DB::ExpressionFilter::execute(std::vector<DB::ColumnWithTypeAndName, std::allocator<DB::ColumnWithTypeAndName>> const&) @ 0x000000001420b9b3
6. ./build/./src/Processors/Formats/Impl/Parquet/RowGroupChunkReader.cpp:473: DB::SelectConditions::selectRows(unsigned long) @ 0x000000001427f6b1
7. ./build/./src/Processors/Formats/Impl/Parquet/RowGroupChunkReader.cpp:161: DB::RowGroupChunkReader::readChunk(unsigned long) @ 0x000000001427d4a0
8. ./build/./src/Processors/Formats/Impl/Parquet/ParquetReader.cpp:188: DB::SubRowGroupRangeReader::read(unsigned long) @ 0x000000001421d107
9. ./build/./src/Processors/Formats/Impl/ParquetBlockInputFormat.cpp:1045: DB::ParquetBlockInputFormat::decodeOneChunk(unsigned long, std::unique_lock<std::mutex>&) @ 0x0000000013e6aef3
10. ./build/./src/Processors/Formats/Impl/ParquetBlockInputFormat.cpp:949: void std::__function::__policy_invoker<void ()>::__call_impl[abi:ne190107]<std::__function::__default_alloc_func<DB::ParquetBlockInputFormat::scheduleRowGroup(unsigned long)::$_0, void ()>>(std::__function::__policy_storage const*) @ 0x0000000013e6e505
11. ./contrib/llvm-project/libcxx/include/__functional/function.h:716: ? @ 0x000000000d2990ec
12. ./contrib/llvm-project/libcxx/include/__type_traits/invoke.h:117: ThreadFromGlobalPoolImpl<false, true>::ThreadFromGlobalPoolImpl<void (ThreadPoolImpl<ThreadFromGlobalPoolImpl<false, true>>::ThreadFromThreadPool::*)(), ThreadPoolImpl<ThreadFromGlobalPoolImpl<false, true>>::ThreadFromThreadPool*>(void (ThreadPoolImpl<ThreadFromGlobalPoolImpl<false, true>>::ThreadFromThreadPool::*&&)(), ThreadPoolImpl<ThreadFromGlobalPoolImpl<false, true>>::ThreadFromThreadPool*&&)::'lambda'()::operator()() @ 0x000000000d29dba3
13. ./contrib/llvm-project/libcxx/include/__functional/function.h:716: ? @ 0x000000000d296fad
14. ./contrib/llvm-project/libcxx/include/__type_traits/invoke.h:117: void* std::__thread_proxy[abi:ne190107]<std::tuple<std::unique_ptr<std::__thread_struct, std::default_delete<std::__thread_struct>>, void (ThreadPoolImpl<std::thread>::ThreadFromThreadPool::*)(), ThreadPoolImpl<std::thread>::ThreadFromThreadPool*>>(void*) @ 0x000000000d29bf9b
15. ? @ 0x00007ffff7c94ac3
16. ? @ 0x00007ffff7d26850
```
Thank you for your review; I will solve the above problems one by one.
In the future, @yxheartipp will work with me to improve this PR.
This is probably outside the scope of this PR, but I think there's a better way to do PREWHERE-like prefiltering and input_format_parquet_bloom_filter_push_down. Especially when reading from network (rather than local file), especially with high latency. It seems that the best read strategy would be to read in 4 stages, pipelined as 4 sliding windows of tasks running in parallel (with some dependencies between them):
- Filter prefetch: read file byte ranges corresponding to bloom filters and/or the smallest/most-promising filter columns (similar to PREWHERE). Runs on IO threads with very high parallelism (important when reading from network), for lots of row groups at once, because these filters/columns are small and don't take a lot of memory per row group.
- Decompress and decode those bloom filters and filter columns. Evaluate filter expressions and produce filter masks. On non-IO threads, with parallelism matching the number of cores. One task per row group is ok, no need to parallelize by individual columns. The resulting filter masks are small (especially if we bit-pack them), so we can afford these tasks to run far ahead of the main data reading, usually all the way to the end of the file (for good read size estimate for the progress bar).
- Main data prefetch: read file byte ranges corresponding to the remaining columns. Use filter masks from previous step and page offset index to determine byte ranges to read. That's why this is a separate step from filtering - we want to know tight read ranges before we start reading. Runs on IO threads, with medium parallelism. Can't have very high parallelism because this takes a lot of memory. But higher than decompression+decoding because data is still compressed at this stage. Can read byte ranges within the same row group in parallel, can merge multiple small row groups into one byte range.
- Decompress and decode the remaining columns. Runs on non-IO threads, with parallelism matching the number of cores, or limited by memory. Parallelized at the level of primitive columns, not row groups (this is a problem for the current reader that can use at most one thread per row group, but we often don't have enough memory for many row groups) (this is another reason to build filter masks separately - it allows reading columns independently from each other). Composite columns like tuples can be reassembled at the very end before returning the block from ParquetBlockInputFormat.
Each of these would be pipelined across row groups, with a low and high watermark. The dependencies are simple: next stage for a row group can begin when the previous stage completed for that row group (but we won't necessarily begin one row group at a time; for small row groups we'll wait for a few row groups to be ready and schedule them as one task, based a target byte size).
(As a slight generalization, there can be multiple filtering stages: first by the smallest and most promising filter columns, then by other filter columns, then read main data. This hopefully won't be needed.)
(Another consideration is that sometimes when reading filter byte ranges we'll also read some data along the way, if gaps between ranges are small enough that we decided to read instead of seeking. Would be nice to preserve and reuse that read data in later stages instead of reading again. But other times we would rather deallocate it to free up more memory for later stages. Not sure how exactly to handle that.)
Maybe I'll make a native reader v3 at some point :) . This is speculative though, please finish this PR too.
(I previously assumed that this PR would have close to optimal performance and would need at most incremental optimizations/simplifications. But now that I wrote down the above idea, it sounds very different from this PR, and potentially significantly faster and simpler (though it's likely that I missed something and it won't work). Probably faster to prototype it from scratch than modifying reader v2.)
This is a very professional suggestion. I originally built this reader hoping for a more efficient single-threaded reader; I did not give much consideration to parallelism and only did some parallel processing on IO.
Dear @al13n321, this PR hasn't been updated for a while. You will be unassigned. Will you continue working on it? If so, please feel free to reassign yourself.
I'm working on another parquet reader: https://github.com/ClickHouse/ClickHouse/pull/78380 (only the last 3 commits in that PR are relevant). Current plan is to merge both this PR ("v2") and that PR ("v3"), then see if one is pareto-better than the other and maybe remove one (and also remove v1).
The number of parquet readers in the codebase is getting kind of ridiculous, we should do something to separate them more cleanly and make them easy to remove. I'm thinking:
- Move the writer, native readers v1, v2 (yours), and v3 (mine) to 4 subdirectories of Formats/Impl/Parquet. Please move the v2 into a directory in this PR, then I'll send a separate PR to move the existing files (writer and v1).
- Don't add parquet-specific things to KeyCondition. ~Looks like the KeyCondition always comes from a call to `SourceWithKeyCondition::setKeyCondition(const std::optional<ActionsDAG>, ContextPtr)`. So maybe we can just remove the overload `setKeyCondition(const std::shared_ptr<const KeyCondition>)` and make `SourceWithKeyCondition` store an `ActionsDAG` instead of `KeyCondition`? The format can create KeyCondition itself. Please try this. (Ideally we would also do something to reuse the KeyCondition/ColumnFilterHelper when reading many files in one query; I'm working on it, will try to unify it with https://github.com/ClickHouse/ClickHouse/pull/66253/ as well.)~ Use https://github.com/ClickHouse/ClickHouse/pull/80931 instead.
(I was also hoping that some v3 things would be easy to reuse in v2, but that doesn't seem to be the case. V3 integrates with prewhere, and will probably use KeyCondition for all non-prewhere filtering instead of re-analyzing the ActionsDAG; but that doesn't seem to fit v2 because v2 wants to split all conditions by columns, so it would need to pointlessly reimplement the splitting for KeyCondition and prewhere separately. V3 has some complicated fine-grained task scheduling and prefetching to extract ~all available parallelism and to avoid prefetching data before filters are applied; it doesn't seem feasible to reuse at all.)
Please make these changes, then we can get this merged and start benchmarking (and hopefully v3 will be ready soon so we can benchmark everything at once). You don't have to address all the nitpicks from review comments, up to you.
> Don't add parquet-specific things to KeyCondition.

Use https://github.com/ClickHouse/ClickHouse/pull/80931 to get access to ActionsDAG from ParquetBlockInputFormat.
@liuneng1994, thank you so much for the contribution, which provided the basis of the currently merged implementation!