
PARQUET-2171: Support Hadoop vectored IO

Open steveloughran opened this issue 1 year ago • 29 comments

Make sure you have checked all steps below.

Jira

  • [X] My PR addresses the following Parquet Jira issues and references them in the PR title. For example, "PARQUET-1234: My Parquet PR"
    • https://issues.apache.org/jira/browse/PARQUET-XXX
    • In case you are adding a dependency, check if the license complies with the ASF 3rd Party License Policy.

Tests

  • [X] My PR adds the following unit tests TestVectorIOBridge TestFileRangeBridge

It also parameterizes existing tests run with/without vector IO enabled. TestParquetFileWriter TestInputFormatColumnProjection TestInputOutputFormat

Commits

  • [X] My commits all reference Jira issues in their subject lines. In addition, my commits follow the guidelines from "How to write a good git commit message":
    1. Subject is separated from body by a blank line
    2. Subject is limited to 50 characters (not including Jira issue reference)
    3. Subject does not end with a period
    4. Subject uses the imperative mood ("add", not "adding")
    5. Body wraps at 72 characters
    6. Body explains "what" and "why", not "how"

Documentation

  • [X] In case of new functionality, my PR adds documentation that describes how to use it.
    • All the public functions and the classes in the PR contain Javadoc that explain what it does

steveloughran avatar Sep 13 '23 10:09 steveloughran

@steveloughran This looks really great! I think my only comment would be about whether we can expose the implementation in a way that might be more pluggable. In Iceberg we have a similar parallel in InputFile and SeekableStream, but it's not apparent to me that we would be able to adapt our IO implementation to leverage vectored reads.

Open to thoughts on how we might do that as well.

Maybe this would be possible by implementing the vectoredIO methods in the Iceberg adaptor class here?

danielcweeks avatar Sep 15 '23 00:09 danielcweeks

@steveloughran Thanks a lot for creating this PR! This is an important feature that improves the reading performance of Parquet. I just took a brief look and it looks great! I will spend some time later to review it.

shangxinli avatar Sep 17 '23 17:09 shangxinli

@shangxinli looking forward to your comments. Anything you can do to test will be wonderful too!

steveloughran avatar Sep 18 '23 17:09 steveloughran

@danielcweeks that's a good point about pluggability.

  1. an interface/implementation split in parquet would line you up later to choose the back end, maybe?
  2. I've done an initial pass at a shim library to use vectored IO operations if a stream/hadoop version has them, but fall back to the usual blocking reads if not (along with the same for everything else). But just getting the base vector IO stuff into parquet is a lot simpler. I don't know if that would be useful for Iceberg: https://github.com/apache/hadoop-api-shim
  3. video on the whole topic

getting iceberg to pass down which stripes it wants to read is critical for this to work best with s3, abfs and gcs. how are you reading the files at present?

steveloughran avatar Sep 18 '23 17:09 steveloughran

@gszadovszky thanks for your comments, will update the PR

steveloughran avatar Sep 19 '23 11:09 steveloughran

@danielcweeks that's a good point about pluggability. I don't know if that would be useful for iceberg https://github.com/apache/hadoop-api-shim

Iceberg can use the base Parquet file reader out of the box, so it should be able to use vector IO as-is.

getting iceberg to pass down which stripes it wants to read is critical for this to work best with s3, abfs and gcs. how are you reading the files at present?

However, if the S3FileIO feature is enabled, Iceberg provides its own InputStream and InputFile implementations that use AWS SDK V2. Maybe an option to provide your own input stream to the vector IO path might work.

parthchandra avatar Sep 28 '23 17:09 parthchandra

@mukund-thakur @steveloughran this is a great PR! Some numbers from an independent benchmark: I used Spark to parallelize the reading of all row groups (just the reading of the raw data) from TPC-DS/SF10000/store_sales using various APIs, and here are some numbers for you.

32 executors, 16 cores, `fs.s3a.threads.max = 20`

| Reader | Avg Time (minutes) | Median | vs Baseline |
| --- | --- | --- | --- |
| Parquet | 10.32 | 10 | 1 |
| Parquet Vector IO | 2.02 | 2 | 5.1 |
| AWS SDK V2 | 9.86 | 10 | 1 |
| AWS SDK V2 Async | 9.66 | 9.6 | 1.1 |
| AWS SDK V2 AsyncCrt | 9.76 | 10 | 1.1 |
| AWS SDK V2 S3TransferManager | 9.58 | 9.5 | 1.1 |
| AWS SDK V2 Async CRT Http Client | 10.8 | 11 | 1 |

Summary: the various V2 SDK clients provide lower latency and better upload speeds, but for raw data scans they are all pretty much the same. Increasing the parallelism, as vector IO does, has the maximum benefit.

parthchandra avatar Sep 28 '23 18:09 parthchandra

@parthchandra Thanks for running the benchmarks. The numbers are impressive.

mukund-thakur avatar Sep 29 '23 22:09 mukund-thakur

@parthchandra just wanted to check: are these numbers with Iceberg and S3FileIO? With S3A now using SDK V2, I'm looking at running a similar benchmark too with async CRT clients, but have been seeing some issues around connection pool exhaustion.

ahmarsuhail avatar Oct 17 '23 15:10 ahmarsuhail

@ahmarsuhail No, these numbers are not with Iceberg and S3FileIO. I used a modified (lots of stuff removed) version of ParquetFileReader and a custom benchmark program that reads all the row groups in parallel and records the time spent in each read from S3. The modified version of ParquetFileReader can switch between the various methods of reading from S3. The entry "AWS SDK V2" is a near copy of the Iceberg S3FileIO code, though. I saw issues with the CRT client when running at scale, causing JVM crashes, and the V2 transfer manager did not do range reads properly. Do share your experience.

parthchandra avatar Oct 17 '23 16:10 parthchandra

Thanks for the numbers; I am deep in the aws v2 sdk migration right now and haven't had a chance to work on this.

steveloughran avatar Oct 17 '23 18:10 steveloughran

...back on this. @parthchandra you know that hadoop trunk is built on the v2 sdk now?

steveloughran avatar Nov 17 '23 19:11 steveloughran

OK, I've tried to address the changes as well as merge with master.

The one thing I've yet to do is the one raised by @danielcweeks: have an interface for which the hadoop vector IO would be just one implementation.

We effectively have that in SeekableInputStream, via two new default methods: one a probe for the API's availability, the other the invocation itself.
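A hedged sketch of that shape, assuming illustrative names (`readVectoredAvailable`, `readVectored`, `FileRange` are stand-ins here, not the exact parquet-java signatures):

```java
import java.io.IOException;
import java.util.List;

public class VectorIoSketch {

    // Minimal stand-in for a byte range request: offset + length.
    public record FileRange(long offset, int length) {}

    public interface SeekableInput {
        // Classic blocking read; every implementation must provide this.
        void readFully(long offset, byte[] buffer) throws IOException;

        // Probe: by default a stream does not support vectored IO.
        default boolean readVectoredAvailable() {
            return false;
        }

        // Invocation: the default raises, so callers must check the probe
        // (or catch and fall back to classic reads).
        default void readVectored(List<FileRange> ranges) throws IOException {
            throw new UnsupportedOperationException("vectored IO not available");
        }
    }
}
```

The point of the default methods is that an implementation like Iceberg's S3FileIO stream only needs to override the pair when it can do better; everything else keeps working unchanged.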


Would you be able to wire up the iceberg reader to that? And if not, what changes are needed?

One thing we need to make sure is right is the awaitFuture code; it's a copy of what's in Hadoop to handle async IO operations. There's also a hard-coded timeout of 300s to wait for the results; I don't know/recall where that number came from, but it's potentially dubious as it won't recover from network problems.
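For readers unfamiliar with the pattern: an awaitFuture-style helper blocks on a future with a deadline and unwraps the execution exception back into an IOException. This is a hedged sketch of that idea, not the actual Hadoop or parquet code:

```java
import java.io.IOException;
import java.io.InterruptedIOException;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

final class AwaitSketch {
    // Block on the future, converting the various failure modes to IOException.
    static <T> T awaitFuture(Future<T> future, long timeout, TimeUnit unit)
            throws IOException {
        try {
            return future.get(timeout, unit);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw (IOException) new InterruptedIOException("interrupted").initCause(e);
        } catch (TimeoutException e) {
            // this is where a hard-coded 300s deadline would surface
            throw new IOException("timed out after " + timeout + " " + unit, e);
        } catch (ExecutionException e) {
            Throwable cause = e.getCause();
            if (cause instanceof IOException) {
                throw (IOException) cause;
            }
            throw new IOException(cause);
        }
    }
}
```

The concern above is the timeout branch: with a fixed deadline, a stalled network read surfaces as an IOException after 300s rather than recovering.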

steveloughran avatar Nov 17 '23 20:11 steveloughran

Code-wise, no, other than reviews from others about the best place for things, such as that awaitFuture code, or any other suggestions from people who know the parquet codebase. The code works, and we have been testing this against Amazon S3 Express storage for extra speed-up. To be ruthless: there's no point paying the premium for that until you've embraced the extra speed-ups you get from this first.

steveloughran avatar Nov 29 '23 12:11 steveloughran

@steveloughran Can you fix the compatibility issue?

```
Error:  Failed to execute goal com.github.siom79.japicmp:japicmp-maven-plugin:0.18.2:cmp (default) on project parquet-hadoop: There is at least one incompatibility:
org.apache.parquet.hadoop.util.vectorio.BindingUtils.raiseInnerCause(java.util.concurrent.ExecutionException):CLASS_GENERIC_TEMPLATE_CHANGED -> [Help 1]
```

Fokko avatar Nov 29 '23 20:11 Fokko

@Fokko that's japicmp getting its logic wrong because it's a new file; I thought I'd edited the build rules so it would ignore that.

Anyway, I need to fix the merge as something (#1209?) has just broken it.

steveloughran avatar Nov 30 '23 13:11 steveloughran

Thanks. I'm away from my laptop until 2024 but really do want to get this in.

steveloughran avatar Dec 18 '23 09:12 steveloughran

...rebased; now I've got some (brief) time to look at the comments. Don't review just yet.

steveloughran avatar Jan 15 '24 16:01 steveloughran

@steveloughran Thanks for this PR! When can we expect it to be merge-ready?

ggershinsky avatar Jan 30 '24 06:01 ggershinsky

let me have another look at your comments and make sure I've answered them all

steveloughran avatar Feb 02 '24 14:02 steveloughran

Update: I've been looking at overlapping ranges.

  • The base implementation doesn't care.
  • the specification and contract tests don't say they MUST be separate
  • I'm now worried that a bit of the s3a range coalescing code is getting overlaps wrong.

Here's what I propose

  • for the hadoop api we say "no overlap" and explicitly raise an IOE subclass in the invocation if they do overlap (i.e. always fail fast)
  • add tests
  • for parquet, we do the same validation so behaviour is consistent across all impls
  • If there is an overlap, what to do there? fail or downgrade?

The reason I'd like to fail fast on overlaps is that it lines up nicely with ranged HTTP GET requests. S3 doesn't support multi-range GETs, but others might already, and if not, could be persuaded. Currently the s3a client will coalesce "close enough" ranges into single GETs, discarding the data in between. This is more efficient for HTTP connection use and latency, and even costs less for in-EC2 queries.
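The two ideas in play here, overlap validation and gap coalescing, can be sketched as follows. This is illustrative only; the real s3a coalescing code and the parquet-side checks differ in names and details:

```java
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;

final class RangeChecks {
    record Range(long offset, int length) {
        long end() { return offset + length; }
    }

    // Fail fast if any two ranges overlap (the proposed behaviour above).
    static void validateNonOverlapping(List<Range> ranges) {
        List<Range> sorted = new ArrayList<>(ranges);
        sorted.sort(Comparator.comparingLong(Range::offset));
        for (int i = 1; i < sorted.size(); i++) {
            if (sorted.get(i).offset() < sorted.get(i - 1).end()) {
                throw new IllegalArgumentException("overlapping ranges: "
                    + sorted.get(i - 1) + " and " + sorted.get(i));
            }
        }
    }

    // Merge ranges whose gap is at most minSeek bytes into one GET,
    // discarding the bytes in between (the coalescing idea).
    static List<Range> coalesce(List<Range> sortedRanges, int minSeek) {
        List<Range> out = new ArrayList<>();
        Range current = null;
        for (Range r : sortedRanges) {
            if (current != null && r.offset() - current.end() <= minSeek) {
                current = new Range(current.offset(), (int) (r.end() - current.offset()));
            } else {
                if (current != null) out.add(current);
                current = r;
            }
        }
        if (current != null) out.add(current);
        return out;
    }
}
```

Note how the two interact: coalescing of sorted, non-overlapping ranges is straightforward, but once overlaps are allowed the merge arithmetic gets subtle, which is exactly the worry about the s3a code above.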

I do want to add azure abfs vector reads this year.

steveloughran avatar Feb 12 '24 11:02 steveloughran

  • for parquet, we do the same validation so behaviour is consistent across all impls

I think that is the correct behaviour.

  • If there is an overlap, what to do there? fail or downgrade?

IMHO, it seems to me that the Parquet side should validate that there are no overlapping ranges. So if the implementation detects an overlapping range it would really be an internal Parquet error and fail fast would be preferable.

parthchandra avatar Feb 13 '24 23:02 parthchandra

Per the thread, it'd be good to have this patch in 1.14.0 :) Otherwise, can take a very long time till the next one..

ggershinsky avatar Feb 21 '24 14:02 ggershinsky

Per the thread, it'd be good to have this patch in 1.14.0 :) Otherwise, can take a very long time till the next one..

Note that this PR is for vectored IO, while the thread referred to is talking about vectorized reading (i.e. decoding). The two are not mutually exclusive, and both provide performance gains independently of each other.

parthchandra avatar Feb 21 '24 17:02 parthchandra

this PR is already covered there. But I mostly meant the thread title.

ggershinsky avatar Feb 21 '24 18:02 ggershinsky

Quick update on this: in https://issues.apache.org/jira/browse/HADOOP-19098 I'm going to declare that overlapping ranges trigger IllegalArgumentException everywhere; to date only s3a threw exceptions.

For the parquet PR I will

  • add the same safety checks so that it is consistent with older releases
  • with tests

Now, what do people think about fallbacks? We've been using this, and to date parquet hasn't ever issued an overlapping request, but there's still the future to think about. How about: when hadoop vector IO requests fail the parquet-side range checks, log something (at what level?) and fall back to classic PositionedReadable. That way, jobs will always succeed the way they used to.
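The proposed fallback could look something like this sketch, where the interface and names are assumptions for illustration rather than the real parquet reader code:

```java
import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.List;
import java.util.logging.Logger;

public class FallbackRead {
    private static final Logger LOG = Logger.getLogger("FallbackRead");

    // Hypothetical reader abstraction; each range is {offset, length}.
    public interface Reader {
        void readVectored(List<long[]> ranges) throws IOException;
        void readFully(long offset, ByteBuffer dest) throws IOException;
    }

    // Try the vectored path; on a range-check rejection, log and degrade to
    // classic positioned reads so the job still succeeds.
    // Returns true if the vectored path was used.
    public static boolean read(Reader reader, List<long[]> ranges) throws IOException {
        try {
            reader.readVectored(ranges);
            return true;
        } catch (IllegalArgumentException | UnsupportedOperationException e) {
            LOG.info("vectored read rejected (" + e + "); using classic reads");
            for (long[] r : ranges) {
                reader.readFully(r[0], ByteBuffer.allocate((int) r[1]));
            }
            return false;
        }
    }
}
```

The design choice here is that the exception types which trigger the downgrade are the "our request was malformed" ones; genuine IO failures still propagate, since retrying them sequentially would just fail the same way.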

steveloughran avatar Mar 04 '24 12:03 steveloughran

Now, what do people think about fallbacks? We've been using this, and to date parquet hasn't ever issued an overlapping request, but there's still the future to think about. How about: when hadoop vector IO requests fail the parquet-side range checks, log something (at what level?) and fall back to classic PositionedReadable.

I think this is reasonable (INFO level, I think; this is similar to `INFO S3AInputStream: Switching to Random IO seek policy`). Though I'm also fine with an exception being thrown if there are overlapping requests.

parthchandra avatar Mar 04 '24 17:03 parthchandra

@steveloughran @mukund-thakur do you guys have any information on how much (if any) this impacts the peak memory utilization in the parquet file reader? The total memory allocated while reading a row group should remain the same, but if vector io allocates intermediate buffers (or even if the http request to S3, say, allocates additional memory) then we could see an increase in peak memory usage. Have you folks encountered any issues with that?

parthchandra avatar Mar 06 '24 22:03 parthchandra

@steveloughran @mukund-thakur do you guys have any information on how much (if any) this impacts the peak memory utilization in the parquet file reader? The total memory allocated while reading a row group should remain the same, but if vector io allocates intermediate buffers (or even if the http request to S3, say, allocates additional memory) then we could see an increase in peak memory usage. Have you folks encountered any issues with that?

Hey, we haven't done any memory profiling or seen any memory-related issues during our internal benchmarking.

mukund-thakur avatar Mar 07 '24 22:03 mukund-thakur

@steveloughran are you planning to incorporate the read metrics added in https://github.com/apache/parquet-mr/commit/2e0cd1925546d2560f7658086251851e6fa68559 ? I can add them after this is merged so as not to hold up this PR.

parthchandra avatar Mar 23 '24 17:03 parthchandra