Add zstd compression and decompression for pekko streams

Open mdedetrich opened this issue 1 month ago • 26 comments

Resolves: https://github.com/apache/pekko/issues/2404

This PR adds a zstd compression/decompression stream/flow using zstd-jni. This project was chosen because it's the only one that has high performance (it uses the reference zstd implementation via JNI) and also supports at least JDK 17 (it's published with JDK 11).

The PR still needs to be completed (documentation needs to be added along with MiMa exclusions), but I am creating a PR now with the necessary barebones so that people can comment on whether the PR is on the right track. Tests have been added (there is already a base testing framework for pekko-streams compression flows, CoderSpec).

The implementation of ZstdCompressor/ZstdDecompressor uses ZstdDirectBufferCompressingStreamNoFinalizer/ZstdDirectBufferDecompressingStreamNoFinalizer, as these are the abstractions provided by zstd-jni to do streaming compression. Note that the versions with NoFinalizer just mean that you need to explicitly shut down the resource (which is what we want, since Pekko Streams handles resource cleanup). These compression abstractions need to use a direct ByteBuffer to shuffle data across the JNI boundary so that the C implementation can do its work directly in memory. The zstd-jni tests were the basis used to write the implementation.
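To make the copy across the JNI boundary concrete, here is a minimal sketch using only the JDK. The object and method names are made up for illustration, and no zstd-jni call is involved; it only shows the heap-to-direct and direct-to-heap copies that a direct-buffer based compressor needs around each call.

```scala
import java.nio.ByteBuffer
import java.nio.charset.StandardCharsets

// Illustrative helper, not part of the PR.
object DirectBufferRoundTrip {

  /** Copies a heap array into a freshly allocated direct buffer, flipped for reading. */
  def toDirect(bytes: Array[Byte]): ByteBuffer = {
    val direct = ByteBuffer.allocateDirect(bytes.length)
    direct.put(bytes)
    direct.flip()
    direct
  }

  /** Copies the readable part of a (direct) buffer back into a heap array. */
  def toHeap(buffer: ByteBuffer): Array[Byte] = {
    val out = new Array[Byte](buffer.remaining())
    buffer.get(out)
    out
  }

  def main(args: Array[String]): Unit = {
    val input = "hello zstd".getBytes(StandardCharsets.UTF_8)
    val direct = toDirect(input)
    println(s"direct=${direct.isDirect} bytes=${toHeap(direct).length}")
  }
}
```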

Some extra notes

  • CoderSpec had to be modified, as the test which catches the exception thrown on corrupt input was hardcoded to DataFormatException, whereas zstd throws its own bespoke exception on corrupt input
  • The ZstdCompressor implements the Compressor abstraction, which does a lot of heavy lifting (especially when it comes to tests). However, the ZstdDecompressor intentionally does not implement DeflateDecompressorBase, as its design is heavily tied to Java's deflate/compression APIs; instead we use SimpleLinearGraphStage[ByteString] backed by ZstdDirectBufferDecompressingStreamNoFinalizer
  • The current API also allows you to specify a dictionary when doing compression. Note that to do this, you need to pass a com.github.luben.zstd.ZstdDictCompress datastructure which is tied to the implementation of zstd-jni. There is an argument to create our own pekko equivalent of ZstdDictCompress which will internally map to a com.github.luben.zstd.ZstdDictCompress, doing so would allow us to swap to a different implementation of zstd in the future without breaking the API.
    • This is the only part of the API (aside from the com.github.luben.zstd.ZstdIOException exception that is thrown on corrupt input) that is tied to the zstd-jni implementation
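One possible shape for such a Pekko-side wrapper is sketched below. The class name, fields, and the defensive copying are assumptions made for illustration; this is not the API proposed in the PR, and the mapping to com.github.luben.zstd.ZstdDictCompress is deliberately left out.

```scala
// Hypothetical wrapper: name and fields are illustrative only.
final class DictionaryHandle private (bytes: Array[Byte], val level: Int) {

  /** Returns a copy so callers cannot mutate the stored dictionary. */
  def dictionary: Array[Byte] = bytes.clone()
}

object DictionaryHandle {

  /** Copies the given bytes so later changes to the caller's array have no effect. */
  def apply(dictionary: Array[Byte], level: Int): DictionaryHandle =
    new DictionaryHandle(dictionary.clone(), level)
}
```

An internal (non-public) helper could then translate this handle into whatever the underlying zstd binding expects, which is what would keep the public API independent of zstd-jni.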

mdedetrich avatar Oct 28 '25 12:10 mdedetrich

Since the core tests have passed (the GitHub action says failed, but that's due to missing paradox documentation) I have added other reviewers

mdedetrich avatar Oct 28 '25 13:10 mdedetrich

@He-Pin This cannot be backported as the zstd-jni library is built against JDK 11 and Pekko 1.x series requires every dependency to be built against JDK 1.8 or lower

mdedetrich avatar Oct 29 '25 07:10 mdedetrich

@mdedetrich Not sure if it's better to be a dedicated module, I will review it this weekend.

He-Pin avatar Oct 29 '25 08:10 He-Pin

I have finished the docs for paradox (hence completing the PR), fixed some minor things, and resolved all comments from the Copilot review (including ones which were outright wrong).

There is still an open question about whether we should make our own com.github.luben.zstd.ZstdDictCompress datastructure so that the API is abstracted away from the implementation.

mdedetrich avatar Oct 29 '25 08:10 mdedetrich

@mdedetrich Not sure if it's better to be a dedicated module, I will review it this weekend.

There is a discussion on this topic at https://github.com/apache/pekko/pull/2409#discussion_r2469468261. I personally think making this an extra module adds a lot of ceremony (as we would also have to add it as another module in pekko-http) for no real benefit to the end user, but if a lot of people feel strongly about it I can reconsider.

mdedetrich avatar Oct 29 '25 08:10 mdedetrich

I have just added another commit where we use the ByteBufferCleaner to clean up the direct byte buffers rather than resorting to GC (see my comment). There is however an open question about whether we should do

if (ByteBufferCleaner.isSupported)
  ByteBufferCleaner.clean(inputBB)

on the compress method in ZstdCompressor as this will trigger a cleanup on every onPush of the stream. I don't know how expensive this cleanup is, there can be an argument that for this specific case we should defer it for GC as GC cleaning up many ByteBuffers as a batch would be faster, @jrudolph can you comment?

mdedetrich avatar Oct 29 '25 12:10 mdedetrich

on the compress method in ZstdCompressor as this will trigger a cleanup on every onPush of the stream. I don't know how expensive this cleanup is, there can be an argument that for this specific case we should defer it for GC as GC cleaning up many ByteBuffers as a batch would be faster, @jrudolph can you comment?

There used to be a reason to use buffer pools especially for direct buffers, probably not least because you might fragment the native heap. Would probably make sense to not churn through them in quick succession. Why not use the buffer pool from the zstd-jni jar?

jrudolph avatar Oct 29 '25 12:10 jrudolph

on the compress method in ZstdCompressor as this will trigger a cleanup on every onPush of the stream. I don't know how expensive this cleanup is, there can be an argument that for this specific case we should defer it for GC as GC cleaning up many ByteBuffers as a batch would be faster, @jrudolph can you comment?

There used to be a reason to use buffer pools especially for direct buffers, probably not least because you might fragment the native heap. Would probably make sense to not churn through them in quick succession. Why not use the buffer pool from the zstd-jni jar?

Ah, I just noticed the buffer pool in zstd-jni. It wasn't being used in the sample code with ZstdDirectBufferCompressingStream/ZstdDirectBufferDecompressingStream. I'll look into it, thanks!

mdedetrich avatar Oct 29 '25 12:10 mdedetrich

on the compress method in ZstdCompressor as this will trigger a cleanup on every onPush of the stream. I don't know how expensive this cleanup is, there can be an argument that for this specific case we should defer it for GC as GC cleaning up many ByteBuffers as a batch would be faster, @jrudolph can you comment?

There used to be a reason to use buffer pools especially for direct buffers, probably not least because you might fragment the native heap. Would probably make sense to not churn through them in quick succession. Why not use the buffer pool from the zstd-jni jar?

So I just tried doing this and it doesn't work with ZstdDirectBufferCompressingStreamNoFinalizer/ZstdDirectBufferDecompressingStreamNoFinalizer, as they expect direct byte buffers. zstd-jni's RecyclingBufferPool doesn't return direct byte buffers; instead it's used internally with ZstdInputStreamNoFinalizer/ZstdOutputStreamNoFinalizer, which leaves me with 2 options, either

  1. Use Pekko's ByteBufferPool and rewrite the ZstdCompressor logic to deal with fixed size direct ByteBuffer's (shouldn't be too bad)

or

  2. Use zstd-jni's ZstdInputStreamNoFinalizer/ZstdOutputStreamNoFinalizer, which would likely be simpler but also less performant

mdedetrich avatar Oct 29 '25 13:10 mdedetrich

So I have been thinking about this, and I am coming to the conclusion that I should just reimplement the zstd compressor from scratch using the low level zstd-jni API (i.e. public int compress(byte[] dst, byte[] src) within the ZstdCompressCtx class). The reasoning is as follows

  • ZstdDirectBufferCompressingStreamNoFinalizer and related classes were designed to work with sources that provide direct ByteBuffers, i.e. Netty/Java NIO. However, in our case we are dealing with ByteString, where we don't have direct ByteBuffers (ByteString.asByteBuffer is not a direct byte buffer), so we always end up having to copy into an array that then gets pushed into a direct ByteBuffer before going into the compress function. Using ZstdCompressCtx we can just directly feed/get the byte arrays.
  • Now let's get onto buffering
    • When it comes to compression, whether we want to buffer or not actually depends on the size of the ByteString element that's incoming on the stream.
    • If the element happens to be really "big", it's actually ideal to compress that element as is without feeding it into any buffer, as the larger the element, the better the compression ratio. In this case we can just use input.toArrayUnsafe and feed it directly into ZstdCompressCtx.compress; even better, we don't need to worry about any copying in the happy path case for small ByteString.
    • If the input ByteStrings are "small", then we can keep on asking for elements to buffer, and once the buffer hits a constant size limit we can feed it into ZstdCompressCtx.compress
  • When it comes to decompression, the logic can remain as is. We don't need to use a DirectByteBufferPool because we only create 2 buffers per stream instantiation (not per element of stream) and those ByteBuffers are reused for each incoming element, although it should be updated so that those ByteBuffers get cleaned up with if (ByteBufferCleaner.isSupported) ByteBufferCleaner.clean(byteBuffer). Using ZstdDecompressCtx isn't going to gain us much net benefit, as we need to use the native streaming capabilities of zstd-jni to accept incoming data until we get to a point where we can decompress.

What are people's thoughts here? Is my thinking on the right track?
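The size-based decision described above can be sketched without any stream or zstd machinery. The class below is hypothetical and only models the bookkeeping: a "big" element is handed through untouched, while "small" elements accumulate until a fixed limit is reached. The returned array is what would then be passed to the compressor.

```scala
// Hypothetical helper illustrating the proposal; not code from the PR.
final class ThresholdBuffer(limit: Int) {
  private var pending: Vector[Array[Byte]] = Vector.empty
  private var pendingSize: Int = 0

  /** Returns the bytes to compress now, or None if we should keep buffering. */
  def offer(element: Array[Byte]): Option[Array[Byte]] =
    if (pending.isEmpty && element.length >= limit) {
      Some(element) // big element, nothing buffered: pass through without copying
    } else {
      pending = pending :+ element
      pendingSize += element.length
      if (pendingSize >= limit) Some(drain()) else None
    }

  /** Concatenates and clears whatever is buffered (e.g. on stream completion). */
  def drain(): Array[Byte] = {
    val out = Array.concat(pending: _*)
    pending = Vector.empty
    pendingSize = 0
    out
  }
}
```

Note that this naive version holds data back until the limit is hit, so a trailing `drain()` is needed when upstream finishes.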

mdedetrich avatar Oct 29 '25 17:10 mdedetrich

  • avoiding unnecessary intermediate transformation to InputStreams/ByteBuffers seems good
  • worth prototyping anyway

pjfanning avatar Oct 29 '25 17:10 pjfanning

So I have been thinking about this, and I am coming to the conclusion that I should just reimplement the zstd compressor from scratch using the low level zstd-jni API (i.e. public int compress(byte[] dst, byte[] src) within the ZstdCompressCtx class). The reasoning is as follows

  • ZstdDirectBufferCompressingStreamNoFinalizer and related classes were designed to work with sources that provide direct ByteBuffers, i.e. Netty/Java NIO. However, in our case we are dealing with ByteString, where we don't have direct ByteBuffers (ByteString.asByteBuffer is not a direct byte buffer), so we always end up having to copy into an array that then gets pushed into a direct ByteBuffer before going into the compress function. Using ZstdCompressCtx we can just directly feed/get the byte arrays.

This seems like a convincing argument

  • Now lets get onto buffering

    • When it comes to compression, whether we want to buffer or not actually depends on the size of the ByteString element thats incoming onto the stream.
    • If the element happens to be really "big", its actually ideal to compress that element as is without feeding it into any buffer as the larger the element, the better the compression ratio is. In this case we can just use input.toByteArrayUnsafe and feed it directly into ZstdCompressCtx.compress, even better we don't need to worry about any copying in happy path case for small ByteString.

This sounds like the resulting bytes will be different depending on how the input was split, which could be nondeterministic (e.g. depending on network conditions). I guess it's fine since this will most likely be used for 'ephemeral' streams, we can 'blame' the nondeterminism on the component that does the splitting, and someone who cares about determinism could insert a component to convert the input into fixed-size chunks.

  • If the input ByteStrings is "small", then we can keep on asking for elements to buffer and once it hits a constant size limit we can then feed it into ZstdCompressCtx.compress

This is a trade-off between latency and compression performance: in a naive implementation, if we for instance 'keep asking elements to buffer' on the boundary of 2 elements of a gRPC stream, it would delay sending+processing the first element while waiting for the next element to show up - which could take a long time on 'event streams', for example. It's possible to avoid this problem: we could 'keep asking for elements to buffer' only when there's currently no demand from downstream, but compress and send 'everything we have' as soon as demand is signaled. That might be significantly more complex, though.

  • When it comes to decompression, the logic can remain as is. We don't need to use a DirectByteBufferPool because we only create 2 buffers per stream instantiation (not per element of stream) and those ByteBuffer's are reused for each incoming element, albeit it should be updated so that those ByteBuffer's get cleaned up with if (ByteBufferCleaner.isSupported) ByteBufferCleaner.clean(byteBuffer). Using ZstdDecompressCtx isn't going to gain us much net benefit as we need to use the native streaming capabilities of zstd-jni to accept incoming data until we can get to a point where we can decompress.

Sounds good to me.

raboof avatar Oct 30 '25 10:10 raboof

This sounds like the resulting bytes will be different depending on how the input was split, which could be nondeterministic (e.g. depending on network conditions). I guess it's fine since this will most likely be used for 'ephemeral' streams, we can 'blame' the nondeterminism on the component that does the splitting, and someone who cares about determinism could insert a component to convert the input into fixed-size chunks.

Yes, this is precisely my thinking as well. Hypothetically speaking, someone could just feed 1 GB chunks as individual stream elements, but if you are doing that then you're already in a troublesome spot, regardless of compression or not.

The more I think about it, this limit should be configurable (and maybe even optional) with sensible defaults, wdyt?

This is a trade-off between latency and compression performance: in a naive implementation, if we for instance 'keep asking elements to buffer' on the boundary of 2 elements of a gRPC stream, it would delay sending+processing the first element while waiting for the next element to show up - which could take a long time on 'event streams', for example. It's possible to avoid this problem: we could 'keep asking for elements to buffer' only when there's currently no demand from downstream, but compress and send 'everything we have' as soon as demand is signaled. That might be significantly more complex, though.

Good point, I did miss that. I am currently doing an initial implementation that follows the logic I described, but it can be improved on later to bypass the accumulation of data if downstream is demanding data right now. I assume that you would use hasBeenPulled(port) to figure out if there is downstream demand?

mdedetrich avatar Oct 30 '25 13:10 mdedetrich

This is a trade-off between latency and compression performance: in a naive implementation, if we for instance 'keep asking elements to buffer' on the boundary of 2 elements of a gRPC stream, it would delay sending+processing the first element while waiting for the next element to show up - which could take a long time on 'event streams', for example. It's possible to avoid this problem: we could 'keep asking for elements to buffer' only when there's currently no demand from downstream, but compress and send 'everything we have' as soon as demand is signaled. That might be significantly more complex, though.

Good point, I did miss that. I am currently doing an initial implementation that follows the logic I described, but it can be improved on later to bypass the accumulation of data if downstream is demanding data right now. I assume that you would use hasBeenPulled(port) to figure out if there is downstream demand?

From https://pekko.apache.org/docs/pekko/current/stream/stream-customize.html it looks like hasBeenPulled is for input ports, and isAvailable would be the thing to check. You'd also need to make sure that your flow signals demand even when downstream doesn't (until the buffer is 'full').
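As a rough sketch of that idea (not the PR's implementation), a custom stage could pull upstream eagerly, buffer while `isAvailable(out)` is false, and flush as soon as downstream demand shows up or the limit is reached. The stage name, the `limit` parameter, and the `compress` function are all hypothetical; `compress` stands in for whatever compressor call would be used. It assumes pekko-stream is on the classpath.

```scala
import org.apache.pekko.stream.{ Attributes, FlowShape, Inlet, Outlet }
import org.apache.pekko.stream.stage.{ GraphStage, GraphStageLogic, InHandler, OutHandler }
import org.apache.pekko.util.ByteString

// Hypothetical stage: buffers input until downstream demands or `limit` bytes are held.
final class DemandAwareBatcher(limit: Int, compress: ByteString => ByteString)
    extends GraphStage[FlowShape[ByteString, ByteString]] {
  val in: Inlet[ByteString] = Inlet("DemandAwareBatcher.in")
  val out: Outlet[ByteString] = Outlet("DemandAwareBatcher.out")
  override val shape: FlowShape[ByteString, ByteString] = FlowShape(in, out)

  override def createLogic(inheritedAttributes: Attributes): GraphStageLogic =
    new GraphStageLogic(shape) with InHandler with OutHandler {
      private var buffer = ByteString.empty

      // Signal demand upstream even though downstream has not pulled yet.
      override def preStart(): Unit = pull(in)

      override def onPush(): Unit = {
        buffer ++= grab(in)
        if (isAvailable(out)) flush()     // downstream is waiting: send right away
        if (buffer.size < limit) pull(in) // otherwise keep buffering until "full"
      }

      override def onPull(): Unit = {
        if (buffer.nonEmpty) flush()
        if (!hasBeenPulled(in) && !isClosed(in)) pull(in)
      }

      override def onUpstreamFinish(): Unit =
        if (buffer.nonEmpty) emit(out, compress(buffer), () => completeStage())
        else completeStage()

      private def flush(): Unit = {
        push(out, compress(buffer))
        buffer = ByteString.empty
      }

      setHandlers(in, out, this)
    }
}
```

With this shape, the stage stops pulling once `limit` bytes are buffered and no demand exists, so upstream is backpressured rather than buffered without bound.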

raboof avatar Oct 30 '25 14:10 raboof

ZstdDirectBufferCompressingStreamNoFinalizer and related classes were designed to work with sources that provide direct ByteBuffers, i.e. Netty/Java NIO. However, in our case we are dealing with ByteString, where we don't have direct ByteBuffers (ByteString.asByteBuffer is not a direct byte buffer), so we always end up having to copy into an array that then gets pushed into a direct ByteBuffer before going into the compress function. Using ZstdCompressCtx we can just directly feed/get the byte arrays.

You can ignore the copy cost, compression is not cheap anyway. Also, the copying is likely happening anyways as you otherwise have to globally lock the heap array and interfere with GC...

Now lets get onto buffering

It's a complex topic how and when a (streaming) compressor should output compressed data. Roughly speaking, for best compression, you leave the decision completely to the compressor. For more interactive usage you want to send any data as early as possible (kind of a Nagle situation but for compression). The compressor cannot know what you want and we cannot know what the user wants. Also note that streaming does not equal interactive usage: you might want to stream data to run with less memory, so for the (native) compressor API you don't want to make the size of the chunks part of the decision of when to flush.

For our streaming API, the problem is that we cannot easily provide the signal when to flush using a raw ByteString based stream. For that reason, in the deflate compressor we allow to set a flush mode and flush after each ByteString. That way the user still has full control over when the flushing does happen (by buffering data in the stream if required). We should not add any inner logic on top of that. Let's not overthink the interface between our compression component and the native component. In general, if we can provide all of incoming data in one go to the compressor that's ok because it's already on the heap anyway and owned by us. But we should never hold anything back.
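The "flush after each ByteString" behaviour can be illustrated with the JDK's Deflater alone. This is a sketch of the general technique, not Pekko's actual deflate compressor code: every chunk is fed in and followed by a SYNC_FLUSH, so everything received so far becomes decodable on the other side without waiting for more input. The object and method names are made up.

```scala
import java.io.ByteArrayOutputStream
import java.util.zip.Deflater

// Illustrative helper showing per-chunk flushing with java.util.zip.Deflater.
object FlushPerChunk {

  /** Compresses one chunk and sync-flushes, returning all output produced for it. */
  def deflateChunk(deflater: Deflater, chunk: Array[Byte]): Array[Byte] = {
    deflater.setInput(chunk)
    val out = new ByteArrayOutputStream()
    val buf = new Array[Byte](4096)
    var n = buf.length
    // Per the Deflater Javadoc: if the output buffer was filled completely,
    // call again with the same flush mode to collect the remaining output.
    while (n == buf.length) {
      n = deflater.deflate(buf, 0, buf.length, Deflater.SYNC_FLUSH)
      out.write(buf, 0, n)
    }
    out.toByteArray
  }
}
```

A user who prefers better ratios over latency can still group elements upstream of such a stage; the compressor itself never holds data back.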

When it comes to decompression, the logic can remain as is. We don't need to use a DirectByteBufferPool because we only create 2 buffers per stream instantiation (not per element of stream) and those ByteBuffers are reused for each incoming element, although it should be updated so that those ByteBuffers get cleaned up with if (ByteBufferCleaner.isSupported) ByteBufferCleaner.clean(byteBuffer). Using ZstdDecompressCtx isn't going to gain us much net benefit, as we need to use the native streaming capabilities of zstd-jni to accept incoming data until we get to a point where we can decompress.

In any case, we should avoid cycling through direct buffers. E.g. using one for the lifetime of the whole stream could be acceptable.

Btw. zstd is somewhat different from deflate in that regard, you can only ever decompress full blocks while deflate can at least in theory start streaming while still receiving a block (see https://github.com/facebook/zstd/issues/1359 and https://fastcompression.blogspot.com/2016/04/working-with-streaming.html, the tldr; is that a compressed block is max 128kb anyway and that usually dwarfs any LZ77 context buffers you might have to keep around anyways).

jrudolph avatar Oct 30 '25 14:10 jrudolph

You can ignore the copy cost, compression is not cheap anyway. Also, the copying is likely happening anyways as you otherwise have to globally lock the heap array and interfere with GC...

Yes definitely true, my main point here is that even though the copying is already happening behind the scenes and in context of compression the copying is cheap, doing this

ByteString.fromArrayUnsafe(zstdCompressCtx.compress(input.toArrayUnsafe()))

is far simpler and easier to maintain than

val inputBB = ByteBuffer.allocateDirect(input.size)
inputBB.put(input.toArray)
inputBB.flip()
compressingStream.compress(inputBB)
compressingStream.flush()
targetBuffer.flip()

val arr = new Array[Byte](targetBuffer.limit())
targetBuffer.get(arr)
targetBuffer.clear()
ByteString.fromArrayUnsafe(arr)

And that version isn't even the ideal one since we are allocating a direct ByteBuffer on every input element (which we shouldn't be doing since direct ByteBuffer allocation is extremely expensive). Rather if we want to work with direct ByteBuffer, we should get a ByteBuffer from a DirectByteBufferPool which will complicate the logic even further since DirectByteBufferPool only supports direct ByteBuffer's of a static size (and in our case we have a dynamic input.size length)
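To show where the extra complexity comes from, here is a JDK-only sketch of feeding a variable-length input through one fixed-size direct buffer in slices. The helper name and the `consume` callback are hypothetical; `consume` stands in for a call such as handing the buffer to a compressing stream, and the buffer would normally come from a pool rather than being allocated by the caller.

```scala
import java.nio.ByteBuffer

// Illustrative helper, not part of the PR.
object FixedSizeSlicing {

  /** Feeds `input` through `pooled` one slice at a time, passing each filled slice to `consume`. */
  def feed(input: Array[Byte], pooled: ByteBuffer)(consume: ByteBuffer => Unit): Unit = {
    var offset = 0
    while (offset < input.length) {
      pooled.clear()
      val len = math.min(pooled.capacity(), input.length - offset)
      pooled.put(input, offset, len)
      pooled.flip() // make the slice readable for the consumer
      consume(pooled)
      offset += len
    }
  }
}
```

For example, a 10-byte input and a 4-byte buffer result in three calls to `consume` with 4, 4, and 2 readable bytes.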

It's a complex topic how and when a (streaming) compressor should output compressed data. Roughly speaking, for best compression, you leave the decision completely to the compressor. For more interactive usage you want to send any data as early as possible (kind of a Nagle situation but for compression). The compressor cannot know what you want and we cannot know what the user wants. Also note that streaming does not equal interactive usage: you might want to stream data to run with less memory, so for the (native) compressor API you don't want to make the size of the chunks part of the decision of when to flush.

So my thinking behind https://github.com/apache/pekko/pull/2409#issuecomment-3462846239 is that, with all else being equal, it's always better to compress larger chunks of data than small ones. Now the critical part here is "all else being equal": what @raboof said earlier is entirely correct in that even if we have stream elements as small as 1 byte (or even less), for some kinds of applications you want to compress and send as fast as possible for latency reasons.

For our streaming API, the problem is that we cannot easily provide the signal when to flush using a raw ByteString based stream. For that reason, in the deflate compressor we allow to set a flush mode and flush after each ByteString.

Unless I am missing something I don't think the flush mode is configurable as of now, in fact you even left a comment and an issue saying it should be configurable (see https://github.com/apache/pekko/blob/085bb4275a70d8db4c2c53946712ee6b54e74688/stream/src/main/scala/org/apache/pekko/stream/scaladsl/Compression.scala#L29-L34)

Also on the same point, are you saying that @raboof 's suggestion i.e.

It's possible to avoid this problem: we could 'keep asking for elements to buffer' only when there's currently no demand from downstream, but compress and send 'everything we have' as soon as demand is signaled. That might be significantly more complex, though.

Is not possible or is it just overcomplicated/not necessary? To me, with the assumption that we can reliably figure out if downstream is demanding data this should be able to get the best of both worlds, no? i.e. if downstream is asking for data, we just compress the input element as is and send it downstream but if downstream is not asking for data then we can buffer elements (up to a sane limit) and we send the compressed data as soon as that limit is hit and/or downstream asks for demand again.

Also if this suggested logic ends up being implemented, it would be configurable so users can disable it which would just compress each element as it is (and this wouldn't just be for latency reasons, as you pointed out if a user wants to reduce memory/CPU pressure as well).

That way the user still has full control over when the flushing does happen (by buffering data in the stream if required). We should not add any inner logic on top of that. Let's not overthink the interface between our compression component and the native component. In general, if we can provide all of incoming data in one go to the compressor that's ok because it's already on the heap anyway and owned by us. But we should never hold anything back.

If I am understanding what you are saying, then essentially you are suggesting that the implementation of the compressor would be a trivial SimpleLinearGraphStage[ByteString] that on each incoming processing element just does ByteString.fromArrayUnsafe(zstdCompressCtx.compress(input.toArrayUnsafe())) and we then call it a day?

Don't have to worry about flushing at all, or buffering or anything like that and if a user wants buffering then they can just use the standard streaming operators to implement it on their own.
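As an illustration of "use the standard streaming operators", the sketch below puts Pekko's `batchWeighted` in front of a per-element compressor. `batchWeighted` aggregates elements only while downstream is backpressuring and emits as soon as there is demand, which is close to the demand-aware behaviour discussed earlier. The object name, `maxBytes`, and the `compress` function are assumptions; `compress` stands in for something like the one-liner above, and pekko-stream is assumed to be on the classpath.

```scala
import org.apache.pekko.NotUsed
import org.apache.pekko.stream.scaladsl.Flow
import org.apache.pekko.util.ByteString

// Illustrative composition; names are hypothetical.
object UserSideBuffering {

  /** Concatenates elements (up to roughly maxBytes) while downstream is not ready. */
  def batching(maxBytes: Long): Flow[ByteString, ByteString, NotUsed] =
    Flow[ByteString].batchWeighted[ByteString](maxBytes, _.size.toLong, bs => bs)(_ ++ _)

  /** Batches first, then applies a per-element compressor supplied by the caller. */
  def compressed(maxBytes: Long, compress: ByteString => ByteString): Flow[ByteString, ByteString, NotUsed] =
    batching(maxBytes).map(compress)
}
```

Users who care about latency would simply leave the `batching` step out and compress every element as it arrives.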

Btw. zstd is somewhat different from deflate in that regard, you can only ever decompress full blocks while deflate can at least in theory start streaming while still receiving a block (see facebook/zstd#1359 and https://fastcompression.blogspot.com/2016/04/working-with-streaming.html, the tldr; is that a compressed block is max 128kb anyway and that usually dwarfs any LZ77 context buffers you might have to keep around anyways).

Thanks for reminding, Zstd actually provides a constant called Zstd.blockSizeMax which I believe points to the 128kb that you are talking about and that should be the default value for maxBytesPerChunk.

mdedetrich avatar Oct 30 '25 16:10 mdedetrich

I have just updated the PR with two small minor changes, one is setting the default of maxBytesPerChunk for the zstd decompressor to be Zstd.blockSizeMax() which happens to be 128kb (thanks @jrudolph !) and the other is cleaning up the direct ByteBuffer in the postStop hook.
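For a sense of what that default means in practice, here is a small sketch assuming maxBytesPerChunk caps the size of each emitted output element, as it does for Pekko's existing decompressors. The constant is written out as 128 * 1024 rather than calling Zstd.blockSizeMax(), and the object name is made up.

```scala
// Illustrative only: models the chunk-size cap, not the decompressor itself.
object ChunkedOutput {
  // Assumed to match the value returned by Zstd.blockSizeMax() (128 KiB).
  val BlockSizeMax: Int = 128 * 1024

  /** Splits decompressed bytes into chunks of at most maxBytesPerChunk bytes. */
  def chunks(decompressed: Array[Byte], maxBytesPerChunk: Int = BlockSizeMax): Iterator[Array[Byte]] =
    decompressed.grouped(maxBytesPerChunk)
}
```

With the default, 300 KiB of decompressed data would be emitted as three elements of 131072, 131072, and 45056 bytes.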

I am holding on my current work of buffering data up to a limit until I get feedback from my latest comment at https://github.com/apache/pekko/pull/2409#issuecomment-3469014680 as it sounds like I might be over-engineering the problem.

mdedetrich avatar Oct 30 '25 17:10 mdedetrich

@mdedetrich, in Netty, we have ZstdOptions too. I think we can take a look at Netty's one for some inspiration before we merge this.

I think we should have this in 1.3.0 release.

He-Pin avatar Nov 07 '25 03:11 He-Pin

@mdedetrich, in Netty, we have ZstdOptions too. I think we can take a look at Netty's one for some inspiration before we merge this.

So I have been waiting for a response to https://github.com/apache/pekko/pull/2409#issuecomment-3469014680 but haven't gotten one yet.

Thinking about it, I am increasingly leaning towards implementing the more complicated but smart solution, i.e. buffering data up to an upper limit or until downstream signals demand. The reason is that I think this is a fairly novel approach that takes advantage of pekko-streams' bi-directional nature, and it can even make a good advertisement for pekko streams (I can make a blog post about it)

I think we should have this in 1.3.0 release.

This would have to be a separate artifact and that would open a whole can of worms, not sure I could even make it for 1.3.0

mdedetrich avatar Nov 07 '25 07:11 mdedetrich

zstd is indeed quite complex. In our work, it actually includes a complete platform and toolchain for dictionary training, end-to-end deployment, version synchronization, etc.

Therefore, could we consider releasing a separate zstd toolkit, such as pekko-zstd or pekko-http-zstd, etc.? This would allow for a more complete design and implementation, and also enable independent evolution.

Without a dictionary, zstd is simply a slightly faster gzip implementation.

He-Pin avatar Nov 08 '25 12:11 He-Pin

zstd is indeed quite complex. In our work, it actually includes a complete platform and toolchain for dictionary training, end-to-end deployment, version synchronization, etc.

Actually the zstd API is incredibly simple, especially if we decide on the simple solution of doing ByteString.fromArrayUnsafe(zstdCompressCtx.compress(input.toArrayUnsafe())). In this case the compressor would just be a few lines of code, and it's far simpler than what is done in gzip/deflate, which has to deal with arbitrary behavior in the JVM implementation.

The complexity that I am describing comes from doing something very smart that is only possible with pekko-streams, i.e. taking into account downstream demand in a stream when it comes to buffering, afaik no other implementation does this.

On the other hand, the complexity you describe comes from the specific use case where you work. Training dictionaries would be entirely out of scope for this, but of course you can specify a dictionary in the same way you can specify a compression level, and this is also a single line of code.

Therefore, could we consider releasing a separate zstd toolkit, such as pekko-zstd or pekko-http-zstd, etc.? This would allow for a more complete design and implementation, and also enable independent evolution.

Assuming the initial implementation is correct I don't see how this could evolve aside from version bumps in zstd-jni (at least with assumption that dictionary training is out of scope)

Without a dictionary, zstd is simply a slightly faster gzip implementation.

It's not just slightly faster, it has significantly better compression ratios. And "slightly" is downplaying it a bit; it can be many times faster (of course it depends on the level and where the bottleneck is).

mdedetrich avatar Nov 08 '25 16:11 mdedetrich

Our API gateway currently uses zstd, so I know the benefits are significant, both in terms of performance and compression ratio. Although Netty's codec directly depends on zstd-jni, I don't know if Pekko-stream users will find this troublesome; personally, I think it's fine.

By normal standards, I think this PR is already good enough, but I'm unsure how much effort you're willing to invest in further complexity. For example, in our work, some of our APIs have separate configurations, dictionary names, versions, etc., and they negotiate with the gateway during application startup. But here, I think we're probably good enough if you're not considering further complexity and extreme optimization.

He-Pin avatar Nov 08 '25 19:11 He-Pin

Currently working on making our own separate ZstdDictionary class so we don't depend on the zstd-jni one, but it's blocked by https://github.com/apache/pekko/pull/2464

mdedetrich avatar Nov 09 '25 11:11 mdedetrich

I have just updated the PR by adding our own ZstdDictionary datastructure so that the public API of the zstd compress/decompress doesn't depend on any internals from zstd-jni.

Aside from making it possible to swap out zstd-jni in the future for any alternative implementations it also makes the API cleaner for both scala and java users.

@He-Pin @pjfanning @raboof Let me know if the design of ZstdDictionary works (I made it idiomatic for both scaladsl and javadsl) and also whether its location (i.e. org.apache.pekko.stream.javadsl.compression.ZstdDictionary/org.apache.pekko.stream.scaladsl.compression.ZstdDictionary) is appropriate

mdedetrich avatar Nov 09 '25 12:11 mdedetrich

Although Netty's codec directly depends on zstd-jni, I don't know if Pekko-stream users will find this troublesome; personally, I think it's fine.

It's not just Netty, even Apache Commons Compress uses zstd-jni and includes it directly as an artifact (see https://commons.apache.org/proper/commons-compress/ and https://mvnrepository.com/artifact/org.apache.commons/commons-compress/1.28.0).

The only difference is that apache-commons has it as an optional maven dependency. We can do the same here, we would just have to document to end users that they have to add the zstd-jni artifact dependency themselves (this line is also just a single change in code).

mdedetrich avatar Nov 09 '25 12:11 mdedetrich

I have updated the PR, I have made the various zstd constants (i.e. min level, max level, default block size etc etc) be part of the Compression object which makes the Pekko API fully separate from the underlying zstd-jni implementation.

I have also rebased against main to use the latest configurable autoFlush changes added as part of https://github.com/apache/pekko/pull/2467

mdedetrich avatar Nov 11 '25 15:11 mdedetrich

Needs a rebase for the merge conflict and the fix for the #2476 failure

raboof avatar Nov 24 '25 12:11 raboof

I have not fully digested the PR but on first read it looks reasonable. Noted a couple of small things that are potential leftovers/nits.

So I still want to get this into 2.0.0-M1, but I haven't had time for the buffer implementation I was planning to do. For now I am thinking of doing the simple dumb solution so that we at least have an implementation with a stable API, which unlocks zstd compression for HTTP responses in pekko-http, and I can always do a better version for a future milestone release of 2.0.0.

mdedetrich avatar Nov 25 '25 15:11 mdedetrich

@mdedetrich I think you can mark it as experimental and then polish it in a later release.

He-Pin avatar Nov 26 '25 04:11 He-Pin

@mdedetrich are you still planning further changes? I see you pushed some changes yesterday but AFAICT there's still some open review comments

raboof avatar Dec 01 '25 10:12 raboof