
Support configuring compression levels for ZSTD writes

Open akki opened this issue 1 year ago • 4 comments

Various Trino connectors support writing data with ZSTD compression (e.g. via hive.compression-codec for the Hive connector). ZSTD also supports a compression level (currently 1 to 22). It would be good to expose this in Trino too, so that users can benefit from higher compression levels and reduce storage costs.

This is similar to what Spark allows with spark.sql.parquet.compression.codec.zstd.level.

akki avatar Feb 25 '25 20:02 akki

cc: @raunaqmorarka

ebyhr avatar Feb 25 '25 22:02 ebyhr

Bumped into another issue today - not sure if it will get fixed by supporting this feature request (happy to open another issue, if maintainers advise).

I received a "Window size too large (not yet supported): offset=5537" error while trying to read data written by Spark with higher zstd compression levels. Notably, the issue happens only with level 20 or higher (i.e. Trino is able to read data written with levels 1-19).

Full error:

pyhive.exc.DatabaseError: {'message': 'Failed to read Parquet file: s3a://bucketname/akki_prefix/zstd_testing/test_table_1747091374750901_193209409197/dt=2025-01-03/hour=5/part-00000-65bcb83c-9960-46f8-9fae-0c832dde81cb-c000.zstd.parquet', 'errorCode': 16777217, 'errorName': 'HIVE_CURSOR_ERROR', 'errorType': 'EXTERNAL', 'failureInfo': {'type': 'io.trino.spi.TrinoException', 'message': 'Failed to read Parquet file: s3a://bucketname/akki_prefix/zstd_testing/test_table_1747091374750901_193209409197/dt=2025-01-03/hour=5/part-00000-65bcb83c-9960-46f8-9fae-0c832dde81cb-c000.zstd.parquet', 'cause': {'type': 'io.airlift.compress.MalformedInputException', 'message': 'Window size too large (not yet supported): offset=5537', 'suppressed': [], 'stack': ['io.airlift.compress.zstd.Util.verify(Util.java:45)', 'io.airlift.compress.zstd.ZstdFrameDecompressor.decodeCompressedBlock(ZstdFrameDecompressor.java:293)', 'io.airlift.compress.zstd.ZstdFrameDecompressor.decompress(ZstdFrameDecompressor.java:182)', 'io.airlift.compress.zstd.ZstdDecompressor.decompress(ZstdDecompressor.java:39)', 'io.trino.parquet.ParquetCompressionUtils.decompress(ParquetCompressionUtils.java:144)', 'io.trino.parquet.ParquetCompressionUtils.decompressZstd(ParquetCompressionUtils.java:82)', 'io.trino.parquet.ParquetCompressionUtils.decompress(ParquetCompressionUtils.java:64)', 'io.trino.parquet.reader.PageReader.readPage(PageReader.java:112)', 'io.trino.parquet.reader.PrimitiveColumnReader.readNextPage(PrimitiveColumnReader.java:275)', 'io.trino.parquet.reader.PrimitiveColumnReader.readPrimitive(PrimitiveColumnReader.java:145)', 'io.trino.parquet.reader.ParquetReader.readPrimitive(ParquetReader.java:444)', 'io.trino.parquet.reader.ParquetReader.readColumnChunk(ParquetReader.java:529)', 'io.trino.parquet.reader.ParquetReader.readBlock(ParquetReader.java:512)', 'io.trino.parquet.reader.ParquetReader.lambda$nextPage$3(ParquetReader.java:274)', 
'io.trino.parquet.reader.ParquetBlockFactory$ParquetBlockLoader.load(ParquetBlockFactory.java:72)', 'io.trino.spi.block.LazyBlock$LazyData.load(LazyBlock.java:407)', 'io.trino.spi.block.LazyBlock$LazyData.getFullyLoadedBlock(LazyBlock.java:386)', 'io.trino.spi.block.LazyBlock.getLoadedBlock(LazyBlock.java:293)', 'io.trino.operator.project.DictionaryAwarePageProjection$DictionaryAwarePageProjectionWork.setupDictionaryBlockProjection(DictionaryAwarePageProjection.java:208)', 'io.trino.operator.project.DictionaryAwarePageProjection$DictionaryAwarePageProjectionWork.lambda$getResult$0(DictionaryAwarePageProjection.java:196)', 'io.trino.spi.block.LazyBlock$LazyData.load(LazyBlock.java:407)', 'io.trino.spi.block.LazyBlock$LazyData.getFullyLoadedBlock(LazyBlock.java:386)', 'io.trino.spi.block.LazyBlock.getLoadedBlock(LazyBlock.java:293)', 'io.trino.operator.project.PageProcessor$ProjectSelectedPositions.processBatch(PageProcessor.java:361)', 'io.trino.operator.project.PageProcessor$ProjectSelectedPositions.process(PageProcessor.java:220)', 'io.trino.operator.WorkProcessorUtils$ProcessWorkProcessor.process(WorkProcessorUtils.java:413)', 'io.trino.operator.WorkProcessorUtils.lambda$flatten$6(WorkProcessorUtils.java:318)', 'io.trino.operator.WorkProcessorUtils$3.process(WorkProcessorUtils.java:360)', 'io.trino.operator.WorkProcessorUtils$ProcessWorkProcessor.process(WorkProcessorUtils.java:413)', 'io.trino.operator.WorkProcessorUtils$3.process(WorkProcessorUtils.java:347)', 'io.trino.operator.WorkProcessorUtils$ProcessWorkProcessor.process(WorkProcessorUtils.java:413)', 'io.trino.operator.WorkProcessorUtils.getNextState(WorkProcessorUtils.java:262)', 'io.trino.operator.WorkProcessorUtils$BlockingProcess.process(WorkProcessorUtils.java:208)', 'io.trino.operator.WorkProcessorUtils$ProcessWorkProcessor.process(WorkProcessorUtils.java:413)', 'io.trino.operator.WorkProcessorUtils.lambda$flatten$6(WorkProcessorUtils.java:318)', 
'io.trino.operator.WorkProcessorUtils$3.process(WorkProcessorUtils.java:360)', 'io.trino.operator.WorkProcessorUtils$ProcessWorkProcessor.process(WorkProcessorUtils.java:413)', 'io.trino.operator.WorkProcessorUtils$3.process(WorkProcessorUtils.java:347)', 'io.trino.operator.WorkProcessorUtils$ProcessWorkProcessor.process(WorkProcessorUtils.java:413)', 'io.trino.operator.WorkProcessorUtils.getNextState(WorkProcessorUtils.java:262)', 'io.trino.operator.WorkProcessorUtils.lambda$processStateMonitor$2(WorkProcessorUtils.java:241)', 'io.trino.operator.WorkProcessorUtils$ProcessWorkProcessor.process(WorkProcessorUtils.java:413)', 'io.trino.operator.WorkProcessorUtils.getNextState(WorkProcessorUtils.java:262)', 'io.trino.operator.WorkProcessorUtils.lambda$finishWhen$3(WorkProcessorUtils.java:256)', 'io.trino.operator.WorkProcessorUtils$ProcessWorkProcessor.process(WorkProcessorUtils.java:413)', 'io.trino.operator.WorkProcessorSourceOperatorAdapter.getOutput(WorkProcessorSourceOperatorAdapter.java:146)', 'io.trino.operator.Driver.processInternal(Driver.java:394)', 'io.trino.operator.Driver.lambda$process$8(Driver.java:297)', 'io.trino.operator.Driver.tryWithLock(Driver.java:689)', 'io.trino.operator.Driver.process(Driver.java:289)', 'io.trino.operator.Driver.processForDuration(Driver.java:260)', 'io.trino.execution.SqlTaskExecution$DriverSplitRunner.processFor(SqlTaskExecution.java:755)', 'io.trino.execution.executor.PrioritizedSplitRunner.process(PrioritizedSplitRunner.java:165)', 'io.trino.execution.executor.TaskExecutor$TaskRunner.run(TaskExecutor.java:523)', 'io.trino.$gen.Trino_406____20250512_151520_2.run(Unknown Source)', 'java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1144)', 'java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:642)', 'java.base/java.lang.Thread.run(Thread.java:1575)']}, 'suppressed': [], 'stack': 
['io.trino.plugin.hive.parquet.ParquetPageSource.handleException(ParquetPageSource.java:169)', 'io.trino.plugin.hive.parquet.ParquetPageSourceFactory.lambda$createPageSource$2(ParquetPageSourceFactory.java:295)', 'io.trino.parquet.reader.ParquetBlockFactory$ParquetBlockLoader.load(ParquetBlockFactory.java:75)', 'io.trino.spi.block.LazyBlock$LazyData.load(LazyBlock.java:407)', 'io.trino.spi.block.LazyBlock$LazyData.getFullyLoadedBlock(LazyBlock.java:386)', 'io.trino.spi.block.LazyBlock.getLoadedBlock(LazyBlock.java:293)', 'io.trino.operator.project.DictionaryAwarePageProjection$DictionaryAwarePageProjectionWork.setupDictionaryBlockProjection(DictionaryAwarePageProjection.java:208)', 'io.trino.operator.project.DictionaryAwarePageProjection$DictionaryAwarePageProjectionWork.lambda$getResult$0(DictionaryAwarePageProjection.java:196)', 'io.trino.spi.block.LazyBlock$LazyData.load(LazyBlock.java:407)', 'io.trino.spi.block.LazyBlock$LazyData.getFullyLoadedBlock(LazyBlock.java:386)', 'io.trino.spi.block.LazyBlock.getLoadedBlock(LazyBlock.java:293)', 'io.trino.operator.project.PageProcessor$ProjectSelectedPositions.processBatch(PageProcessor.java:361)', 'io.trino.operator.project.PageProcessor$ProjectSelectedPositions.process(PageProcessor.java:220)', 'io.trino.operator.WorkProcessorUtils$ProcessWorkProcessor.process(WorkProcessorUtils.java:413)', 'io.trino.operator.WorkProcessorUtils.lambda$flatten$6(WorkProcessorUtils.java:318)', 'io.trino.operator.WorkProcessorUtils$3.process(WorkProcessorUtils.java:360)', 'io.trino.operator.WorkProcessorUtils$ProcessWorkProcessor.process(WorkProcessorUtils.java:413)', 'io.trino.operator.WorkProcessorUtils$3.process(WorkProcessorUtils.java:347)', 'io.trino.operator.WorkProcessorUtils$ProcessWorkProcessor.process(WorkProcessorUtils.java:413)', 'io.trino.operator.WorkProcessorUtils.getNextState(WorkProcessorUtils.java:262)', 'io.trino.operator.WorkProcessorUtils$BlockingProcess.process(WorkProcessorUtils.java:208)', 
'io.trino.operator.WorkProcessorUtils$ProcessWorkProcessor.process(WorkProcessorUtils.java:413)', 'io.trino.operator.WorkProcessorUtils.lambda$flatten$6(WorkProcessorUtils.java:318)', 'io.trino.operator.WorkProcessorUtils$3.process(WorkProcessorUtils.java:360)', 'io.trino.operator.WorkProcessorUtils$ProcessWorkProcessor.process(WorkProcessorUtils.java:413)', 'io.trino.operator.WorkProcessorUtils$3.process(WorkProcessorUtils.java:347)', 'io.trino.operator.WorkProcessorUtils$ProcessWorkProcessor.process(WorkProcessorUtils.java:413)', 'io.trino.operator.WorkProcessorUtils.getNextState(WorkProcessorUtils.java:262)', 'io.trino.operator.WorkProcessorUtils.lambda$processStateMonitor$2(WorkProcessorUtils.java:241)', 'io.trino.operator.WorkProcessorUtils$ProcessWorkProcessor.process(WorkProcessorUtils.java:413)', 'io.trino.operator.WorkProcessorUtils.getNextState(WorkProcessorUtils.java:262)', 'io.trino.operator.WorkProcessorUtils.lambda$finishWhen$3(WorkProcessorUtils.java:256)', 'io.trino.operator.WorkProcessorUtils$ProcessWorkProcessor.process(WorkProcessorUtils.java:413)', 'io.trino.operator.WorkProcessorSourceOperatorAdapter.getOutput(WorkProcessorSourceOperatorAdapter.java:146)', 'io.trino.operator.Driver.processInternal(Driver.java:394)', 'io.trino.operator.Driver.lambda$process$8(Driver.java:297)', 'io.trino.operator.Driver.tryWithLock(Driver.java:689)', 'io.trino.operator.Driver.process(Driver.java:289)', 'io.trino.operator.Driver.processForDuration(Driver.java:260)', 'io.trino.execution.SqlTaskExecution$DriverSplitRunner.processFor(SqlTaskExecution.java:755)', 'io.trino.execution.executor.PrioritizedSplitRunner.process(PrioritizedSplitRunner.java:165)', 'io.trino.execution.executor.TaskExecutor$TaskRunner.run(TaskExecutor.java:523)', 'io.trino.$gen.Trino_406____20250512_151520_2.run(Unknown Source)', 'java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1144)', 
'java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:642)', 'java.base/java.lang.Thread.run(Thread.java:1575)']}}
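One way to check whether the declared window size is what differs between the level 1-19 files and the level 20+ files is to read it from the zstd frame header. The sketch below follows the frame header layout in RFC 8878. The function name `zstd_window_size` is made up for illustration, and it assumes you already have the raw compressed bytes of a single zstd frame (for example, one Parquet page body); extracting those bytes from the Parquet file is not shown.

```python
import struct

ZSTD_MAGIC = 0xFD2FB528  # stored little-endian on disk: 28 B5 2F FD


def zstd_window_size(frame: bytes) -> int:
    """Return the window size in bytes declared by a zstd frame header.

    Hypothetical helper for illustration; layout per RFC 8878, section 3.1.1.
    """
    if len(frame) < 6:
        raise ValueError("too short to hold a zstd frame header")
    (magic,) = struct.unpack_from("<I", frame, 0)
    if magic != ZSTD_MAGIC:
        raise ValueError("not a zstd frame (bad magic number)")

    descriptor = frame[4]                    # Frame_Header_Descriptor
    fcs_flag = descriptor >> 6               # Frame_Content_Size_flag
    single_segment = (descriptor >> 5) & 1   # Single_Segment_flag
    dict_id_flag = descriptor & 0b11         # Dictionary_ID_flag

    if not single_segment:
        # Window_Descriptor: exponent in bits 7-3, mantissa in bits 2-0.
        window_descriptor = frame[5]
        exponent = window_descriptor >> 3
        mantissa = window_descriptor & 0b111
        window_base = 1 << (10 + exponent)
        return window_base + (window_base // 8) * mantissa

    # Single-segment frames carry no Window_Descriptor; the window size
    # equals Frame_Content_Size, which follows the optional Dictionary_ID.
    dict_id_size = (0, 1, 2, 4)[dict_id_flag]
    fcs_size = (1, 2, 4, 8)[fcs_flag]
    offset = 5 + dict_id_size
    raw = frame[offset:offset + fcs_size]
    if len(raw) < fcs_size:
        raise ValueError("truncated Frame_Content_Size field")
    content_size = int.from_bytes(raw, "little")
    if fcs_size == 2:
        content_size += 256  # the 2-byte encoding is offset by 256
    return content_size


# Hand-built header: magic, descriptor 0x00, Window_Descriptor with exponent 13.
header = bytes.fromhex("28b52ffd") + bytes([0x00, 13 << 3])
print(zstd_window_size(header))  # 8388608 (8 MiB)
```

Running this on a page from a level-19 file and a page from a level-20 file would show whether the writer declared a larger window for the latter, which is what the error message suggests.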

akki avatar May 13 '25 00:05 akki

@akki Trino's writer uses the default ZSTD compression level of 3, so the problem above won't be affected by adding configurability to Trino. In fact, allowing users to set a higher compression level would only increase the chances of hitting Window size too large, if a higher compression level is actually the root cause. You should try a more recent version of Trino; we now use native zstd decompression, which may not have the same issue. cc: @wendigo

raunaqmorarka avatar May 20 '25 07:05 raunaqmorarka

+1

YuriyGavrilov avatar Jun 05 '25 20:06 YuriyGavrilov

Hi @raunaqmorarka, maybe support a ZSTD compression level of 1? It provides a nice balance of compression and speed. There is also this interesting issue: https://github.com/airlift/aircompressor/issues/174

YuriyGavrilov avatar Jul 06 '25 17:07 YuriyGavrilov

> Hi @raunaqmorarka, maybe support a ZSTD compression level of 1? It provides a nice balance of compression and speed. There is also this interesting issue: airlift/aircompressor#174

We'd need to understand the use case and do comparisons with SNAPPY and LZ4 to justify that. So far, LZ4 has worked well for use cases which prioritize compression speed, ZSTD for use cases which prioritize compression ratio, and SNAPPY for something in between.
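A comparison like that can start from a very small harness. The sketch below is only an illustration: `compare_codecs` and `STAND_IN_CODECS` are made-up names, and it uses zlib at two levels from the Python standard library as stand-ins, not ZSTD, LZ4, or SNAPPY themselves. To compare the real codecs, pass callables from whichever third-party bindings you have installed, and use data sampled from the actual tables.

```python
import time
import zlib
from typing import Callable, Dict


def compare_codecs(
    data: bytes, codecs: Dict[str, Callable[[bytes], bytes]]
) -> Dict[str, Dict[str, float]]:
    """Compress `data` once per codec; report ratio and wall-clock seconds."""
    results = {}
    for name, compress in codecs.items():
        start = time.perf_counter()
        compressed = compress(data)
        elapsed = time.perf_counter() - start
        results[name] = {
            "ratio": len(data) / len(compressed),  # higher means smaller output
            "seconds": elapsed,
        }
    return results


# Stand-ins only: zlib levels 1 and 9 illustrate the level/ratio/speed
# trade-off. Replace with real zstd, lz4, or snappy compress callables.
STAND_IN_CODECS = {
    "zlib-1": lambda b: zlib.compress(b, 1),
    "zlib-9": lambda b: zlib.compress(b, 9),
}

sample = b"dt=2025-01-03,hour=5,value=42\n" * 10_000
for name, stats in compare_codecs(sample, STAND_IN_CODECS).items():
    print(f"{name}: ratio={stats['ratio']:.1f} time={stats['seconds'] * 1000:.2f} ms")
```

A single timed run on synthetic data is noisy; repeating each measurement and including decompression time would give a fairer picture for read-heavy workloads.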

raunaqmorarka avatar Jul 06 '25 18:07 raunaqmorarka

Hm, it seems you are right; there are enough options at the moment. @raunaqmorarka thanks for your attention 🙏🏻

According to this table, lz4 is the best option for speed; zstd 1.5.7 --fast=3 is a bit better than snappy, but the rates are roughly the same. (https://facebook.github.io/zstd/)


YuriyGavrilov avatar Jul 06 '25 20:07 YuriyGavrilov

Just to draw your attention to one thing:

LZ4 is deprecated in Parquet; its replacement is lz4_raw.

https://parquet.apache.org/docs/file-format/data-pages/compression/

https://issues.apache.org/jira/plugins/servlet/mobile#issue/PARQUET-2032

Perhaps the better strategy is to stay on zstd for a while.

In that case, adding zstd levels could make more sense now.

cc: @raunaqmorarka

YuriyGavrilov avatar Jul 07 '25 13:07 YuriyGavrilov