Support configuring compression levels for ZSTD writes
Various Trino connectors support writing data with ZSTD compression (e.g. via hive.compression-codec in the Hive connector).
ZSTD also supports configurable compression levels (currently from 1 to 22). It would be good to support this in Trino too, so that we can benefit from higher compression levels and reduce storage costs.
This is similar to what Spark allows with spark.sql.parquet.compression.codec.zstd.level.
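For reference, the Spark-side settings mentioned above look roughly like this in spark-defaults.conf (the level value of 20 is illustrative):

```
spark.sql.parquet.compression.codec=zstd
spark.sql.parquet.compression.codec.zstd.level=20
```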
cc: @raunaqmorarka
Bumped into another issue today. I'm not sure whether it would be fixed by supporting this feature request (happy to open a separate issue if the maintainers advise).
I received a Window size too large (not yet supported): offset=5537 error while trying to read data written by Spark with higher ZSTD compression levels. Notably, the issue happens only at level 20 or higher (Trino reads data written at levels 1-19 without problems).
Full error:
pyhive.exc.DatabaseError: {'message': 'Failed to read Parquet file: s3a://bucketname/akki_prefix/zstd_testing/test_table_1747091374750901_193209409197/dt=2025-01-03/hour=5/part-00000-65bcb83c-9960-46f8-9fae-0c832dde81cb-c000.zstd.parquet', 'errorCode': 16777217, 'errorName': 'HIVE_CURSOR_ERROR', 'errorType': 'EXTERNAL', 'failureInfo': {'type': 'io.trino.spi.TrinoException', 'message': 'Failed to read Parquet file: s3a://bucketname/akki_prefix/zstd_testing/test_table_1747091374750901_193209409197/dt=2025-01-03/hour=5/part-00000-65bcb83c-9960-46f8-9fae-0c832dde81cb-c000.zstd.parquet', 'cause': {'type': 'io.airlift.compress.MalformedInputException', 'message': 'Window size too large (not yet supported): offset=5537', 'suppressed': [], 'stack': ['io.airlift.compress.zstd.Util.verify(Util.java:45)', 'io.airlift.compress.zstd.ZstdFrameDecompressor.decodeCompressedBlock(ZstdFrameDecompressor.java:293)', 'io.airlift.compress.zstd.ZstdFrameDecompressor.decompress(ZstdFrameDecompressor.java:182)', 'io.airlift.compress.zstd.ZstdDecompressor.decompress(ZstdDecompressor.java:39)', 'io.trino.parquet.ParquetCompressionUtils.decompress(ParquetCompressionUtils.java:144)', 'io.trino.parquet.ParquetCompressionUtils.decompressZstd(ParquetCompressionUtils.java:82)', 'io.trino.parquet.ParquetCompressionUtils.decompress(ParquetCompressionUtils.java:64)', 'io.trino.parquet.reader.PageReader.readPage(PageReader.java:112)', 'io.trino.parquet.reader.PrimitiveColumnReader.readNextPage(PrimitiveColumnReader.java:275)', 'io.trino.parquet.reader.PrimitiveColumnReader.readPrimitive(PrimitiveColumnReader.java:145)', 'io.trino.parquet.reader.ParquetReader.readPrimitive(ParquetReader.java:444)', 'io.trino.parquet.reader.ParquetReader.readColumnChunk(ParquetReader.java:529)', 'io.trino.parquet.reader.ParquetReader.readBlock(ParquetReader.java:512)', 'io.trino.parquet.reader.ParquetReader.lambda$nextPage$3(ParquetReader.java:274)', 
'io.trino.parquet.reader.ParquetBlockFactory$ParquetBlockLoader.load(ParquetBlockFactory.java:72)', 'io.trino.spi.block.LazyBlock$LazyData.load(LazyBlock.java:407)', 'io.trino.spi.block.LazyBlock$LazyData.getFullyLoadedBlock(LazyBlock.java:386)', 'io.trino.spi.block.LazyBlock.getLoadedBlock(LazyBlock.java:293)', 'io.trino.operator.project.DictionaryAwarePageProjection$DictionaryAwarePageProjectionWork.setupDictionaryBlockProjection(DictionaryAwarePageProjection.java:208)', 'io.trino.operator.project.DictionaryAwarePageProjection$DictionaryAwarePageProjectionWork.lambda$getResult$0(DictionaryAwarePageProjection.java:196)', 'io.trino.spi.block.LazyBlock$LazyData.load(LazyBlock.java:407)', 'io.trino.spi.block.LazyBlock$LazyData.getFullyLoadedBlock(LazyBlock.java:386)', 'io.trino.spi.block.LazyBlock.getLoadedBlock(LazyBlock.java:293)', 'io.trino.operator.project.PageProcessor$ProjectSelectedPositions.processBatch(PageProcessor.java:361)', 'io.trino.operator.project.PageProcessor$ProjectSelectedPositions.process(PageProcessor.java:220)', 'io.trino.operator.WorkProcessorUtils$ProcessWorkProcessor.process(WorkProcessorUtils.java:413)', 'io.trino.operator.WorkProcessorUtils.lambda$flatten$6(WorkProcessorUtils.java:318)', 'io.trino.operator.WorkProcessorUtils$3.process(WorkProcessorUtils.java:360)', 'io.trino.operator.WorkProcessorUtils$ProcessWorkProcessor.process(WorkProcessorUtils.java:413)', 'io.trino.operator.WorkProcessorUtils$3.process(WorkProcessorUtils.java:347)', 'io.trino.operator.WorkProcessorUtils$ProcessWorkProcessor.process(WorkProcessorUtils.java:413)', 'io.trino.operator.WorkProcessorUtils.getNextState(WorkProcessorUtils.java:262)', 'io.trino.operator.WorkProcessorUtils$BlockingProcess.process(WorkProcessorUtils.java:208)', 'io.trino.operator.WorkProcessorUtils$ProcessWorkProcessor.process(WorkProcessorUtils.java:413)', 'io.trino.operator.WorkProcessorUtils.lambda$flatten$6(WorkProcessorUtils.java:318)', 
'io.trino.operator.WorkProcessorUtils$3.process(WorkProcessorUtils.java:360)', 'io.trino.operator.WorkProcessorUtils$ProcessWorkProcessor.process(WorkProcessorUtils.java:413)', 'io.trino.operator.WorkProcessorUtils$3.process(WorkProcessorUtils.java:347)', 'io.trino.operator.WorkProcessorUtils$ProcessWorkProcessor.process(WorkProcessorUtils.java:413)', 'io.trino.operator.WorkProcessorUtils.getNextState(WorkProcessorUtils.java:262)', 'io.trino.operator.WorkProcessorUtils.lambda$processStateMonitor$2(WorkProcessorUtils.java:241)', 'io.trino.operator.WorkProcessorUtils$ProcessWorkProcessor.process(WorkProcessorUtils.java:413)', 'io.trino.operator.WorkProcessorUtils.getNextState(WorkProcessorUtils.java:262)', 'io.trino.operator.WorkProcessorUtils.lambda$finishWhen$3(WorkProcessorUtils.java:256)', 'io.trino.operator.WorkProcessorUtils$ProcessWorkProcessor.process(WorkProcessorUtils.java:413)', 'io.trino.operator.WorkProcessorSourceOperatorAdapter.getOutput(WorkProcessorSourceOperatorAdapter.java:146)', 'io.trino.operator.Driver.processInternal(Driver.java:394)', 'io.trino.operator.Driver.lambda$process$8(Driver.java:297)', 'io.trino.operator.Driver.tryWithLock(Driver.java:689)', 'io.trino.operator.Driver.process(Driver.java:289)', 'io.trino.operator.Driver.processForDuration(Driver.java:260)', 'io.trino.execution.SqlTaskExecution$DriverSplitRunner.processFor(SqlTaskExecution.java:755)', 'io.trino.execution.executor.PrioritizedSplitRunner.process(PrioritizedSplitRunner.java:165)', 'io.trino.execution.executor.TaskExecutor$TaskRunner.run(TaskExecutor.java:523)', 'io.trino.$gen.Trino_406____20250512_151520_2.run(Unknown Source)', 'java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1144)', 'java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:642)', 'java.base/java.lang.Thread.run(Thread.java:1575)']}, 'suppressed': [], 'stack': 
['io.trino.plugin.hive.parquet.ParquetPageSource.handleException(ParquetPageSource.java:169)', 'io.trino.plugin.hive.parquet.ParquetPageSourceFactory.lambda$createPageSource$2(ParquetPageSourceFactory.java:295)', 'io.trino.parquet.reader.ParquetBlockFactory$ParquetBlockLoader.load(ParquetBlockFactory.java:75)', 'io.trino.spi.block.LazyBlock$LazyData.load(LazyBlock.java:407)', 'io.trino.spi.block.LazyBlock$LazyData.getFullyLoadedBlock(LazyBlock.java:386)', 'io.trino.spi.block.LazyBlock.getLoadedBlock(LazyBlock.java:293)', 'io.trino.operator.project.DictionaryAwarePageProjection$DictionaryAwarePageProjectionWork.setupDictionaryBlockProjection(DictionaryAwarePageProjection.java:208)', 'io.trino.operator.project.DictionaryAwarePageProjection$DictionaryAwarePageProjectionWork.lambda$getResult$0(DictionaryAwarePageProjection.java:196)', 'io.trino.spi.block.LazyBlock$LazyData.load(LazyBlock.java:407)', 'io.trino.spi.block.LazyBlock$LazyData.getFullyLoadedBlock(LazyBlock.java:386)', 'io.trino.spi.block.LazyBlock.getLoadedBlock(LazyBlock.java:293)', 'io.trino.operator.project.PageProcessor$ProjectSelectedPositions.processBatch(PageProcessor.java:361)', 'io.trino.operator.project.PageProcessor$ProjectSelectedPositions.process(PageProcessor.java:220)', 'io.trino.operator.WorkProcessorUtils$ProcessWorkProcessor.process(WorkProcessorUtils.java:413)', 'io.trino.operator.WorkProcessorUtils.lambda$flatten$6(WorkProcessorUtils.java:318)', 'io.trino.operator.WorkProcessorUtils$3.process(WorkProcessorUtils.java:360)', 'io.trino.operator.WorkProcessorUtils$ProcessWorkProcessor.process(WorkProcessorUtils.java:413)', 'io.trino.operator.WorkProcessorUtils$3.process(WorkProcessorUtils.java:347)', 'io.trino.operator.WorkProcessorUtils$ProcessWorkProcessor.process(WorkProcessorUtils.java:413)', 'io.trino.operator.WorkProcessorUtils.getNextState(WorkProcessorUtils.java:262)', 'io.trino.operator.WorkProcessorUtils$BlockingProcess.process(WorkProcessorUtils.java:208)', 
'io.trino.operator.WorkProcessorUtils$ProcessWorkProcessor.process(WorkProcessorUtils.java:413)', 'io.trino.operator.WorkProcessorUtils.lambda$flatten$6(WorkProcessorUtils.java:318)', 'io.trino.operator.WorkProcessorUtils$3.process(WorkProcessorUtils.java:360)', 'io.trino.operator.WorkProcessorUtils$ProcessWorkProcessor.process(WorkProcessorUtils.java:413)', 'io.trino.operator.WorkProcessorUtils$3.process(WorkProcessorUtils.java:347)', 'io.trino.operator.WorkProcessorUtils$ProcessWorkProcessor.process(WorkProcessorUtils.java:413)', 'io.trino.operator.WorkProcessorUtils.getNextState(WorkProcessorUtils.java:262)', 'io.trino.operator.WorkProcessorUtils.lambda$processStateMonitor$2(WorkProcessorUtils.java:241)', 'io.trino.operator.WorkProcessorUtils$ProcessWorkProcessor.process(WorkProcessorUtils.java:413)', 'io.trino.operator.WorkProcessorUtils.getNextState(WorkProcessorUtils.java:262)', 'io.trino.operator.WorkProcessorUtils.lambda$finishWhen$3(WorkProcessorUtils.java:256)', 'io.trino.operator.WorkProcessorUtils$ProcessWorkProcessor.process(WorkProcessorUtils.java:413)', 'io.trino.operator.WorkProcessorSourceOperatorAdapter.getOutput(WorkProcessorSourceOperatorAdapter.java:146)', 'io.trino.operator.Driver.processInternal(Driver.java:394)', 'io.trino.operator.Driver.lambda$process$8(Driver.java:297)', 'io.trino.operator.Driver.tryWithLock(Driver.java:689)', 'io.trino.operator.Driver.process(Driver.java:289)', 'io.trino.operator.Driver.processForDuration(Driver.java:260)', 'io.trino.execution.SqlTaskExecution$DriverSplitRunner.processFor(SqlTaskExecution.java:755)', 'io.trino.execution.executor.PrioritizedSplitRunner.process(PrioritizedSplitRunner.java:165)', 'io.trino.execution.executor.TaskExecutor$TaskRunner.run(TaskExecutor.java:523)', 'io.trino.$gen.Trino_406____20250512_151520_2.run(Unknown Source)', 'java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1144)', 
'java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:642)', 'java.base/java.lang.Thread.run(Thread.java:1575)']}}
@akki Trino's writer uses the default ZSTD compression level of 3, so the problem above isn't going to be affected by adding configurability to Trino. In fact, allowing users to set higher compression levels would only increase the chances of hitting Window size too large, if a higher compression level is indeed the root cause.
You should try a more recent version of Trino; we've started using native zstd decompression, which may not have the same issue.
cc: @wendigo
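As background on the error itself: a zstd frame header encodes the window size a decoder must support, and higher compression levels generally use larger windows. A minimal sketch of decoding that one-byte Window_Descriptor per RFC 8878 section 3.1.1.1.2 (illustrative only, not Trino/aircompressor code):

```python
def zstd_window_size(descriptor: int) -> int:
    """Window size in bytes encoded by a zstd frame's one-byte
    Window_Descriptor (RFC 8878, section 3.1.1.1.2)."""
    exponent = descriptor >> 3        # upper 5 bits
    mantissa = descriptor & 0b111     # lower 3 bits
    window_base = 1 << (10 + exponent)
    window_add = (window_base // 8) * mantissa
    return window_base + window_add

# Smallest encodable window: exponent 0, mantissa 0 -> 1 KiB
print(zstd_window_size(0x00))       # 1024
# Exponent 13, mantissa 0 -> an 8 MiB window
print(zstd_window_size(13 << 3))    # 8388608
```

A decoder that caps the supported window size would reject frames whose descriptor encodes anything larger, which is consistent with files written at high levels (bigger windows) failing to decode while lower levels read fine.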
+1
Hi @raunaqmorarka, maybe support ZSTD compression level 1? It provides a nice balance of compression and speed. There is also an interesting discussion at https://github.com/airlift/aircompressor/issues/174.
We'd need to understand the use case and do comparisons with SNAPPY and LZ4 to justify that. So far, LZ4 has worked well for use cases that prioritize compression speed, ZSTD for use cases that prioritize compression ratio, and SNAPPY for something in between.
Hmm, it seems you are right; there are enough options at the moment. @raunaqmorarka thanks for your attention 🙏🏻
Going by this table, lz4 is the best option for speed, and zstd 1.5.7 --fast=3 is a bit better than snappy but at roughly the same rates (https://facebook.github.io/zstd/).
Just to draw your attention to this:
LZ4 is deprecated in Parquet; the replacement is LZ4_RAW.
https://parquet.apache.org/docs/file-format/data-pages/compression/
https://issues.apache.org/jira/browse/PARQUET-2032
Perhaps the better strategy is to stay on ZSTD for a while.
And that could give the idea of adding ZSTD levels new relevance at this moment.
cc: @raunaqmorarka