
PARQUET-2149: Async IO implementation for ParquetFileReader

Open parthchandra opened this issue 2 years ago • 62 comments

Jira

This PR addresses PARQUET-2149: Implement async IO for Parquet file reader.

Tests

This PR adds the following unit tests:

  • AsyncMultiBufferInputStream.*
  • TestMultipleWriteRead.testReadWriteAsync
  • TestColumnChunkPageWriteStore.testAsync

The PR is also tested by changing the default configuration to make all reads async and then ensuring all unit tests pass.

parthchandra avatar May 16 '22 21:05 parthchandra

Anyone know why the CI checks are failing with a SocketTimeout exception, and what to do to address this?

parthchandra avatar May 17 '22 17:05 parthchandra

@parthchandra do you have performance benchmark? Thanks

dbtsai avatar May 17 '22 17:05 dbtsai

I have some numbers from an internal benchmark using Spark. I didn't see any benchmarks in the Parquet codebase that I could reuse.

Here are the numbers from my own benchmark -

  • 10 runs; each run reads all columns from store_sales (the largest table) in the TPC-DS (100G) dataset: spark.sql("select * from store_sales")
  • Sync reader with default 8MB buffer size, Async reader with 1MB buffer size (achieves better pipelining)
  • Run on Macbook Pro, reading from S3. Spark has 6 cores.
  • All times in seconds
Run       Async   Sync    Async (w/o outliers)   Sync (w/o outliers)
1         84      102     -                      -
2         90      366     90                     366
3         78      156     -                      156
4         84      128     84                     -
5         108     402     -                      -
6         90      432     90                     -
7         84      378     84                     378
8         108     324     -                      324
9         90      318     90                     318
10        90      282     90                     282
Average   90.6    288.8   88                     304
Median    90      321     90                     321
StdDev    9.98    119.

After removing the two highest and two lowest runs for each case, and taking the median value:

Async: 90 sec, Sync: 321 sec

parthchandra avatar May 17 '22 22:05 parthchandra

Great effort! Will have a look after the build succeeds.

shangxinli avatar May 18 '22 15:05 shangxinli

@parthchandra Would you mind having a look at my I/O performance optimization plan for ParquetMR? I think we should coordinate, since we have some ideas that might overlap what we touch. https://docs.google.com/document/d/1fBGpF_LgtfaeHnPD5CFEIpA2Ga_lTITmFdFIcO9Af-g/edit?usp=sharing

theosib-amazon avatar May 18 '22 15:05 theosib-amazon

cc @rdblue @gszadovszky @ggershinsky

dbtsai avatar May 18 '22 16:05 dbtsai

Great effort! Will have a look after the build succeeds.

@shangxinli I have no idea how to get the failed CI checks to pass. These failures appear to be in unrelated areas, caused by some infra issue. Is there a way to trigger a rerun?

parthchandra avatar May 18 '22 16:05 parthchandra

@parthchandra Would you mind having a look at my I/O performance optimization plan for ParquetMR? I think we should coordinate, since we have some ideas that might overlap what we touch. https://docs.google.com/document/d/1fBGpF_LgtfaeHnPD5CFEIpA2Ga_lTITmFdFIcO9Af-g/edit?usp=sharing

@theosib-amazon I read your document and went thru #960. It looks like, for the most part, #960 and this PR complement each other. The overlap I see is in the changes to MultiBufferInputStream, where you have added the readFully and skipFully APIs. The bulk of my changes for async IO are in a class derived from MultiBufferInputStream, and the heart of the changes depends on overriding MultiBufferInputStream.nextBuffer. In MultiBufferInputStream.nextBuffer the assumption is that all the buffers have already been read into. In AsyncMultiBufferInputStream.nextBuffer this assumption is removed, and the call blocks only if the next required buffer has not been read into yet. Now, skipFully and readFully are potentially blocking calls because both call nextBuffer repeatedly if necessary. To gain maximum pipelining, you want to make calls to skipFully and readFully such that you never block for too long (or at all) in the call. You get this if you are skipping or reading less than the number of bytes in a single buffer, which is generally the case since decompression and decoding happen at the page level, and a page is smaller than a single buffer. However, for your optimizations, you should be aware of this behaviour. From what I see, I don't think there will be a conflict. I'll pull in your PR and give it a deeper look.
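A minimal sketch of that blocking behaviour (illustrative only; the class and method names here are not the PR's actual code):

  import java.nio.ByteBuffer;
  import java.util.ArrayDeque;
  import java.util.Queue;
  import java.util.concurrent.ExecutionException;
  import java.util.concurrent.Future;

  // Illustrative only: each buffer fill is an in-flight Future, and the reader
  // blocks only on the read of the buffer it is about to consume, not on all of them.
  class PipelinedBufferQueue {
    private final Queue<Future<ByteBuffer>> pendingReads = new ArrayDeque<>();

    void submit(Future<ByteBuffer> read) {
      pendingReads.add(read);
    }

    // Returns the next buffer, waiting only if that particular read has not finished yet.
    ByteBuffer nextBuffer() throws InterruptedException, ExecutionException {
      Future<ByteBuffer> next = pendingReads.poll();
      return next == null ? null : next.get();
    }
  }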

parthchandra avatar May 18 '22 17:05 parthchandra

@parthchandra One thing that confuses me a bit is that these buffers have only ByteBuffer inside them. There's no actual I/O, so it's not possible to block. Do you have subclasses that provide some sort of access to real I/O?

theosib-amazon avatar May 18 '22 17:05 theosib-amazon

@parthchandra One thing that confuses me a bit is that these buffers have only ByteBuffer inside them. There's no actual I/O, so it's not possible to block. Do you have subclasses that provide some sort of access to real I/O?

Good point. MultiBufferInputStream is constructed using buffers that have already been filled. AsyncMultiBufferInputStream takes an input stream as a constructor parameter and performs the IO itself. In ByteBufferInputStream I added:

  public static ByteBufferInputStream wrapAsync(ExecutorService threadPool, SeekableInputStream fileInputStream,
    List<ByteBuffer> buffers) {
    return new AsyncMultiBufferInputStream(threadPool, fileInputStream, buffers);
  }
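
A caller would then construct the stream roughly like this (a hypothetical usage sketch; the pool size and buffer sizes are illustrative):

  import java.nio.ByteBuffer;
  import java.util.Arrays;
  import java.util.List;
  import java.util.concurrent.ExecutorService;
  import java.util.concurrent.Executors;
  import org.apache.hadoop.conf.Configuration;
  import org.apache.hadoop.fs.Path;
  import org.apache.parquet.bytes.ByteBufferInputStream;
  import org.apache.parquet.hadoop.util.HadoopInputFile;
  import org.apache.parquet.io.SeekableInputStream;

  class AsyncStreamExample {
    static ByteBufferInputStream openAsync(Path path, Configuration conf) throws Exception {
      ExecutorService ioPool = Executors.newFixedThreadPool(4);            // IO thread pool
      SeekableInputStream file = HadoopInputFile.fromPath(path, conf).newStream();
      List<ByteBuffer> buffers = Arrays.asList(                            // buffers to fill
          ByteBuffer.allocate(1 << 20), ByteBuffer.allocate(1 << 20));
      // Reads into the buffers are scheduled on ioPool by the stream itself.
      return ByteBufferInputStream.wrapAsync(ioPool, file, buffers);
    }
  }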

parthchandra avatar May 18 '22 18:05 parthchandra

@theosib-amazon I applied my PR on top of your PR, ran thru some tests using Spark, and hit no issues. (All unit tests passed as well).

parthchandra avatar May 18 '22 23:05 parthchandra

  1. whose s3 client was used for testing here - if the s3a one, which hadoop release?
  2. the azure abfs and gcs connectors do async prefetching of the next block, but are simply assuming that code will read sequentially; if there is another seek/readFully to a new location, those prefetches will be abandoned. there is work in s3a to do prefetching here with caching, so as to reduce the penalty of backwards seeks. https://issues.apache.org/jira/browse/HADOOP-18028

hadoop is adding a vectored IO api intended for libraries like orc and parquet to be able to use, where the application provides an unordered list of ranges and a bytebuffer supplier, and gets back a list of futures to wait for. the base implementation simply reads using the readFully API. s3a (and later abfs) will do full async retrieval itself, using the http connection pool. https://issues.apache.org/jira/browse/HADOOP-18103
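
in outline, a library would use it something like this (a sketch following the HADOOP-18103 design; the FileRange/readVectored names may differ in the final release):

  import java.nio.ByteBuffer;
  import java.util.Arrays;
  import java.util.List;
  import org.apache.hadoop.fs.FSDataInputStream;
  import org.apache.hadoop.fs.FileRange;

  class VectoredReadSketch {
    static void readColumnChunks(FSDataInputStream in) throws Exception {
      // Unordered list of ranges; the store may coalesce or fetch them in parallel.
      List<FileRange> ranges = Arrays.asList(
          FileRange.createFileRange(4 * 1024 * 1024, 1024 * 1024),
          FileRange.createFileRange(0, 64 * 1024));
      // Pass a ByteBuffer supplier; each range gets a future that completes when read.
      in.readVectored(ranges, ByteBuffer::allocate);
      for (FileRange range : ranges) {
        ByteBuffer data = range.getData().get();   // wait for this range only
        // ... hand data off for decompression/decoding ...
      }
    }
  }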

both vectored io and s3a prefetching will ship this summer in hadoop 3.4.0. i don't see this change conflicting with this, though they may obsolete a lot of it.

have you benchmarked this change with abfs or google gcs connectors to see what difference it makes there?

steveloughran avatar May 23 '22 15:05 steveloughran

@steveloughran thank you very much for taking the time to review and provide feedback!

  1. whose s3 client was used for testing here - if the s3a one, which hadoop release?

I was working with s3a: Spark 3.2.1, Hadoop (Hadoop-aws) 3.3.2, AWS SDK 1.11.655

  2. the azure abfs and gcs connectors do async prefetching of the next block, but are simply assuming that code will read sequentially; if there is another seek/readFully to a new location, those prefetches will be abandoned. there is work in s3a to do prefetching here with caching, so as to reduce the penalty of backwards seeks. https://issues.apache.org/jira/browse/HADOOP-18028

I haven't worked with abfs or gcs. If the connectors do async pre-fetching, that would be great: the time the Parquet reader has to block in the file system API would drop substantially. In that case, we could turn the async reader on and off and rerun the benchmark to compare. From past experience with MaprFS, which had very aggressive read-ahead in its hdfs client, I would still expect better parquet speeds. Turning the prefetch off when a seek occurs is the usual behaviour, but we may see no benefit from the connector in that case. So a combination of async reader and async connector might end up being a great solution (maybe at a slightly greater CPU utilization). We would still have to benchmark to see the real effect. The async version in this PR takes care of the sequential-read requirement by a) opening a new stream for each column and ensuring every column is read sequentially (footers are read using a separate stream, and except for the footer, no other stream ever seeks to a new location), and b) predetermining the amount of data to be read, so there is never a read-ahead that is discarded.

hadoop is adding a vectored IO api intended for libraries like orc and parquet to be able to use, where the application provides an unordered list of ranges and a bytebuffer supplier, and gets back a list of futures to wait for. the base implementation simply reads using the readFully API. s3a (and later abfs) will do full async retrieval itself, using the http connection pool. https://issues.apache.org/jira/browse/HADOOP-18103

both vectored io and s3a prefetching will ship this summer in hadoop 3.4.0. i don't see this change conflicting with this, though they may obsolete a lot of it.

Yes, I became aware of this recently. I'm discussing integration of these efforts in a separate channel. At the moment I see no conflict, but have yet to determine how much of this async work would need to be changed. I suspect we may be able to eliminate or vastly simplify AsyncMultiBufferInputStream.

have you benchmarked this change with abfs or google gcs connectors to see what difference it makes there?

No I have not. Would love help from anyone in the community with access to these. I only have access to S3.

parthchandra avatar May 24 '22 03:05 parthchandra

I was working with s3a: Spark 3.2.1, Hadoop (Hadoop-aws) 3.3.2, AWS SDK 1.11.655

thanks, that means you are current with all shipping improvements. the main extra one is to use openFile(), passing in the length and requesting random IO. this guarantees ranged GET requests and cuts the initial HEAD probe for the existence/size of the file.
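
roughly, the openFile() call looks like this (a sketch; the fs.option.openfile.* option keys assume a recent hadoop 3.3+ release):

  import org.apache.hadoop.fs.FSDataInputStream;
  import org.apache.hadoop.fs.FileSystem;
  import org.apache.hadoop.fs.Path;

  class OpenFileSketch {
    static FSDataInputStream openForRandomIO(FileSystem fs, Path path, long length)
        throws Exception {
      return fs.openFile(path)
          .opt("fs.option.openfile.read.policy", "random")           // ranged GETs
          .opt("fs.option.openfile.length", Long.toString(length))   // skip the HEAD probe
          .build()
          .get();                                                    // future completes with the stream
    }
  }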

have you benchmarked this change with abfs or google gcs connectors to see what difference it makes there?

No I have not. Would love help from anyone in the community with access to these. I only have access to S3.

that I have. FWIW, with the right tuning of abfs prefetch (4 threads, 128 MB blocks) i can get the full FTTH link rate from a remote store: 700 mbit/s. that's to the base station; once you add wifi the bottlenecks move.
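
for reference, that tuning is just configuration (a sketch; the key names here are assumptions, check the hadoop-azure docs for your release):

  import org.apache.hadoop.conf.Configuration;

  class AbfsPrefetchTuning {
    static Configuration tuned() {
      Configuration conf = new Configuration();
      // Assumed keys: read-ahead queue depth (threads) and the size of each read request.
      conf.setInt("fs.azure.readaheadqueue.depth", 4);
      conf.setLong("fs.azure.read.request.size", 128 * 1024 * 1024L);  // 128 MB blocks
      return conf;
    }
  }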

steveloughran avatar May 24 '22 08:05 steveloughran

Is byte (and arrays and buffers of bytes) the only datatype you support? My PR is optimizing code paths that pull ints, longs, and other sizes out of the data buffers. Are those not necessary for any of the situations where you're using an async buffer?

theosib-amazon avatar May 24 '22 16:05 theosib-amazon

thanks, that means you are current with all shipping improvements. the main extra one is to use openFile(), passing in the length and requesting random IO. this guarantees ranged GET requests and cuts the initial HEAD probe for the existence/size of the file.

By openFile() do you mean FileSystem.openFileWithOptions(Path, OpenFileParameters)? While looking into this, I realized that Parquet builds with a much older version of Hadoop.

have you benchmarked this change with abfs or google gcs connectors to see what difference it makes there?

No I have not. Would love help from anyone in the community with access to these. I only have access to S3.

that I have. FWIW, with the right tuning of abfs prefetch (4 threads, 128 MB blocks) i can get the full FTTH link rate from a remote store: 700 mbit/s. that's to the base station; once you add wifi the bottlenecks move.

Wow! That is nearly as fast as a local HDD. At this point the bottlenecks in parquet begin to move towards decompression and decoding, but IO remains the slowest link in the chain. One thing we get with my PR is that the ParquetFileReader had assumptions built in that all data must be read before downstream can proceed. Some of my changes are related to removing these assumptions and ensuring that downstream processing does not block until an entire column is read, so we get efficient pipelining. What does the 128 MB block mean? Is this the amount prefetched for a stream? The read API does not block until the entire block is filled, I presume. With my PR, parquet IO is reading 8MB at a time (default) and downstream is processing 1MB at a time (default), and several such streams (one per column) are in progress at the same time. Hopefully, this read pattern would work with the prefetch.

parthchandra avatar May 24 '22 21:05 parthchandra

Is byte (and arrays and buffers of bytes) the only datatype you support? My PR is optimizing code paths that pull ints, longs, and other sizes out of the data buffers. Are those not necessary for any of the situations where you're using an async buffer?

The input stream API is generally unaware of the datatypes of its contents, so those are the only APIs I use. The other reason is that the ParquetFileReader returns Pages, which basically contain metadata and ByteBuffers of compressed data. The decompression and decoding into types comes much later, in a downstream thread. For your PR, I don't think AsyncMultiBufferInputStream is ever going to be in play in the paths you're optimizing. But just in case it is, your type-aware methods will work as is, because AsyncMultiBufferInputStream is derived from MultiBufferInputStream and will inherit those methods.

parthchandra avatar May 24 '22 21:05 parthchandra

At this point the bottlenecks in parquet begin to move towards decompression and decoding but IO remains the slowest link in the chain.

Latency is the killer; in an HTTP request you want to read enough but not discard data or break an http connection if the client suddenly does a seek() or readFully() somewhere else. file listings, existence checks, etc.

One thing we get with my PR is that the ParquetFileReader had assumptions built in that all data must be read before downstream can proceed. Some of my changes are related to removing these assumptions and ensuring that downstream processing does not block until an entire column is read so we get efficient pipelining.

That'd be great. now, if you could also handle requesting different columns in parallel and processing them out of order.

What does the 128 MB block mean? Is this the amount prefetched for a stream? The read API does not block until the entire block is filled, I presume.

this was the abfs client set to do four GET requests of 128MB each. this would be awful for column stores, where smaller ranges are often requested/processed before another seek is made, but quite often parquet does do more back-to-back reads than just one read/readFully request.

With my PR, parquet IO is reading 8MB at a time (default) and downstream is processing 1MB at a time (default) and several such streams (one per column) are in progress at the same time. Hopefully, this read pattern would work with the prefetch.

be good to think about vectored IO.

and yes, updating parquet dependencies would be good, hadoop 3.3.0 should be the baseline.

just sketched out my thoughts on this. I've played with some of this in my own branch. I think the next step would be for me to look at the benchmark code to make it targetable elsewhere.

https://docs.google.com/document/d/1y9oOSYbI6fFt547zcQJ0BD8VgvJWdyHBveaiCHzk79k/

steveloughran avatar May 24 '22 21:05 steveloughran

This is interesting, because when I did profiling of Trino, I found that although I/O (from S3, over the network no less) was significant, even more time was spent in compute. Maybe you're getting improved performance because you're increasing parallelism between I/O and compute.

theosib-amazon avatar May 24 '22 23:05 theosib-amazon

Latency is the killer; in an HTTP request you want to read enough but not discard data or break an http connection if the client suddenly does a seek() or readFully() somewhere else. file listings, existence checks, etc.

That'd be great. now, if you could also handle requesting different columns in parallel and processing them out of order.

I do. The Parquet file reader API that reads row groups reads all columns in sequence in sync mode. In async mode, it fires off a task for every column, blocking only to read the first page of every column before returning. This part also uses a different thread pool from the IO tasks, so that IO tasks never wait because there are no available threads in the pool.
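
Conceptually, the dispatch looks something like this (a simplified sketch, not the actual PR code; the pool size and names are illustrative):

  import java.util.ArrayList;
  import java.util.List;
  import java.util.concurrent.ExecutorService;
  import java.util.concurrent.Executors;
  import java.util.concurrent.Future;

  class PerColumnDispatchSketch {
    // Processing pool, deliberately separate from the pool used for IO tasks
    // (not shown here), so IO tasks never wait for a free thread.
    private final ExecutorService processPool = Executors.newFixedThreadPool(4);

    List<Future<?>> readRowGroup(List<Runnable> columnReaders) {
      List<Future<?>> tasks = new ArrayList<>();
      for (Runnable column : columnReaders) {
        tasks.add(processPool.submit(column)); // one task per column
      }
      return tasks; // caller consumes pages as each column's task makes them available
    }
  }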

be good to think about vectored IO.

I think I know how to integrate this PR with the vectored IO, but this is only after a cursory look.

and yes, updating parquet dependencies would be good, hadoop 3.3.0 should be the baseline.

Who can drive this (presumably) non-trivial change? I myself have no karma points :(

just sketched out my thoughts on this. I've played with some of this in my own branch. I think the next step would be for me to look at the benchmark code to make it targetable elsewhere.

https://docs.google.com/document/d/1y9oOSYbI6fFt547zcQJ0BD8VgvJWdyHBveaiCHzk79k/

This is great. I now have much more context of where you are coming from (and going to) !

parthchandra avatar May 24 '22 23:05 parthchandra

Is byte (and arrays and buffers of bytes) the only datatype you support? My PR is optimizing code paths that pull ints, longs, and other sizes out of the data buffers. Are those not necessary for any of the situations where you're using an async buffer? The input stream API is generally unaware of the datatypes of its contents and so those are the only APIs I use. The other reason is that the ParquetFileReader returns Pages which basically contain metadata and ByteBuffers of compressed data. The decompression and decoding into types comes much later in a downstream thread. For your PR, I don't think AsyncMultiBufferInputStream is ever going to be in play in the paths you're optimizing. But just in case it is, your type-aware methods will work as is because AsyncMultiBufferInputStream is derived from MultiBufferInputStream and will inherit those methods.

I'm still learning Parquet's structure. So it sounds to me like these buffer input streams are used twice: once to get data and decompress it, and then once again to decode it into data structures. Is that correct? So it sounds like you're optimizing one layer of processing, and I'm optimizing the next layer up, and it's kind of a coincidence that we're touching some of the same classes just because code reuse has been possible here.

theosib-amazon avatar May 24 '22 23:05 theosib-amazon

BTW, adding more tests for the InputStream implementations.

parthchandra avatar May 24 '22 23:05 parthchandra

So it sounds like you're optimizing one layer of processing, and I'm optimizing the next layer up, and it's kind of a coincidence that we're touching some of the same classes just because code reuse has been possible here.

Yeah, kind of cool :)

parthchandra avatar May 24 '22 23:05 parthchandra

This is interesting, because when I did profiling of Trino, I found that although I/O (from S3, over the network no less) was significant, even more time was spent in compute. Maybe you're getting improved performance because you're increasing parallelism between I/O and compute.

It may be because I was using Spark's vectorized parquet decoding, which is an order of magnitude faster than the parquet library's row-by-row decoding (see Spark benchmarks). If Trino is not doing vectorized decoding (I took a very quick look and I don't think it is), I would suggest looking into that next. All the cool kids are doing it.

parthchandra avatar May 25 '22 00:05 parthchandra

and yes, updating parquet dependencies would be good, hadoop 3.3.0 should be the baseline.

+1 on upgrading to 3.3.0, although currently parquet is using 2.10.1 as a provided dependency and we need to make sure it continues to work with hadoop 2.x

It may be because I was using Spark's vectorized parquet decoding, which is an order of magnitude faster than the parquet library's row-by-row decoding (see Spark benchmarks). If Trino is not doing vectorized decoding (I took a very quick look and I don't think it is), I would suggest looking into that next. All the cool kids are doing it.

Presto already has a batch reader, but it seems the feature is not in Trino yet. The batch reader did help a lot to reduce the CPU load. See the slides.

sunchao avatar May 25 '22 17:05 sunchao

That batch reader in Presto reminds me of some of the experimental changes I made in Trino. I modified PrimitiveColumnReader to work out how many of each data item it needs to read from the data source and request all of them at once in an array. This doubled the performance of some TPC-DS queries. This is why I have array access methods planned for ParquetMR. (https://docs.google.com/document/d/1fBGpF_LgtfaeHnPD5CFEIpA2Ga_lTITmFdFIcO9Af-g/edit?usp=sharing) Requesting data in bulk saves a lot on function call overhead for each data item.
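
The idea amounts to replacing a per-value call with one bulk call (a sketch with hypothetical method names, for illustration only):

  // Hypothetical reader interface: readInteger() is the per-value style,
  // readIntegers() is the proposed bulk style.
  interface BulkColumnReader {
    int readInteger();
    void readIntegers(int[] dest, int offset, int length);
  }

  class BulkReadSketch {
    static int[] readColumn(BulkColumnReader reader, int n) {
      int[] values = new int[n];
      // Per-value: one call (and its overhead) per value:
      //   for (int i = 0; i < n; i++) values[i] = reader.readInteger();
      // Bulk: one call amortizes that overhead over n values.
      reader.readIntegers(values, 0, n);
      return values;
    }
  }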

theosib-amazon avatar May 25 '22 17:05 theosib-amazon

I've started work on an fs-api-shim library, with the goal that apps compiled against hadoop 3.2.0 can get access to the 3.3 and 3.4 APIs when available, either with transparent fallback (openFile()) or the ability to probe for the API before trying to invoke it.

https://github.com/steveloughran/fs-api-shim

openFile() takes the seek & status params and falls back to open(): https://github.com/steveloughran/fs-api-shim/blob/main/fs-api-shim-library/src/main/java/org/apache/hadoop/fs/shim/FileSystemShim.java#L87

ByteBufferReadable will raise UnsupportedOperationException if not found; there is a check for it: https://github.com/steveloughran/fs-api-shim/blob/main/fs-api-shim-library/src/main/java/org/apache/hadoop/fs/shim/FSDataInputStreamShim.java

Vector IO SHALL be available the same way

Adopt 3.2.0 then and we will help give the library the ability to use the newer api calls, even stuff not yet shipped in apache releases.

(I want to release this as an asf artifact with oversight by hadoop project. lets us maintain it)

steveloughran avatar Jun 09 '22 17:06 steveloughran

Sounds good.

Also, perhaps check if the ByteBufferReadable interface is implemented in the stream?

ByteBufferReadable will raise UnsupportedOperationException if not found; there is a check for it: https://github.com/steveloughran/fs-api-shim/blob/main/fs-api-shim-library/src/main/java/org/apache/hadoop/fs/shim/FSDataInputStreamShim.java

parthchandra avatar Jun 09 '22 18:06 parthchandra

perhaps check if the ByteBufferReadable interface is implemented in the stream?

The requirement for hasCapability("in:readbytebuffer") to return true postdates the API; there's no way to be confident, if the probe returns false (or hasPathCapability() isn't available), that the method isn't actually there.

see #951 for a design which will trust a true response, falling back to looking at the wrapped stream. Note that as it calls getWrapped() it is calling methods tagged LimitedPrivate. it shouldn't really do that... at the very least hadoop needs a PR saying "we need to do this because..." and that tag can be changed.
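
roughly, that probe-then-fallback looks like this (a sketch; getWrappedStream() is the kind of wrapped-stream access mentioned above):

  import java.io.InputStream;
  import org.apache.hadoop.fs.ByteBufferReadable;
  import org.apache.hadoop.fs.FSDataInputStream;

  class ByteBufferReadProbe {
    static boolean canReadByteBuffer(FSDataInputStream in) {
      // A true answer is trusted; a false answer may just mean an older stream that
      // predates the capability, so fall back to inspecting the wrapped stream.
      if (in.hasCapability("in:readbytebuffer")) {
        return true;
      }
      InputStream wrapped = in.getWrappedStream();
      return wrapped instanceof ByteBufferReadable;
    }
  }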

steveloughran avatar Jun 13 '22 13:06 steveloughran

(i could of course add those probes into the shim class, so at least that access of internals was in one place)

steveloughran avatar Jun 13 '22 13:06 steveloughran