
Zstd and bzip2 compression support

boiler opened this issue 4 years ago • 5 comments

tested with avro-tools 1.9.2

$ avro-tools getmeta event.zstd.avro
avro.schema
          {
                "type" : "record",
                "name" : "Event",
                "fields" : [ {
                  "name" : "body",
                  "type" : "bytes"
                } ]
          }

avro.codec      zstandard
$ avro-tools getmeta event.bz2.avro
avro.schema
          {
                "type" : "record",
                "name" : "Event",
                "fields" : [ {
                  "name" : "body",
                  "type" : "bytes"
                } ]
          }

avro.codec      bzip2

boiler avatar Mar 23 '20 22:03 boiler

I just took a peek at the LICENSE files for the two added libraries:

goavro: released initially under the Apache License (version 2.0)
Google Snappy: released under the BSD 3-Clause License
dsnet/compress: released under the BSD 3-Clause License
klauspost/compress: released under the BSD 3-Clause License

karrick avatar Aug 17 '20 17:08 karrick

Having had a few moments to think about the library dependency issue a bit more, I think I have an idea that eliminates any potential library license concerns while making it easier to use other compression libraries in the future, without requiring them to be added manually.

I am going to ask for a tiny bit of patience for this PR, while I add a pluggable compression algorithm feature for processing OCF files. The goal is to make it so a program will call a function to register a compression codec with goavro before trying to use that particular compression algorithm.

karrick avatar Aug 17 '20 17:08 karrick

@boiler, thank you for your patience. I have updated a private branch of code that makes it easy to add new compression methods. It presently looks like this, and I think it's fairly simple to use. However, it has a caveat that concerns me: adding a new compression codec is an action that mutates global state. This might not be an issue for most, but it could become one for users in the future.

Because of this caveat, I am looking at a modification to the method of registering new compression codecs for OCF blocks, and while it works, I am not sure I like the API.

Just keeping you in the loop.

import (
	"encoding/binary"
	"fmt"
	"hash/crc32"

	"github.com/golang/snappy"
)

const (
	// CompressionSnappyLabel is used when OCF blocks are compressed using the
	// snappy algorithm.
	CompressionSnappyLabel = "snappy"
)

func init() {
	// NOTE: This registration of a compression algorithm serves as an example
	// for future compression algorithm additions. However, there is no reason
	// to make the compression algorithm name publicly available. The various
	// compression name labels, including the CompressionSnappyLabel constant
	// here, remain publicly available for backwards compatibility only. New
	// compression algorithms may be registered without creating a new exported
	// string constant for their algorithm label.
	_ = registerCompression(CompressionSnappyLabel, ocfCompressSnappy, ocfExpandSnappy)
}

// ocfCompressSnappy compresses the expanded byte slice, returning either the
// compressed byte slice, or a non-nil error.
//
// "Each compressed block is followed by the 4-byte, big-endian CRC32 checksum
// of the uncompressed data in the block."
func ocfCompressSnappy(expanded []byte) ([]byte, error) {
	compressed := snappy.Encode(nil, expanded)
	// OCF requires snappy to have CRC32 checksum after each snappy block
	compressed = append(compressed, 0, 0, 0, 0)                                              // expand slice by 4 bytes so checksum will fit
	binary.BigEndian.PutUint32(compressed[len(compressed)-4:], crc32.ChecksumIEEE(expanded)) // checksum of expanded block
	return compressed, nil
}

// ocfExpandSnappy decompresses the compressed byte slice returning either the
// expanded byte slice, or a non-nil error.
//
// "Each compressed block is followed by the 4-byte, big-endian CRC32 checksum
// of the uncompressed data in the block."
func ocfExpandSnappy(compressed []byte) ([]byte, error) {
	index := len(compressed) - 4 // last 4 bytes is crc32 of decoded block
	if index <= 0 {
		return nil, fmt.Errorf("not enough bytes for CRC32 checksum: %d", len(compressed))
	}
	expanded, err := snappy.Decode(nil, compressed[:index])
	if err != nil {
		return nil, err
	}
	actualCRC := crc32.ChecksumIEEE(expanded)
	expectedCRC := binary.BigEndian.Uint32(compressed[index : index+4])
	if actualCRC != expectedCRC {
		return nil, fmt.Errorf("CRC32 checksum mismatch: %x != %x", actualCRC, expectedCRC)
	}
	return expanded, nil
}
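To illustrate the registration pattern described above, here is a minimal, self-contained sketch using only the standard library. The `registry` map, `registerCompression` signature, and codec types here are hypothetical stand-ins, not goavro's actual implementation; the example uses Avro's required "deflate" codec (raw DEFLATE bytes), which `compress/flate` handles directly:

```go
package main

import (
	"bytes"
	"compress/flate"
	"fmt"
	"io"
)

// compressFn and expandFn mirror the shape of the ocfCompressSnappy and
// ocfExpandSnappy functions above.
type compressFn func(expanded []byte) ([]byte, error)
type expandFn func(compressed []byte) ([]byte, error)

type codec struct {
	compress compressFn
	expand   expandFn
}

// registry maps an avro.codec label to its implementation. A package-level
// map like this is exactly the mutable global state caveat discussed above.
var registry = map[string]codec{}

func registerCompression(label string, c compressFn, e expandFn) error {
	if _, dup := registry[label]; dup {
		return fmt.Errorf("compression codec already registered: %q", label)
	}
	registry[label] = codec{compress: c, expand: e}
	return nil
}

// deflateCompress writes raw DEFLATE bytes, as the Avro "deflate" codec
// requires (no zlib header or checksum).
func deflateCompress(expanded []byte) ([]byte, error) {
	var buf bytes.Buffer
	w, err := flate.NewWriter(&buf, flate.DefaultCompression)
	if err != nil {
		return nil, err
	}
	if _, err = w.Write(expanded); err != nil {
		return nil, err
	}
	if err = w.Close(); err != nil {
		return nil, err
	}
	return buf.Bytes(), nil
}

func deflateExpand(compressed []byte) ([]byte, error) {
	r := flate.NewReader(bytes.NewReader(compressed))
	defer r.Close()
	return io.ReadAll(r)
}

func main() {
	if err := registerCompression("deflate", deflateCompress, deflateExpand); err != nil {
		panic(err)
	}
	c := registry["deflate"]
	compressed, _ := c.compress([]byte("hello, avro"))
	expanded, _ := c.expand(compressed)
	fmt.Println(string(expanded)) // prints "hello, avro"
}
```

A program would call the registration function once, before opening any OCF file that uses that codec, which is the ordering constraint the comment above describes.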

karrick avatar Aug 19 '20 14:08 karrick

I forgot to mention that the above registration method for OCF block compression algorithms has another minor downside: it does not allow for more efficient buffer handling to minimize memory allocations, nor for re-use of compression structure instances. The revised API leaves room for those optimizations in the future.

karrick avatar Aug 19 '20 14:08 karrick

Any progress on this? I'd love to get zstd in this upstream tree.

(and a way to set zstd.BestCompression)

ThomasHabets avatar May 19 '22 18:05 ThomasHabets