K4os.Compression.LZ4
[Feature] Optimized LZ4 Frame aware encoding/decoding to stream
In our project we used LZ4Stream.Encode/Decode very intensively, both for network packet compression and for compressing frozen data (to save memory while it isn't in active use). During profiling we realized that the way we use it makes a lot of temporary memory allocations in the TryStashFrame and ReadFrame functions, putting high pressure on the GC. Our scenario is really simple: we just need to encode/decode the whole content, so we don't need a Stream object, just something like CopyTo to an output stream. For decompression, in most cases we even have the original size, so we can pre-allocate memory for the output stream as an optimization.
To use memory and pooled buffers effectively we had to re-implement parts of the library, and we ended up with the following utility class (it uses some of our internal helpers for binary streams and spans, but they're pretty self-explanatory). It would be cool if you added something like this to the core (a memory-allocation-optimized copy-to-stream solution).
using System;
using System.Buffers;
using System.IO;
using Eco.Shared.Utils;
using Eco.Shared.Utils.Binary;
using K4os.Compression.LZ4;
using K4os.Compression.LZ4.Encoders;
using K4os.Compression.LZ4.Internal;
using K4os.Compression.LZ4.Streams;
using K4os.Hash.xxHash;
public class LZ4Utils
{
const uint Lz4FrameMagicNumber = 0x184D2204U; // LZ4 frame magic number
private static readonly LZ4Descriptor EncodingDescriptor = new(null, false, true, false, null, Mem.K64);
/// <summary>Encodes <paramref name="data"/> to <paramref name="compressed"/> stream.</summary>
public static void StreamEncode(ReadOnlySpan<byte> data, MemoryStream compressed)
{
if (data.Length == 0) return;
var pool = ArrayPool<byte>.Shared;
var descriptor = EncodingDescriptor;
var bufferSize = LZ4Codec.MaximumOutputSize(descriptor.BlockSize);
var buffer = pool.Rent(bufferSize);
var level = default(LZ4Level);
WriteFrame(compressed, descriptor); // write frame containing header and magic number
using var encoder = LZ4Encoder.Create(descriptor.Chaining, level, descriptor.BlockSize); // create LZ4 encoder (disposed when done)
int encoded;
EncoderAction action;
do
{
action = encoder.TopupAndEncode(data, buffer, false, true, out var loaded, out encoded); // try to encode as much as possible, actual values returned in loaded (how many bytes loaded from data) and encoded (how many bytes written to buffer)
WriteBlock(compressed, buffer, encoded, action);
data = data.Slice(loaded); // shift data by loaded content
} while (data.Length > 0);
action = encoder.FlushAndEncode(buffer, true, out encoded);
WriteBlock(compressed, buffer, encoded, action);
WriteEmptyBlock(compressed);
pool.Return(buffer);
}
/// <summary>Decodes content of <paramref name="compressed"/> span with LZ4 and outputs decompressed result to <paramref name="decompressed"/>.</summary>
public static void StreamDecode(ReadOnlySpan<byte> compressed, MemoryStream decompressed)
{
var reader = new ByteSpanReader(compressed);
var descriptor = ReadFrame(ref reader);
using var decoder = LZ4Decoder.Create(descriptor.Chaining, descriptor.BlockSize);
int uncompressedLength;
while ((uncompressedLength = ReadBlock(ref reader, descriptor, decoder)) > 0)
decoder.Drain(decompressed.WriteSpan(uncompressedLength), -uncompressedLength, uncompressedLength);
}
/// <summary>
/// Ported from https://github.com/MiloszKrajewski/K4os.Compression.LZ4.
/// Reads LZ4 frame header and returns descriptor.
/// </summary>
static LZ4Descriptor ReadFrame(ref ByteSpanReader reader)
{
var mayBeMagicNumber = reader.ReadUInt32();
if (mayBeMagicNumber != Lz4FrameMagicNumber)
throw new InvalidDataException("LZ4 frame magic number expected");
var afterMagicSpan = reader.UnreadSpan;
var flgBd = reader.ReadUInt16();
var flg = flgBd & 0xFF;
var bd = (flgBd >> 8) & 0xFF;
var version = (flg >> 6) & 0x03; // version is the top two bits of FLG
if (version != 1)
throw new InvalidDataException($"LZ4 frame version {version} is not supported");
var blockChaining = ((flg >> 5) & 0x01) == 0;
var blockChecksum = ((flg >> 4) & 0x01) != 0;
var hasContentSize = ((flg >> 3) & 0x01) != 0;
var contentChecksum = ((flg >> 2) & 0x01) != 0;
var hasDictionary = (flg & 0x01) != 0;
var blockSizeCode = (bd >> 4) & 0x07;
var contentLength = hasContentSize ? (long?) reader.ReadInt64() : null;
var dictionaryId = hasDictionary ? (uint?) reader.ReadUInt32() : null;
var headerSpan = afterMagicSpan.Slice(0, afterMagicSpan.Length - reader.BytesLeft);
var actualHC = (byte)(XXH32.DigestOf(headerSpan) >> 8);
var expectedHC = reader.ReadByte();
if (actualHC != expectedHC)
throw new InvalidDataException("Invalid LZ4 frame header checksum");
var blockSize = MaxBlockSize(blockSizeCode);
if (hasDictionary)
throw new NotImplementedException("Predefined dictionaries feature is not implemented");
return new LZ4Descriptor(contentLength, contentChecksum, blockChaining, blockChecksum, dictionaryId, blockSize);
}
/// <summary>
/// Ported from https://github.com/MiloszKrajewski/K4os.Compression.LZ4.
/// Reads next LZ4 block using <paramref name="descriptor"/> and <paramref name="decoder"/>.
/// Block content may be then drained from <paramref name="decoder"/>. Returns number of uncompressed bytes to be drained.
/// </summary>
static unsafe int ReadBlock(ref ByteSpanReader reader, LZ4Descriptor descriptor, ILZ4Decoder decoder)
{
var blockLength = reader.ReadInt32();
if (blockLength == 0)
{
if (descriptor.ContentChecksum)
_ = reader.ReadInt32();
return 0;
}
var uncompressed = (blockLength & 0x80000000) != 0;
blockLength &= 0x7FFFFFFF;
fixed (byte* bytes = reader.UnreadSpan)
{
reader.Skip(blockLength); // this line ensures enough bytes to read to avoid access to out of bounds memory
if (descriptor.BlockChecksum)
_ = reader.ReadInt32();
return uncompressed ? decoder.Inject(bytes, blockLength) : decoder.Decode(bytes, blockLength);
}
}
/// <summary>
/// Ported from https://github.com/MiloszKrajewski/K4os.Compression.LZ4.
/// Writes LZ4 frame header for provided <paramref name="descriptor"/>.
/// </summary>
static void WriteFrame(MemoryStream stream, LZ4Descriptor descriptor)
{
const byte version = 1;
stream.WriteNoZigZag(Lz4FrameMagicNumber);
var headerStart = (int)stream.Position;
var chaining = descriptor.Chaining;
var blockChecksum = descriptor.BlockChecksum;
var contentChecksum = descriptor.ContentChecksum;
var hasContentLength = descriptor.ContentLength.HasValue;
var hasDictionary = descriptor.Dictionary.HasValue;
var flg = (version << 6) | (chaining ? 0 : 1) << 5 | (blockChecksum ? 1 : 0) << 4 | (hasContentLength ? 1 : 0) << 3 | (contentChecksum ? 1 : 0) << 2 | (hasDictionary ? 1 : 0);
var blockSize = descriptor.BlockSize;
var bd = MaxBlockSizeCode(blockSize) << 4;
stream.Write((ushort) (flg & byte.MaxValue | (bd & byte.MaxValue) << 8));
if (hasContentLength)
throw new NotImplementedException("ContentSize feature is not implemented");
if (hasDictionary)
throw new NotImplementedException("Predefined dictionaries feature is not implemented");
var headerSpan = stream.GetBuffer().AsSpan(headerStart, (int)stream.Position - headerStart);
var digest = (byte)(XXH32.DigestOf(headerSpan) >> 8);
stream.WriteByte(digest);
}
/// <summary>Writes empty block (used as the end-of-frame marker).</summary>
static void WriteEmptyBlock(MemoryStream stream) => stream.WriteNoZigZag(0U);
/// <summary>
/// Ported from https://github.com/MiloszKrajewski/K4os.Compression.LZ4.
/// Writes LZ4 block (either compressed or uncompressed).
/// </summary>
static void WriteBlock(MemoryStream stream, byte[] buffer, int length, EncoderAction action)
{
if (length == 0) return; // don't write empty blocks, should be done explicitly with WriteEmptyBlock
stream.WriteNoZigZag((uint)(action == EncoderAction.Copied ? 0x80000000 | length : length)); // set top bit to 1 if data wasn't compressed
stream.Write(buffer, 0, length); // write compressed content
}
/// <summary> Ported from https://github.com/MiloszKrajewski/K4os.Compression.LZ4. Converts block size to block size code.</summary>
static int MaxBlockSizeCode(int blockSize) =>
blockSize switch {
<= Mem.K64 => 4, <= Mem.K256 => 5, <= Mem.M1 => 6, <= Mem.M4 => 7, _ => throw new ArgumentException($"Invalid block size {blockSize} (should be less than or equal to 4MB)")
};
/// <summary> Ported from https://github.com/MiloszKrajewski/K4os.Compression.LZ4. Converts block size code to actual block size.</summary>
static int MaxBlockSize(int blockSizeCode) =>
blockSizeCode switch {
7 => Mem.M4, 6 => Mem.M1, 5 => Mem.K256, 4 => Mem.K64, _ => Mem.K64
};
}
That's exactly what I'm working on right now (see branch abstracted-streams). I'm trying to abstract the stream into something lighter (let's call it ByteProducer / ByteConsumer), so a stream would be one of the implementations, but not the only one.
Unfortunately some allocations still happen because of the async implementations (async methods are not plain methods but compiler-generated state machine classes), so I'm still not happy with the new implementation.
Async methods also don't handle ref structs or mutable structs (which are great for fantastic high-performance / low-GC solutions) - actually they silently crash and burn (well, the original structs are just not mutated, because the structs get cloned).
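As a side note, the mutable-struct pitfall mentioned above can be shown with a tiny standalone example (not from the library): when an async method is declared on a struct, `this` is copied into the compiler-generated state machine, so field mutations never reach the caller's copy.

```csharp
using System;
using System.Threading.Tasks;

struct Counter
{
    public int Value;

    public async Task IncrementAsync()
    {
        await Task.Yield();
        this.Value++; // mutates the state machine's copy of the struct, not the caller's
    }
}

class Program
{
    static async Task Main()
    {
        var counter = new Counter();
        await counter.IncrementAsync();
        Console.WriteLine(counter.Value); // prints 0 - the original struct was never mutated
    }
}
```

There is no compiler warning for this, which is why it "silently" fails; ref structs at least fail loudly, since they cannot appear in async methods at all.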
Your implementation will be faster for now (actually, maybe for quite a long time), because I'm focusing on a generic approach, and while your ByteSpanReader is kind of a ByteProducer, it is the ref part which cannot easily be made generic (and would also not work with async methods).
Anyway, thank you. It gave me something to think about. I am addressing the same problem, but I'm still trying a more generic approach (which may also mean: a little bit slower).
@mirasrael NOTE, LZ4Pickler is a much lighter protocol (it supports Span&lt;byte&gt; and IBufferWriter&lt;byte&gt;, has very low GC usage, uses stackalloc below 1024 bytes, etc). It is NOT LZ4 Frame compatible though (it is just a proprietary format). Worth considering for small (to medium) objects which fit in memory (as it does not support streaming).
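For reference, a minimal sketch of how LZ4Pickler could be used for such in-memory objects, assuming the 1.3.x overloads discussed in this thread (the `IBufferWriter<byte>` overload is the one that avoids the result-array allocation):

```csharp
using System;
using System.Buffers;
using K4os.Compression.LZ4;

var data = new byte[4096];
new Random(42).NextBytes(data);

// Classic API: allocates a fresh byte[] for the result.
byte[] pickled = LZ4Pickler.Pickle(data);
byte[] unpickled = LZ4Pickler.Unpickle(pickled);

// IBufferWriter<byte> overload: the result goes into a caller-supplied
// writer (which can be pooled), avoiding the intermediate array.
var writer = new ArrayBufferWriter<byte>();
LZ4Pickler.Pickle(data, writer);
```

`ArrayBufferWriter<byte>` here is just the stock BCL implementation; a pooled writer would reduce allocations further.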
@MiloszKrajewski glad to know you're aware of that problem and working on a more efficient solution 👍 Also thanks for the advice about LZ4Pickler - we're actually using an older version which doesn't have the IBufferWriter API, so I considered it still not good enough because of the byte array allocated for the result, but with IBufferWriter that can be avoided. Also I thought that if you could make a generic version LZ4Pickler.Pickle&lt;TBufferWriter&gt;() where TBufferWriter : IBufferWriter&lt;byte&gt;, it would be possible to make lightweight struct wrappers (without boxing) for streams, pooled byte arrays, etc., and the compiler could also optimize the interface calls more aggressively.
Yup, &lt;TBufferWriter&gt; where TBufferWriter : struct, IBufferWriter&lt;byte&gt; makes sense for wrapping some pooled dynamic byte array.
I will definitely add it to the next release.
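To illustrate the idea being discussed, here is a hypothetical sketch (the type name and API are illustrative, not from the library) of a mutable struct implementing IBufferWriter&lt;byte&gt; over ArrayPool; with a `struct`-constrained generic parameter the JIT can devirtualize the interface calls and no boxing occurs:

```csharp
using System;
using System.Buffers;

// Hypothetical pooled writer for a `where TBufferWriter : struct, IBufferWriter<byte>` API.
// NOTE: being a mutable struct, it must be passed by ref (or used via the
// struct generic constraint) or mutations will be lost on copies.
public struct PooledBufferWriter : IDisposable, IBufferWriter<byte>
{
    private byte[] buffer;
    private int written;

    public PooledBufferWriter(int initialCapacity)
    {
        this.buffer = ArrayPool<byte>.Shared.Rent(initialCapacity);
        this.written = 0;
    }

    public ReadOnlySpan<byte> WrittenSpan => this.buffer.AsSpan(0, this.written);

    public void Advance(int count) => this.written += count;

    public Memory<byte> GetMemory(int sizeHint = 0)
    {
        EnsureCapacity(sizeHint);
        return this.buffer.AsMemory(this.written);
    }

    public Span<byte> GetSpan(int sizeHint = 0)
    {
        EnsureCapacity(sizeHint);
        return this.buffer.AsSpan(this.written);
    }

    private void EnsureCapacity(int sizeHint)
    {
        if (sizeHint < 1) sizeHint = 1;
        if (this.buffer.Length - this.written >= sizeHint) return;
        // Grow by renting a bigger pooled array and returning the old one.
        var bigger = ArrayPool<byte>.Shared.Rent(
            Math.Max(this.buffer.Length * 2, this.written + sizeHint));
        this.buffer.AsSpan(0, this.written).CopyTo(bigger);
        ArrayPool<byte>.Shared.Return(this.buffer);
        this.buffer = bigger;
    }

    public void Dispose() => ArrayPool<byte>.Shared.Return(this.buffer);
}
```

A generic method constrained to `struct, IBufferWriter<byte>` would call into this writer through a ref to the struct, so no boxing happens and the interface dispatch can be inlined.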
Lots of new functionality around streams, ReadOnlySpans, Memory and BufferWriters in 1.3.0-beta.
This snippet does exactly what has been implemented in the new version. I do agree it is much more optimized (as it has fewer layers of abstraction), but it also duplicates a lot of code. I will think about rewiring some parts of my solution to allow this "no cost abstraction" (like a state machine decoupling the engine from async operations).