PARQUET-2429: Reduce direct input buffer churn
Addresses https://issues.apache.org/jira/browse/PARQUET-2429.
Currently input buffers are grown one chunk at a time as the compressor or decompressor receives successive setInput calls. When decompressing a 64MB block using a 4KB chunk size, this leads to thousands of allocations and deallocations totaling GBs of memory. By growing the buffer 2x each time, we avoid this and instead use a modest number of allocations.
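To make the difference between the two growth strategies concrete, here is a minimal sketch that only counts reallocations under a simplified model; it does not touch real buffers. The class and method names (`BufferGrowthSketch`, `growByChunk`, `growByDoubling`) are hypothetical and are not taken from the patch, and the model assumes every `setInput` call delivers one full chunk and every growth step allocates a fresh buffer of the new capacity.

```java
public class BufferGrowthSketch {

    /**
     * Simplified model of the old behavior: the buffer is reallocated to
     * exactly the required size each time a chunk does not fit.
     * Returns {number of allocations, total bytes allocated}.
     */
    public static long[] growByChunk(long totalBytes, int chunkSize) {
        long capacity = 0, used = 0, allocations = 0, bytesAllocated = 0;
        while (used < totalBytes) {
            long needed = used + Math.min(chunkSize, totalBytes - used);
            if (needed > capacity) {
                capacity = needed;            // grow by just enough for this chunk
                allocations++;
                bytesAllocated += capacity;
            }
            used = needed;
        }
        return new long[] {allocations, bytesAllocated};
    }

    /**
     * Simplified model of the proposed behavior: when a chunk does not fit,
     * the capacity at least doubles.
     * Returns {number of allocations, total bytes allocated}.
     */
    public static long[] growByDoubling(long totalBytes, int chunkSize) {
        long capacity = 0, used = 0, allocations = 0, bytesAllocated = 0;
        while (used < totalBytes) {
            long needed = used + Math.min(chunkSize, totalBytes - used);
            if (needed > capacity) {
                capacity = Math.max(needed, capacity * 2);  // at least double
                allocations++;
                bytesAllocated += capacity;
            }
            used = needed;
        }
        return new long[] {allocations, bytesAllocated};
    }

    public static void main(String[] args) {
        long total = 64L << 20;   // 64MB block
        int chunk = 4 * 1024;     // 4KB chunks
        long[] byChunk = growByChunk(total, chunk);
        long[] byDoubling = growByDoubling(total, chunk);
        System.out.println("chunk-at-a-time: " + byChunk[0] + " allocations, " + byChunk[1] + " bytes");
        System.out.println("doubling:        " + byDoubling[0] + " allocations, " + byDoubling[1] + " bytes");
    }
}
```

Under this model, a 64MB block fed in 4KB chunks triggers 16,384 reallocations with chunk-at-a-time growth and 15 with doubling. The exact figures in the real codecs may differ, since this sketch ignores how the actual code sizes and reuses its buffers.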
When decompressing a 64MB block using a 4KB chunk size, this leads to thousands of allocations and deallocations totaling GBs of memory.
Is that a real use case? Usually we don't expect a page to be as large as this.
I did encounter this in the real world, on some Snappy-compressed Parquet files that were written by Spark. I don't have access to the Spark cluster or job info, though, so unfortunately I don't have more details than that.
Could you please make the CI happy?
cc @gszadovszky @shangxinli
@gianm, I agree with @wgtmac's concern about the expected size. For compression/decompression we are targeting the page size. The page size is limited by two configs, parquet.page.size and parquet.page.row.count.limit. (See details here.) One may configure both to higher values but it does not really make sense to have 64M pages.
I would not use a hadoop config for the default size of compression buffers. Hadoop typically compresses whole files. Probably the default page size would be a better choice here.
I like the idea of keeping the last size in the codec so that next time you don't need multiple re-allocations. The catch here might be in the case of writing Parquet files with different page size configurations, where we might allocate more than is actually required. But I don't think this would be a real-life scenario.
I agree with @wgtmac's concern about the expected size. For compression/decompression we are targeting the page size. The page size is limited by two configs, parquet.page.size and parquet.page.row.count.limit. (See details here.) One may configure both to higher values but it does not really make sense to have 64M pages.
I did encounter these in the real world, although it's always possible that they were built with some abnormally large values for some reason.
I would not use a hadoop config for the default size of compression buffers. Hadoop typically compresses whole files. Probably the default page size would be a better choice here.
I'm ok with doing whichever. FWIW, the setting io.file.buffer.size I used in the most recent patch (which was recommended here: https://github.com/apache/parquet-mr/pull/1270#discussion_r1493591742) defaults to 4096 bytes. I am not really a Parquet expert so I am willing to use whatever y'all recommend. Is there another property that would be better?
@gianm, I am not questioning whether you faced such large pages in the real world, only whether they make sense. Anyway, our code needs to handle these cases for sure. What I want to avoid is occupying much larger buffers than actually needed.
Page size is managed by ParquetProperties.getPageSizeThreshold(), the default value is ParquetProperties.DEFAULT_PAGE_SIZE.
@gszadovszky I'm trying to switch the codecs to use ParquetProperties#getPageSizeThreshold() as the initial buffer size but am running into some issues with seeing how to structure that. It looks like the various codecs (SnappyCodec, Lz4RawCodec) are stashed in a static final map called CODEC_BY_NAME in CodecFactory. Before they are stashed in the map, they are configured by a Hadoop Configuration object. Presumably that needs to be consistent across the entire classloader, since the configured codecs are getting stashed in a static final map.
I don't see a way to get the relevant ParquetProperties at the time the codecs are created. (I'm also not sure if it even really makes sense; is ParquetProperties something that is consistent across the entire classloader like a Hadoop Configuration would be?)
Any suggestions are welcome. I could also go back to the approach where the initial buffer size isn't configurable, and hard-code it at 4KB or 1MB or what seems most reasonable. With the doubling-every-allocation approach introduced in this patch, it isn't going to be the end of the world if the initial size is too small.
In this case I wouldn't spend too much time on actually passing the configured value, and as you said, it might not even be possible because of the caching. I think you are right to start with a small size and reach the target quickly.
OK, thanks for the feedback. I have pushed up a change to start with the max of 4KB and the initial chunk passed to setInput, then reach the target through doubling.
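For readers following along, the sizing rule described here can be sketched roughly as below. This is not the code from the patch: the class `GrowableInputBuffer`, its `append` method, and the `MIN_INITIAL_CAPACITY` constant are hypothetical names chosen for illustration, and the sketch assumes a direct `ByteBuffer` that is replaced by a larger one whenever an incoming chunk does not fit.

```java
import java.nio.ByteBuffer;

public class GrowableInputBuffer {
    // Assumed floor for the first allocation (4KB, as mentioned in the discussion).
    static final int MIN_INITIAL_CAPACITY = 4 * 1024;

    private ByteBuffer buffer;   // null until the first chunk arrives
    private int allocations;     // bookkeeping for illustration only

    /** Appends one chunk, growing the underlying direct buffer if needed. */
    public void append(byte[] src, int off, int len) {
        if (buffer == null) {
            // First chunk: start at max(4KB, size of the first chunk).
            buffer = ByteBuffer.allocateDirect(Math.max(MIN_INITIAL_CAPACITY, len));
            allocations++;
        } else if (buffer.remaining() < len) {
            // Not enough room: at least double, or more if one chunk needs it.
            long required = (long) buffer.position() + len;
            long doubled = 2L * buffer.capacity();
            // toIntExact throws ArithmeticException rather than silently overflowing.
            int newCapacity = Math.toIntExact(Math.max(required, doubled));
            ByteBuffer grown = ByteBuffer.allocateDirect(newCapacity);
            buffer.flip();        // switch the old buffer to read mode
            grown.put(buffer);    // copy the bytes accumulated so far
            buffer = grown;
            allocations++;
        }
        buffer.put(src, off, len);
    }

    public int size() { return buffer == null ? 0 : buffer.position(); }

    public int capacity() { return buffer == null ? 0 : buffer.capacity(); }

    public int allocations() { return allocations; }
}
```

With this rule, a caller that passes one large first chunk gets a buffer sized for it immediately, while a caller that passes many small chunks still reaches a large capacity in a logarithmic number of reallocations.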
Hi, would it be possible to commit this, now that it's approved?
Sorry for the delay. I just merged this. Thanks!
thank you!