K4os.Compression.LZ4

[Feature] Optimized LZ4 Frame-aware encoding/decoding to a stream

Open • mirasrael opened this issue 2 years ago • 4 comments

In our project we used LZ4Stream.Encode/Decode very intensively, both for network packet compression and for compressing frozen data (to save memory while it isn't actively used). During profiling, however, we realized that the way we use it causes a lot of temporary allocations in the TryStashFrame and ReadFrame functions, which puts high pressure on the GC. Our scenario is really simple: we just need to encode/decode the whole content, so we don't need a Stream object, just something like CopyTo into an output stream. For decompression we usually even know the original size, so we can pre-allocate the output stream.

To use memory and pooled buffers efficiently, we had to re-implement parts of the library, and we implemented the following utility class (it uses some internal helper functions for binary streams and spans, but they should be self-explanatory). It would be great if you could add something like this to the core (a copy-to-stream solution optimized for memory allocation).

    using System;
    using System.Buffers;
    using System.IO;
    using Eco.Shared.Utils;
    using Eco.Shared.Utils.Binary;
    using K4os.Compression.LZ4;
    using K4os.Compression.LZ4.Encoders;
    using K4os.Compression.LZ4.Internal;
    using K4os.Compression.LZ4.Streams;
    using K4os.Hash.xxHash;

    public class LZ4Utils
    {
        const uint Lz4FrameMagicNumber = 0x184D2204U; // LZ4 frame magic number, stored little-endian as 04 22 4D 18

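        // contentLength: none, contentChecksum: off, chaining: on, blockChecksum: off, dictionary: none, blockSize: 64 KB
        private static readonly LZ4Descriptor EncodingDescriptor = new(null, false, true, false, null, Mem.K64);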

        /// <summary>Encodes <paramref name="data"/> to <paramref name="compressed"/> stream.</summary>
        public static void StreamEncode(ReadOnlySpan<byte> data, MemoryStream compressed)
        {
            if (data.Length == 0) return;

            var pool       = ArrayPool<byte>.Shared;
            var descriptor = EncodingDescriptor;
            var bufferSize = LZ4Codec.MaximumOutputSize(descriptor.BlockSize);
            var buffer     = pool.Rent(bufferSize);
            var level      = default(LZ4Level);                                                            // L00_FAST

            try
            {
                WriteFrame(compressed, descriptor);                                                        // write magic number and frame header

                using var encoder = LZ4Encoder.Create(descriptor.Chaining, level, descriptor.BlockSize);  // ILZ4Encoder is IDisposable, so dispose it like the decoder

                int           encoded;
                EncoderAction action;
                do
                {
                    // encode as much as possible: 'loaded' = bytes taken from data, 'encoded' = bytes written to buffer
                    action = encoder.TopupAndEncode(data, buffer, false, true, out var loaded, out encoded);
                    WriteBlock(compressed, buffer, encoded, action);
                    data = data.Slice(loaded);                                                             // skip the consumed input
                } while (data.Length > 0);

                action = encoder.FlushAndEncode(buffer, true, out encoded);                                // encode whatever the encoder still buffers
                WriteBlock(compressed, buffer, encoded, action);
                WriteEmptyBlock(compressed);                                                               // end mark
            }
            finally
            {
                pool.Return(buffer);                                                                       // returned even if encoding throws
            }
        }

        /// <summary>Decodes content of <paramref name="compressed"/> span with LZ4 and outputs decompressed result to <paramref name="decompressed"/>.</summary>
        public static void StreamDecode(ReadOnlySpan<byte> compressed, MemoryStream decompressed)
        {
            var       reader     = new ByteSpanReader(compressed);
            var       descriptor = ReadFrame(ref reader);
            using var decoder    = LZ4Decoder.Create(descriptor.Chaining, descriptor.BlockSize);
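            int       uncompressedLength;
            // ReadBlock returns 0 at the end mark; otherwise the freshly decoded bytes are the last
            // uncompressedLength bytes of the decoder's window, hence the negative offset passed to Drain
            while ((uncompressedLength = ReadBlock(ref reader, descriptor, decoder)) > 0)
                decoder.Drain(decompressed.WriteSpan(uncompressedLength), -uncompressedLength, uncompressedLength);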
        }

        /// <summary>
        /// Ported from https://github.com/MiloszKrajewski/K4os.Compression.LZ4.
        /// Reads LZ4 frame header and returns descriptor.
        /// </summary>
        static LZ4Descriptor ReadFrame(ref ByteSpanReader reader)
        {
            var mayBeMagicNumber = reader.ReadUInt32();
            if (mayBeMagicNumber != Lz4FrameMagicNumber)
                throw new InvalidDataException("LZ4 frame magic number expected");
            var afterMagicSpan = reader.UnreadSpan;
            var flgBd          = reader.ReadUInt16();
            var flg            = flgBd & 0xFF;
            var bd             = (flgBd >> 8) & 0xFF;
            var version        = (flg >> 6) & 0x03;   // two-bit version field
            if (version != 1)
                throw new InvalidDataException($"LZ4 frame version {version} is not supported");
            var blockChaining   = ((flg >> 5) & 0x01) == 0;
            var blockChecksum   = ((flg >> 4) & 0x01) != 0;
            var hasContentSize  = ((flg >> 3) & 0x01) != 0;
            var contentChecksum = ((flg >> 2) & 0x01) != 0;
            var hasDictionary   = (flg & 0x01) != 0;
            var blockSizeCode   = (bd >> 4) & 0x07;
            var contentLength   = hasContentSize ? (long?) reader.ReadInt64() : null;
            var dictionaryId    = hasDictionary ? (uint?) reader.ReadUInt32() : null;
            var headerSpan      = afterMagicSpan.Slice(0, afterMagicSpan.Length - reader.BytesLeft);

            var actualHC   = (byte)(XXH32.DigestOf(headerSpan) >> 8);
            var expectedHC = reader.ReadByte();

            if (actualHC != expectedHC)
                throw new InvalidDataException("Invalid LZ4 frame header checksum");
            var blockSize = MaxBlockSize(blockSizeCode);
            if (hasDictionary)
                throw new NotImplementedException("Predefined dictionaries feature is not implemented");
            return new LZ4Descriptor(contentLength, contentChecksum, blockChaining, blockChecksum, dictionaryId, blockSize);
        }

        /// <summary>
        /// Ported from https://github.com/MiloszKrajewski/K4os.Compression.LZ4.
        /// Reads next LZ4 block using <paramref name="descriptor"/> and <paramref name="decoder"/>.
        /// Block content may be then drained from <paramref name="decoder"/>. Returns number of uncompressed bytes to be drained.
        /// </summary>
        static unsafe int ReadBlock(ref ByteSpanReader reader, LZ4Descriptor descriptor, ILZ4Decoder decoder)
        {
            var blockLength = reader.ReadInt32();
            if (blockLength == 0)                                   // end mark
            {
                if (descriptor.ContentChecksum)
                    _ = reader.ReadInt32();                         // content checksum: skipped, not verified
                return 0;
            }

            var uncompressed = (blockLength & 0x80000000) != 0;     // high bit set: block is stored uncompressed
            blockLength &= 0x7FFFFFFF;
            fixed (byte* bytes = reader.UnreadSpan)
            {
                reader.Skip(blockLength);                           // ensures the whole block is available, so the decoder never reads out of bounds
                if (descriptor.BlockChecksum)
                    _ = reader.ReadInt32();                         // block checksum: skipped, not verified
                return uncompressed ? decoder.Inject(bytes, blockLength) : decoder.Decode(bytes, blockLength);
            }
        }

        /// <summary>
        /// Ported from https://github.com/MiloszKrajewski/K4os.Compression.LZ4.
        /// Writes LZ4 frame header for provided <paramref name="descriptor"/>.
        /// </summary>
        static void WriteFrame(MemoryStream stream, LZ4Descriptor descriptor)
        {
            const byte version = 1;

            stream.WriteNoZigZag(Lz4FrameMagicNumber);
            var headerStart = (int)stream.Position;
            var chaining = descriptor.Chaining;
            var blockChecksum = descriptor.BlockChecksum;
            var contentChecksum = descriptor.ContentChecksum;
            var hasContentLength = descriptor.ContentLength.HasValue;
            var hasDictionary = descriptor.Dictionary.HasValue;
            var flg = (version << 6)
                | (chaining ? 0 : 1) << 5          // bit 5 is the block *independence* flag
                | (blockChecksum ? 1 : 0) << 4
                | (hasContentLength ? 1 : 0) << 3
                | (contentChecksum ? 1 : 0) << 2
                | (hasDictionary ? 1 : 0);
            var blockSize = descriptor.BlockSize;
            var bd = MaxBlockSizeCode(blockSize) << 4;
            stream.Write((ushort) (flg & byte.MaxValue | (bd & byte.MaxValue) << 8));
            if (hasContentLength)
                throw new NotImplementedException("ContentSize feature is not implemented");
            if (hasDictionary)
                throw new NotImplementedException("Predefined dictionaries feature is not implemented");

            var headerSpan = stream.GetBuffer().AsSpan(headerStart, (int)stream.Position - headerStart);
            var digest     = (byte)(XXH32.DigestOf(headerSpan) >> 8);
            stream.WriteByte(digest);
        }

        /// <summary>Writes an empty block, used as the end-of-frame marker.</summary>
        static void WriteEmptyBlock(MemoryStream stream) => stream.WriteNoZigZag(0U);

        /// <summary>
        /// Ported from https://github.com/MiloszKrajewski/K4os.Compression.LZ4.
        /// Writes LZ4 block (either compressed or uncompressed).
        /// </summary>
        static void WriteBlock(MemoryStream stream, byte[] buffer, int length, EncoderAction action)
        {
            if (length == 0) return;                                                                     // skip empty blocks: a zero length is the end mark, written explicitly by WriteEmptyBlock
            stream.WriteNoZigZag((uint)(action == EncoderAction.Copied ? 0x80000000 | length : length)); // set top bit to 1 if data wasn't compressed
            stream.Write(buffer, 0, length);                                                             // write compressed content
        }

        /// <summary> Ported from https://github.com/MiloszKrajewski/K4os.Compression.LZ4. Converts block size to block size code.</summary>
        static int MaxBlockSizeCode(int blockSize) =>
            blockSize switch {
                <= Mem.K64 => 4, <= Mem.K256 => 5, <= Mem.M1 => 6, <= Mem.M4 => 7, _ => throw new ArgumentException($"Invalid block size {blockSize} (must be at most 4 MB)")
            };

        /// <summary> Ported from https://github.com/MiloszKrajewski/K4os.Compression.LZ4. Converts block size code to actual block size.</summary>
        static int MaxBlockSize(int blockSizeCode) =>
            blockSizeCode switch {
                7 => Mem.M4, 6 => Mem.M1, 5 => Mem.K256, 4 => Mem.K64, _ => Mem.K64
            };
    }
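The snippet relies on internal helpers that are not shown: ByteSpanReader, plus the MemoryStream extensions WriteNoZigZag, Write(ushort) and WriteSpan. Below is a minimal sketch of what they might look like; it is a hypothetical reconstruction inferred from how LZ4Utils calls them, not the original Eco.Shared.Utils code. It assumes that all values are fixed-size little-endian integers, as the LZ4 frame format requires (in particular, that WriteNoZigZag writes a plain 4-byte value rather than a varint), and that the output MemoryStream was created with `new MemoryStream()`, so GetBuffer() is allowed and buffer indexes match Position.

```csharp
using System;
using System.Buffers.Binary;
using System.IO;

// Hypothetical stand-in for ByteSpanReader: forward-only reader of little-endian values over a span.
public ref struct ByteSpanReader
{
    private ReadOnlySpan<byte> _unread;

    public ByteSpanReader(ReadOnlySpan<byte> data) => _unread = data;

    public ReadOnlySpan<byte> UnreadSpan => _unread;
    public int BytesLeft => _unread.Length;

    // Advances past 'count' bytes; throws if they are not all available.
    public void Skip(int count) => Take(count);

    public byte ReadByte() => Take(1)[0];
    public ushort ReadUInt16() => BinaryPrimitives.ReadUInt16LittleEndian(Take(2));
    public int ReadInt32() => BinaryPrimitives.ReadInt32LittleEndian(Take(4));
    public uint ReadUInt32() => BinaryPrimitives.ReadUInt32LittleEndian(Take(4));
    public long ReadInt64() => BinaryPrimitives.ReadInt64LittleEndian(Take(8));

    private ReadOnlySpan<byte> Take(int count)
    {
        if ((uint)count > (uint)_unread.Length)
            throw new EndOfStreamException($"Need {count} bytes, only {_unread.Length} left");
        var head = _unread.Slice(0, count);
        _unread = _unread.Slice(count);
        return head;
    }
}

// Hypothetical stand-ins for the MemoryStream helpers used by LZ4Utils.
public static class MemoryStreamBinaryExtensions
{
    // Assumed to write a plain 4-byte little-endian value (not a varint).
    public static void WriteNoZigZag(this MemoryStream stream, uint value)
    {
        Span<byte> bytes = stackalloc byte[4];
        BinaryPrimitives.WriteUInt32LittleEndian(bytes, value);
        stream.Write(bytes);
    }

    public static void Write(this MemoryStream stream, ushort value)
    {
        Span<byte> bytes = stackalloc byte[2];
        BinaryPrimitives.WriteUInt16LittleEndian(bytes, value);
        stream.Write(bytes);
    }

    // Grows the stream by 'length' bytes at Position and returns that region to be filled in place.
    // The span is only valid until the stream is resized again.
    public static Span<byte> WriteSpan(this MemoryStream stream, int length)
    {
        var start = (int)stream.Position;
        if (start + length > stream.Length)
            stream.SetLength(start + length);
        stream.Position = start + length;
        return stream.GetBuffer().AsSpan(start, length);
    }
}
```

To use these with LZ4Utils, the two `Eco.Shared.Utils` using directives would be replaced by whatever namespace these types are placed in.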

mirasrael • Mar 30 '22

That's exactly what I'm working on right now (see the abstracted-streams branch). I'm trying to abstract the stream into something lighter (let's call it ByteProducer / ByteConsumer), so a Stream would be one of the implementations, but not the only one.
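A minimal sketch of the producer/consumer idea, assuming a pull-style producer that fills a span and a push-style consumer that accepts one. The interface shapes and the names IByteProducer, IByteConsumer, StreamProducer, MemoryProducer, StreamConsumer and Pump are hypothetical; they are not the API on the abstracted-streams branch. The point is that a Stream becomes just one implementation, next to an in-memory source.

```csharp
using System;
using System.IO;

// Hypothetical abstractions: a source that fills a span, and a sink that accepts one.
public interface IByteProducer { int Read(Span<byte> target); }   // returns 0 when exhausted
public interface IByteConsumer { void Write(ReadOnlySpan<byte> source); }

// A Stream is one possible producer...
public readonly struct StreamProducer : IByteProducer
{
    private readonly Stream _stream;
    public StreamProducer(Stream stream) => _stream = stream;
    public int Read(Span<byte> target) => _stream.Read(target);
}

// ...an in-memory buffer is another (a mutable struct that advances its own cursor).
public struct MemoryProducer : IByteProducer
{
    private ReadOnlyMemory<byte> _remaining;
    public MemoryProducer(ReadOnlyMemory<byte> data) => _remaining = data;

    public int Read(Span<byte> target)
    {
        var count = Math.Min(target.Length, _remaining.Length);
        _remaining.Span.Slice(0, count).CopyTo(target);
        _remaining = _remaining.Slice(count);
        return count;
    }
}

public readonly struct StreamConsumer : IByteConsumer
{
    private readonly Stream _stream;
    public StreamConsumer(Stream stream) => _stream = stream;
    public void Write(ReadOnlySpan<byte> source) => _stream.Write(source);
}

public static class Pump
{
    // The copy loop does not know whether it talks to a Stream or to memory.
    // Producer and consumer are passed by ref so cursor updates are visible to the caller.
    public static long CopyAll<TProducer, TConsumer>(
        ref TProducer producer, ref TConsumer consumer, Span<byte> buffer)
        where TProducer : struct, IByteProducer
        where TConsumer : struct, IByteConsumer
    {
        long total = 0;
        int read;
        while ((read = producer.Read(buffer)) > 0)
        {
            consumer.Write(buffer.Slice(0, read));
            total += read;
        }
        return total;
    }
}
```

Before C# 13, a ref struct such as ByteSpanReader can neither implement an interface nor be used as a generic type argument, so it cannot take part in this kind of abstraction directly.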

Unfortunately, some allocations still happen because of the async implementations (async methods are not really methods, but state-machine classes), so I'm still not happy with the new implementation. Async methods also don't handle ref structs (they cannot be used in async methods at all) or mutable structs (which are great for high-performance, low-GC solutions); with mutable structs they actually silently crash and burn (well, the original structs are not mutated, because the structs are copied).
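A small illustration of the mutable-struct problem, using a hypothetical Counter struct as a stand-in for a reader or writer with a cursor. Async methods cannot have ref parameters (the compiler rejects them with CS1988), so the struct is passed by value and the caller never sees the mutation, while the synchronous version can take it by ref.

```csharp
using System.Threading.Tasks;

// Hypothetical mutable struct standing in for a reader/writer with a cursor.
public struct Counter
{
    public int Value;
    public void Increment() => Value++;
}

public static class AsyncStructDemo
{
    // Synchronous version: 'ref' lets the callee mutate the caller's struct.
    public static void Increment(ref Counter counter) => counter.Increment();

    // Async version: 'ref Counter' is not allowed here, so the method works on a copy.
    public static async Task IncrementAsync(Counter counter)
    {
        await Task.Yield();
        counter.Increment();   // mutates the local copy only
    }
}
```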

Your implementation will be faster for now (actually, maybe for quite a long time), because I'm currently focusing on a generic approach. Your ByteSpanReader is a kind of ByteProducer, but its ref part is exactly what cannot easily be made generic (it also would not work with async methods).

Anyway, thank you. It gave me something to think about. I am addressing the same problem, but I'm still trying a more generic approach (which may also mean a little bit slower).

MiloszKrajewski • Mar 30 '22

@mirasrael NOTE: LZ4Pickler is a much lighter protocol (it supports Span<byte> and IBufferWriter<byte>, has very low GC use, uses stackalloc below 1024 bytes, etc.). It is NOT LZ4 Frame compatible, though (it is just a proprietary format). It's worth considering for small-to-medium objects that fit in memory (as it does not support streaming).
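"Stackalloc below 1024 bytes" refers to a common .NET pattern for temporary buffers. Below is a minimal sketch of that general pattern, not LZ4Pickler's actual code; ScratchBuffer and ScratchAction are hypothetical names. Small buffers live on the stack, larger ones are rented from ArrayPool and returned afterwards, so neither path allocates a fresh array per call.

```csharp
#nullable enable
using System;
using System.Buffers;

// A custom delegate is needed because Span<byte> cannot be a generic argument (e.g. of Func<,>).
public delegate int ScratchAction(Span<byte> scratch);

public static class ScratchBuffer
{
    private const int StackLimit = 1024; // threshold taken from the comment above

    // Runs 'body' on a temporary buffer of exactly 'size' bytes.
    public static int Use(int size, ScratchAction body)
    {
        byte[]? rented = null;
        Span<byte> scratch = size <= StackLimit
            ? stackalloc byte[size]                                         // small: stack memory, freed on return
            : (rented = ArrayPool<byte>.Shared.Rent(size)).AsSpan(0, size); // large: pooled array
        try
        {
            return body(scratch);
        }
        finally
        {
            if (rented is not null)
                ArrayPool<byte>.Shared.Return(rented);
        }
    }
}
```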

MiloszKrajewski • Mar 30 '22

@MiloszKrajewski glad to know you're aware of that problem and working on a more efficient solution 👍 Also thanks for the advice about LZ4Pickler. We are actually using an older version that doesn't have the IBufferWriter API, so I considered it still not good enough because of the byte array allocated for the result, but with IBufferWriter that allocation can be avoided. I also thought that if you could make a generic version, LZ4Pickler.Pickle<TBufferWriter>() where TBufferWriter : IBufferWriter<byte>, it would be possible to make lightweight struct wrappers (without boxing) for streams, pooled byte arrays, etc., and the JIT compiler could also optimize the interface calls more aggressively.

mirasrael • Mar 30 '22

Yup, <TBufferWriter> where TBufferWriter : struct, IBufferWriter<byte> makes sense for wrapping a pooled dynamic byte array. I will definitely add it in the next release.
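A minimal sketch of such a wrapper, assuming a mutable struct that implements IBufferWriter<byte> over ArrayPool arrays; PooledBufferWriter and BufferWriterDemo.AppendAll are hypothetical names, not part of K4os.Compression.LZ4. With a `struct` constraint the JIT compiles a specialized copy of the generic method for the writer type, so there is no boxing and the interface calls can be devirtualized. Because the struct is mutable, the generic method takes it by ref; passing it by value would silently lose the written position, the same struct-copy pitfall discussed earlier in the thread.

```csharp
using System;
using System.Buffers;

// Hypothetical growable writer over pooled arrays; dispose it exactly once.
public struct PooledBufferWriter : IBufferWriter<byte>, IDisposable
{
    private byte[] _buffer;
    private int _written;

    public PooledBufferWriter(int initialCapacity)
    {
        _buffer = ArrayPool<byte>.Shared.Rent(Math.Max(initialCapacity, 1));
        _written = 0;
    }

    public ReadOnlySpan<byte> WrittenSpan => _buffer.AsSpan(0, _written);

    public void Advance(int count)
    {
        if (count < 0 || _written + count > _buffer.Length)
            throw new ArgumentOutOfRangeException(nameof(count));
        _written += count;
    }

    public Memory<byte> GetMemory(int sizeHint = 0) { EnsureFree(sizeHint); return _buffer.AsMemory(_written); }
    public Span<byte> GetSpan(int sizeHint = 0) { EnsureFree(sizeHint); return _buffer.AsSpan(_written); }

    // Rents a bigger array when fewer than 'sizeHint' (at least 1) bytes are free.
    private void EnsureFree(int sizeHint)
    {
        var needed = Math.Max(sizeHint, 1);
        if (_buffer.Length - _written >= needed) return;
        var bigger = ArrayPool<byte>.Shared.Rent(Math.Max(_buffer.Length * 2, _written + needed));
        _buffer.AsSpan(0, _written).CopyTo(bigger);
        ArrayPool<byte>.Shared.Return(_buffer);
        _buffer = bigger;
    }

    public void Dispose()
    {
        if (_buffer.Length > 0) ArrayPool<byte>.Shared.Return(_buffer);
        _buffer = Array.Empty<byte>();
        _written = 0;
    }
}

public static class BufferWriterDemo
{
    // 'ref' so that Advance updates the caller's struct rather than a copy.
    public static void AppendAll<TWriter>(ref TWriter writer, ReadOnlySpan<byte> data)
        where TWriter : struct, IBufferWriter<byte>
    {
        data.CopyTo(writer.GetSpan(data.Length));
        writer.Advance(data.Length);
    }
}
```

Note that a `using var` declaration would make the local read-only and therefore impossible to pass by ref, so a try/finally around Dispose is the simpler choice here.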

MiloszKrajewski • Mar 30 '22

Lots of new functionality around streams, ReadOnlySpans, Memory and BufferWriters in 1.3.0-beta.

MiloszKrajewski • Nov 04 '22

This snippet does exactly what has been implemented in the new version. I do agree that it is much more optimized (as it has fewer layers of abstraction), but it also duplicates a lot of code. I will think about rewiring some parts of my solution to allow this "no-cost abstraction" (like a state machine allowing the engine to be decoupled from async operations).
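A rough sketch of the state-machine idea under stated assumptions: the engine below only splits the block sequence of an LZ4 frame body (4-byte little-endian length, high bit meaning "stored uncompressed", 0 as the end mark), assumes block and content checksums are disabled, and does no decompression. BlockSplitter and BlockSplitterDrivers are hypothetical names. The engine never touches a Stream; it only reports how many bytes it needs next, so a synchronous loop and an async loop can drive the same parsing logic without duplicating it.

```csharp
#nullable enable
using System;
using System.Buffers.Binary;
using System.Collections.Generic;
using System.IO;
using System.Threading.Tasks;

// Hypothetical I/O-free engine: splits LZ4 frame blocks (checksums assumed disabled).
public sealed class BlockSplitter
{
    private readonly byte[] _header = new byte[4];
    private byte[]? _block;          // null while reading a block header
    private bool _uncompressed;
    private int _filled;

    public List<(bool Uncompressed, byte[] Data)> Blocks { get; } = new();
    public bool Done { get; private set; }

    // Number of bytes the engine wants next; 0 once the end mark has been read.
    public int BytesNeeded => Done ? 0 : (_block ?? _header).Length - _filled;

    // Accepts at most BytesNeeded bytes and advances the state machine.
    public void Feed(ReadOnlySpan<byte> input)
    {
        var target = _block ?? _header;
        input.CopyTo(target.AsSpan(_filled));
        _filled += input.Length;
        if (_filled < target.Length) return;

        _filled = 0;
        if (_block is not null)                      // block payload complete
        {
            Blocks.Add((_uncompressed, _block));
            _block = null;
            return;
        }

        var raw = BinaryPrimitives.ReadUInt32LittleEndian(_header);
        if (raw == 0) { Done = true; return; }       // end mark
        _uncompressed = (raw & 0x80000000u) != 0;
        var length = (int)(raw & 0x7FFFFFFFu);
        if (length == 0) Blocks.Add((_uncompressed, Array.Empty<byte>()));
        else _block = new byte[length];
    }
}

public static class BlockSplitterDrivers
{
    public static void Run(BlockSplitter engine, Stream input)
    {
        var buffer = new byte[4096];
        while (!engine.Done)
        {
            var read = input.Read(buffer, 0, Math.Min(buffer.Length, engine.BytesNeeded));
            if (read == 0) throw new EndOfStreamException("LZ4 end mark not found");
            engine.Feed(buffer.AsSpan(0, read));
        }
    }

    // Same loop, only the read is awaited; the parsing logic is shared.
    public static async Task RunAsync(BlockSplitter engine, Stream input)
    {
        var buffer = new byte[4096];
        while (!engine.Done)
        {
            var read = await input.ReadAsync(buffer, 0, Math.Min(buffer.Length, engine.BytesNeeded));
            if (read == 0) throw new EndOfStreamException("LZ4 end mark not found");
            engine.Feed(buffer.AsSpan(0, read));
        }
    }
}
```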

MiloszKrajewski • Nov 06 '22