
[SPARK-52482][SQL][CORE] ZStandard support for file data source reader

sandip-db opened this pull request 6 months ago • 1 comment

What changes were proposed in this pull request?

The Hadoop version currently bundled with Spark has not been compiled with ZSTD support, so ZSTD-compressed JSON, CSV, text, and XML files cannot be read. Spark has a built-in ZStdCompressionCodec, which is used for compressing shuffle data and other use cases.

This PR adds support for ZStandard decompression in file data source readers using the built-in ZStdCompressionCodec. It also adds support for some non-standard extensions such as .gzip and .zstd.

The first commit in this PR is a vanilla copy of Hadoop's LineRecordReader. This is needed to add support for non-Hadoop-based codecs. Reviewers need not review this commit.

The following configs have been added to enable/disable the changes in this PR (a usage sketch follows the list):

  • spark.sql.execution.datasources.hadoopLineRecordReader.enabled: Setting this SQLConf to false falls back to Hadoop's LineRecordReader; otherwise the inlined HadoopLineRecordReader is used by default.
  • spark.io.zStandard.enabled: Enables Spark's ZStdCompressionCodec for file data source readers. Default: true
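
A minimal usage sketch, assuming the config names above land as described (the file path is hypothetical; spark.io.zStandard.enabled is a core conf and would normally be set at application launch rather than at runtime):

// Set at launch, e.g. spark-submit --conf spark.io.zStandard.enabled=true
// The SQLConf below can be toggled per session:
spark.conf.set("spark.sql.execution.datasources.hadoopLineRecordReader.enabled", "true")
// With ZSTD decompression enabled, a .zst (or non-standard .zstd) JSON file becomes readable:
val df = spark.read.json("/path/to/data.json.zst")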

Why are the changes needed?

Same as above.

Does this PR introduce any user-facing change?

Yes. It adds support for ZSTD decompression in file data source readers.

How was this patch tested?

Added new unit tests

Was this patch authored or co-authored using generative AI tooling?

No

sandip-db avatar Jun 15 '25 22:06 sandip-db

Hadoop has a built-in org.apache.hadoop.io.compress.ZStandardCodec, but it requires compiling Hadoop with the native library. I think the right direction is to migrate it to zstd-jni, like the other codecs; see HADOOP-17292 (lz4), HADOOP-17125 (snappy), HADOOP-17825 (gzip).

Even if you don't want to touch the Hadoop code, this PR's approach seems like overkill. Hadoop's io.compression.codecs conf and the CompressionCodec interface allow implementing custom codecs; implementing an org.apache.spark.xxx.SparkZstdCompressionCodec and configuring io.compression.codecs should work.
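
A rough sketch of what this suggestion could look like (the SparkZstdCompressionCodec class name is hypothetical, not an existing Spark API; io.compression.codecs is the standard Hadoop conf key):

// Register a custom codec so Hadoop's CompressionCodecFactory can pick it up
// by file extension; the Spark class name here is illustrative only.
spark.sparkContext.hadoopConfiguration.set(
  "io.compression.codecs",
  "org.apache.hadoop.io.compress.GzipCodec," +
    "org.apache.spark.io.hadoop.SparkZstdCompressionCodec")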

pan3793 avatar Jun 16 '25 01:06 pan3793

Even if you don't want to touch the Hadoop code, this PR's approach seems like overkill. Hadoop's io.compression.codecs conf and the CompressionCodec interface allow implementing custom codecs; implementing an org.apache.spark.xxx.SparkZstdCompressionCodec and configuring io.compression.codecs should work.

@pan3793 Thanks for your comment. The PR adds less than 50 lines of code to re-use Spark's ZStdCompressionCodec for file data sources. The inlining of Hadoop's LineRecordReader would be needed regardless of ZSTD support because of a follow-up change that I am working on, which would allow users to specify the compression type using a file data source option. There are users who have files with either non-standard extensions or no extensions at all. Hadoop's way of determining the codec based on the file name extension forces them to rename their files.

Implementing the CompressionCodec and Decompressor interfaces to add ZStdCompressionCodec support seems unnecessary at this point, and it would not address the extension issues pointed out above. At some point, I expect Spark will upgrade to a Hadoop build compiled with native ZSTD support, and then the code will automatically use that instead of Spark's ZStdCompressionCodec.

sandip-db avatar Jun 17 '25 01:06 sandip-db

The inlining of Hadoop's LineRecordReader would be needed regardless of ZSTD support because of a follow-up change that I am working on, which would allow users to specify the compression type using a file data source option.

@pan3793 does Hadoop's LineRecordReader allow us to specify the compression at the session level without forking the code?

cloud-fan avatar Jun 18 '25 00:06 cloud-fan

does Hadoop's LineRecordReader allow us to specify the compression at the session level without forking the code?

@cloud-fan It's possible to pass different codecs via the io.compression.codecs Hadoop conf, but Hadoop will still use the file name extension to choose a codec from that list. Users' files may not have a standard extension to begin with.
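
For illustration, a small sketch of the extension-based lookup being described (the file names are made up):

import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.Path
import org.apache.hadoop.io.compress.CompressionCodecFactory

// The factory matches on the filename suffix only, so a compressed file
// without a recognized extension resolves to no codec at all.
val factory = new CompressionCodecFactory(new Configuration())
factory.getCodec(new Path("events.json.gz"))  // GzipCodec
factory.getCodec(new Path("events.json"))     // null, even if the bytes are gzip-compressed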

sandip-db avatar Jun 18 '25 00:06 sandip-db

does Hadoop's LineRecordReader allow us to specify the compression at the session level without forking the code?

@cloud-fan as @sandip-db said, Hadoop's LineRecordReader relies on the filename suffix to choose the decompressor. I think this is well-known behavior for processing text files: unlike binary formats (e.g. Parquet/ORC), which have metadata in the footer marking the codec used by each page/chunk, compression is applied to the whole text file.

How do you define the behavior of "specify the compression at the session level"? Always respect the session conf and ignore the filename suffix, or fall back to the codec suggested by the session conf when something goes wrong?

Also, please be careful: a Hadoop codec may behave differently from the Spark/Unix tool codec, for example HADOOP-12990 (lz4).

pan3793 avatar Jun 18 '25 06:06 pan3793

There are users who have files with either non-standard extensions or no extensions at all.

@sandip-db To handle "non-standard extensions", you just need to register another Hadoop codec, for example:

new org.apache.hadoop.io.compress.GzipCodec {
  // Hadoop codec extensions include the leading dot, so return ".gzip" rather than "gzip"
  override def getDefaultExtension(): String = ".gzip"
}
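
To actually register such a codec through io.compression.codecs, a named class with a no-arg constructor is needed, since Hadoop instantiates codec classes reflectively; a hypothetical variant:

// Hypothetical named codec class; an anonymous instance cannot be listed in
// io.compression.codecs directly.
class GzipWithGzipExtension extends org.apache.hadoop.io.compress.GzipCodec {
  override def getDefaultExtension(): String = ".gzip"
}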

for "no extensions" compressed text files, I'm not sure if this is a valid use case(see my last comment)

pan3793 avatar Jun 18 '25 07:06 pan3793

for "no extensions" compressed text files, I'm not sure if this is a valid use case(see my last comment)

@pan3793 While uncommon, we do come across users who have compressed files without any extension. Most recently, a user had to rename a billion gzip-compressed files in S3 because they had no extension.

sandip-db avatar Jun 18 '25 14:06 sandip-db

Also, please be careful: a Hadoop codec may behave differently from the Spark/Unix tool codec, for example HADOOP-12990 (lz4).

Thanks for bringing this to my attention. We are not adding support for ZSTD compression using the Spark codec yet. This PR just adds decompression support.

@cloud-fan @pan3793 Do you have any further concerns with this PR? Can it be merged?

sandip-db avatar Jun 18 '25 14:06 sandip-db

@sandip-db TBH, the current approach (control flow based on catching exceptions) seems too hacky, and I'd like to see a more detailed design of your next steps. (I don't think the question below got answered.)

How do you define the behavior of "specify the compression at the session level"? Always respect the session conf and ignore the filename suffix, or fall back to the codec suggested by the session conf when something goes wrong?

This PR just adds decompression support.

You still need to ensure that Spark's zstd codec is compatible with Hadoop's implementation. I have experienced cases where using the AirCompressor LZO codec to decompress files written via hadoop-lzo randomly produced corrupt content with no errors.

pan3793 avatar Jun 19 '25 05:06 pan3793

Instead of relying on the filename suffix or a Spark session conf to choose the codec, I wonder whether the magic number was considered. For example, the file command can correctly recognize the file codec even without a standard filename extension:

$ file unit-tests.log
unit-tests.log: ASCII text, with very long lines (388)
$ file unit-tests.log.gz
unit-tests.log.gz: gzip compressed data, was "unit-tests.log", last modified: Mon Apr 21 13:03:04 2025, from Unix, original size modulo 2^32 11024393
$ file unit-tests.log.zst
unit-tests.log.zst: Zstandard compressed data (v0.8+), Dictionary ID: None
$ cp unit-tests.log.gz unit-tests.log.gz.foo
$ file unit-tests.log.gz.foo
unit-tests.log.gz.foo: gzip compressed data, was "unit-tests.log", last modified: Mon Apr 21 13:03:04 2025, from Unix, original size modulo 2^32 11024393
$ cp unit-tests.log.zst unit-tests.log.zst.bar
$ file unit-tests.log.zst.bar
unit-tests.log.zst.bar: Zstandard compressed data (v0.8+), Dictionary ID: None

pan3793 avatar Jun 19 '25 05:06 pan3793

@pan3793 Thanks for your input.

how do you define the behavior of "specify the compression at the session level"? always respect session conf and ignore filename suffix? or fallback to use codec suggested by session conf when something goes wrong?

Users should be able to specify the compression type in their query using a data source reader option, for example: spark.read.option("compression", "gzip").json(path)

If this option is specified, Spark will always use the compression type specified by the user. There are other alternatives, such as first using the file path extension, or using the magic number (as the file utility does) to determine the codec type.

Instead of relying on the filename suffix or a Spark session conf to choose the codec, I wonder whether the magic number was considered. For example, the file command can correctly recognize the file codec even without a standard filename extension.

I agree, and I wonder why Hadoop didn't do this in the first place. There will be some performance penalty for examining the magic number and reopening the file input stream with the appropriate codec.

Having said that, we can take the discussion of the compression option and the use of the magic number to my next PR.

The current PR is about adding ZSTD decompression support. I would appreciate it if we could close this one first.

The try-catch logic is used to pick a codec in the following order (see the sketch after this list):

  • A user-provided ZSTD codec registered via the io.compression.codecs Hadoop conf
  • Hadoop's default ZSTD codec
  • Spark's ZStdCompressionCodec.
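
A conceptual sketch of that fallback order (not the PR's actual code; the Hadoop and Spark codec classes are real, but the error handling here is simplified):

import java.io.InputStream
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.Path
import org.apache.hadoop.io.compress.CompressionCodecFactory
import org.apache.spark.SparkConf
import org.apache.spark.io.ZStdCompressionCodec

// Try the codec resolved from the Hadoop conf (user-provided or Hadoop's
// built-in ZStandardCodec); if its native library is unavailable, fall back
// to Spark's zstd-jni based ZStdCompressionCodec.
def openZstdStream(path: Path, hadoopConf: Configuration): InputStream = {
  val raw = path.getFileSystem(hadoopConf).open(path)
  val codec = new CompressionCodecFactory(hadoopConf).getCodec(path)
  if (codec == null) {
    raw
  } else {
    try codec.createInputStream(raw)
    catch {
      case _: RuntimeException =>
        new ZStdCompressionCodec(new SparkConf()).compressedInputStream(raw)
    }
  }
}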

For a compatibility check, I have added some more tests that read files compressed with ZSTD on Ubuntu (version 1.4.4+dfsg-3ubuntu0.1).

sandip-db avatar Jun 19 '25 06:06 sandip-db

OK, it's fair enough to fork Hadoop's LineRecordReader and use try-catch logic for codec fallback given your flexible design.

There will be some performance penalty for examining the magic number and reopening the file input stream with the appropriate codec.

Having a wrapper codec that looks ahead at the magic number and then hands the InputStream to the concrete codec should eliminate the re-open cost. Anyway, this is an implementation detail and can be discussed later.
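
For illustration, a minimal sketch of such a look-ahead wrapper (zstd frames start with the magic bytes 28 B5 2F FD, gzip streams with 1F 8B; the ZstdInputStream class is the zstd-jni one bundled with Spark):

import java.io.{BufferedInputStream, InputStream}
import java.util.zip.GZIPInputStream
import com.github.luben.zstd.ZstdInputStream

// Peek at the leading magic bytes via mark/reset, then hand the same stream
// to the matching decompressor, avoiding a re-open of the file.
def wrapByMagic(raw: InputStream): InputStream = {
  val in = new BufferedInputStream(raw)
  in.mark(4)
  val header = new Array[Byte](4)
  val n = in.read(header)
  in.reset()
  if (n >= 4 && (header(0) & 0xFF) == 0x28 && (header(1) & 0xFF) == 0xB5 &&
      (header(2) & 0xFF) == 0x2F && (header(3) & 0xFF) == 0xFD) {
    new ZstdInputStream(in)
  } else if (n >= 2 && (header(0) & 0xFF) == 0x1F && (header(1) & 0xFF) == 0x8B) {
    new GZIPInputStream(in)
  } else {
    in  // assume uncompressed text
  }
}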

For a compatibility check, I have added some more tests that read files compressed with ZSTD on Ubuntu (version 1.4.4+dfsg-3ubuntu0.1).

I think you should at least test reading a zstd text file written by Hadoop.

pan3793 avatar Jun 19 '25 07:06 pan3793

I think you should at least test reading a zstd text file written by Hadoop.

Added a test scenario with a file compressed using the Hadoop native ZSTD codec.

sandip-db avatar Jun 19 '25 09:06 sandip-db

thanks, merging to master!

cloud-fan avatar Jun 23 '25 02:06 cloud-fan

@sandip-db I just found an incompatibility issue between zstd-jni and Hadoop native zstd reported by the HBase community (HBASE-27706); not sure if it will affect Spark's cases.

pan3793 avatar Jul 02 '25 05:07 pan3793

@pan3793 Thanks for pointing it out. If the native Hadoop or a user-provided zstd codec is available, we will use that instead of Spark's zstd-jni based decompression.

sandip-db avatar Jul 02 '25 06:07 sandip-db

@sandip-db I know that, but since this is related to storage, we should be careful about any possibility of incompatibility.

pan3793 avatar Jul 02 '25 06:07 pan3793

@sandip-db is there any plan for future work to allow for this read pattern?

Users should be able to specify the compression type in their query using a data source reader option, for example: spark.read.option("compression", "gzip").json(path)

I'm not seeing any other currently supported way to achieve such a result.

rcalme avatar Sep 17 '25 23:09 rcalme