redpanda
Add an asynchronous zstd stream compression class
Cover letter
This PR adds a new Zstd streaming interface, async_stream_zstd, that differs from the existing stream_zstd in two ways.
First, all compression/decompression methods return futures, which lets Seastar interrupt them when they are close to hitting the reactor stall timeout. This interface change is the main justification for adding a new class rather than modifying the existing one: a lot of non-futurized code in v/kafka expects compress/uncompress to be a blocking call that returns the actual result right away. Hopefully we'll be able to migrate users of stream_zstd to async_stream_zstd over time.
Second, compress in stream_zstd currently allocates one large contiguous buffer sized for the entire expected compressed data block. This has been shown to cause OOM errors when the data block is too large (see #5566). async_stream_zstd instead outputs one Zstd block at a time (128KiB max size) and appends each block to an iobuf. This change should probably be ported over to stream_zstd as well, in case any Kafka API messages prove to be large enough to cause OOM errors.
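To illustrate the second point, here is a minimal sketch of the block-at-a-time output pattern. It is not the code in this PR: `fragment_list` is a stand-in for iobuf, and `encode_step` is a hypothetical placeholder that just copies bytes where a real codec would emit compressed data. Only the 128KiB block limit comes from the description above.

```cpp
#include <algorithm>
#include <cassert>
#include <cstddef>
#include <cstdint>
#include <vector>

// Stand-in for iobuf: a list of independently allocated fragments.
using fragment_list = std::vector<std::vector<std::uint8_t>>;

// Upper bound on one output block, matching the 128KiB figure above.
constexpr std::size_t max_block_size = 128 * 1024;

// Hypothetical stand-in for one streaming compression step. It consumes input
// starting at `pos`, writes at most `cap` bytes to `out`, advances `pos`, and
// returns the number of bytes written. Here it only copies bytes.
std::size_t encode_step(const std::vector<std::uint8_t>& in, std::size_t& pos,
                        std::uint8_t* out, std::size_t cap) {
    const std::size_t n = std::min(cap, in.size() - pos);
    std::copy_n(in.data() + pos, n, out);
    pos += n;
    return n;
}

// Produces the output as a list of fragments. The only contiguous buffer that
// scales with a tunable is `scratch`, which is reused for every block, so the
// largest single allocation is bounded by `block_size` rather than input size.
fragment_list encode_in_blocks(const std::vector<std::uint8_t>& in,
                               std::size_t block_size = max_block_size) {
    assert(block_size > 0);
    fragment_list out;
    std::vector<std::uint8_t> scratch(block_size);
    std::size_t pos = 0;
    while (pos < in.size()) {
        const std::size_t written
          = encode_step(in, pos, scratch.data(), scratch.size());
        // Copy only the filled part of the scratch block into a new fragment.
        out.emplace_back(scratch.data(), scratch.data() + written);
    }
    return out;
}
```

With a 300KiB input and the default block size this yields three fragments (two full blocks and one partial block), and concatenating them reproduces the encoder output in order.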
Fixes #5116, #5566
Release notes
- none
Is the gist to use thread-local storage of 2MB as the scratch space? Can the cover letter have more deets?
Updated the cover letter to explain what's going on in the PR. Sorry for the delay! This new class will allocate about 4MB of scratch space for compression/decompression on each thread at the start of RP: 2MB for decompression and 2MB for compression. By doing this we can guarantee that Zstd won't internally allocate any more space during any compression/decompression operation.
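As a rough sketch of the idea (not the PR's implementation), a per-thread pair of fixed-size workspaces could look like the following. `static_workspace`, `thread_workspaces`, and `local_workspaces` are hypothetical names; only the 2MB-per-direction sizing comes from the comment above, read here as 2 MiB. The point is that all memory is reserved once, up front, and later requests are served from it or refused, never satisfied by a fresh allocation.

```cpp
#include <cassert>
#include <cstddef>
#include <memory>

// 2 MiB per direction, per the sizing described above.
constexpr std::size_t workspace_size = 2 * 1024 * 1024;

// Hypothetical fixed-size workspace: one allocation at construction, then
// bump-pointer hand-outs that never touch the allocator again.
class static_workspace {
public:
    explicit static_workspace(std::size_t size)
      : _buf(new std::byte[size])
      , _size(size) {}

    // Returns `n` bytes at an offset aligned to `align`, or nullptr when the
    // workspace is exhausted. `align` must be a power of two no larger than
    // alignof(std::max_align_t), which the underlying buffer satisfies.
    void* take(std::size_t n, std::size_t align = alignof(std::max_align_t)) {
        assert(align != 0 && (align & (align - 1)) == 0);
        const std::size_t start = (_used + align - 1) & ~(align - 1);
        if (start > _size || n > _size - start) {
            return nullptr;
        }
        _used = start + n;
        return _buf.get() + start;
    }

    // Makes the whole workspace available again for the next operation.
    void reset() { _used = 0; }
    std::size_t used() const { return _used; }

private:
    std::unique_ptr<std::byte[]> _buf;
    std::size_t _size;
    std::size_t _used = 0;
};

// One compression and one decompression workspace per thread (about 4 MiB).
struct thread_workspaces {
    static_workspace compress{workspace_size};
    static_workspace decompress{workspace_size};
};

// Created the first time each thread calls this, then reused.
thread_workspaces& local_workspaces() {
    thread_local thread_workspaces ws;
    return ws;
}
```

A request that does not fit returns nullptr and leaves the workspace unchanged, which is what makes the "no further allocation" guarantee checkable in this sketch.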
Results of performance testing vs the existing stream_zstd class are as follows:
| test | iterations | median | mad | min | max | allocs | tasks | inst |
|---|---|---|---|---|---|---|---|---|
| streaming_zstd_1mb.compress | 6385 | 130.290us | 1.513us | 126.861us | 137.173us | 16.000 | 0.000 | 0.0 |
| streaming_zstd_1mb.uncompress | 4185 | 78.486us | 2.100us | 75.342us | 80.586us | 108.616 | 0.000 | 0.0 |
| streaming_zstd_10mb.compress | 653 | 1.118ms | 24.680us | 1.041ms | 1.142ms | 942.830 | 0.000 | 0.0 |
| streaming_zstd_10mb.uncompress | 429 | 844.631us | 80.656us | 760.340us | 976.986us | 2095.301 | 0.000 | 0.0 |
| async_stream_zstd.1mb_compress | 7616 | 100.022us | 262.374ns | 99.547us | 100.598us | 148.278 | 2.082 | 0.0 |
| async_stream_zstd.1mb_uncompress | 4804 | 73.661us | 108.821ns | 73.542us | 73.819us | 312.587 | 6.156 | 0.0 |
| async_stream_zstd.10mb_compress | 772 | 979.403us | 1.092us | 978.266us | 985.029us | 2176.762 | 56.239 | 0.0 |
| async_stream_zstd.10mb_uncompress | 490 | 728.405us | 720.304ns | 726.690us | 731.632us | 4089.282 | 109.756 | 0.0 |
Some conclusions that can be drawn:
- async_stream_zstd is a bit faster at compression than stream_zstd. This is probably due to using a statically allocated workspace vs. the dynamically allocated one that stream_zstd uses.
- In all tests async_stream_zstd makes more allocations than stream_zstd. This is because async_stream_zstd allocates lots of small fragments to store the results of compressions and decompressions. If this turns out to be an issue, it can be mitigated by having iobuf create larger fragments when appending new data.
- Beyond this, stream_zstd causes several Seastar reactor stalls during the benchmark, while async_stream_zstd does not cause any.
@ballard26 we should make this the default in our internal RPC
/ci-repeat 5
I was just re-reading the cover page and understand now that the static allocation of the workspace is new with this change.
One concern I have is that this may not really reduce the bottleneck, only hide the symptom. Before, a large [de]compression would cause reactor stalls because compression was synchronous. Now, a large compression won't cause reactor stalls, because it can be suspended, but while it is suspended any other task trying to use the compressor (not at all unlikely given how prevalent RPC calls are) will itself have to wait for the workspace to be available. So large [de]compressions will still cause "stalls", just not ones reported as reactor stalls.
One possibility is to allocate at least a "few" workspaces so that more than one [de]compression can be in progress at once. I was discussing this with @ballard26 but it isn't clear what the right number would be. Perhaps it can scale with the shard memory, like 1 + shard_memory / 500M or something.
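For concreteness, the scaling rule floated above could be written as the small function below. This is only a sketch of the `1 + shard_memory / 500M` suggestion: `workspace_count` is a hypothetical name, and reading "500M" as 500 MiB is an assumption.

```cpp
#include <cassert>
#include <cstdint>

constexpr std::uint64_t MiB = 1024 * 1024;

// Assumption: the "500M" in the suggestion is taken to mean 500 MiB.
constexpr std::uint64_t memory_per_extra_workspace = 500 * MiB;

// Hypothetical helper: one workspace always, plus one more for every full
// 500 MiB of shard memory (integer division rounds down).
constexpr std::uint64_t workspace_count(std::uint64_t shard_memory_bytes) {
    return 1 + shard_memory_bytes / memory_per_extra_workspace;
}

// A shard with 2 GiB would get 1 + floor(2048 / 500) = 5 workspaces.
static_assert(workspace_count(2048 * MiB) == 5);
```

Under this rule a shard with less than 500 MiB still gets exactly one workspace, so behavior on small shards matches the single-workspace design in the PR.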
@dotnwat and @jcsp your thoughts appreciated.
Now, a large compression won't cause reactor stalls, because it can be suspended, but while it is suspended any other task trying to use the compressor (not at all unlikely given how prevalent RPC calls are) will itself have to wait for the workspace to be available
The way I'm reading it (admittedly a little hastily), the code in the PR doesn't actually use the workspace across a scheduling point: by the time we get to a yield point in do_compress, the workspace has already been copied out into the output iobuf. So the mutex isn't strictly necessary to protect the buffer, and we have a choice about whether to allow async compressions to interleave.
If we had many big RPCs, then I would favor not interleaving them, to get best cache locality. But I think the situation we actually have is more like many small RPCs, and occasional big ones (e.g. our monolithic node health reports).
So maybe the behavior should be that any RPC which can complete its work without sleeping just proceeds without the mutex, and the mutex is only taken for "big" RPCs that will have yield points.
The way I'm reading it (admittedly a little hastily), the code in the PR doesn't actually use the workspace across a scheduling point: by the time we get to a yield point in do_compress, the workspace has already been copied out into the output iobuf.
I believe you need to keep the workspace around for the entire [de]compression operation. The context carries state from one ZSTD_compressStream2 call to the next (both settings and [de]compression state), so the output is streaming but not independent: the [de]compression of later chunks depends on what happened in the earlier ones (absent some kind of explicit flush operation). If that's right, you do need exclusive access to the workspace for the duration of the [de]compression. Maybe @ballard26 can confirm one way or the other.
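A toy example may make the dependency concrete. The sketch below is not Zstd: `rle_context` is a hypothetical run-length encoder used purely as a stand-in for a codec context that carries state between calls. It shows the same property, namely that what gets emitted for a later chunk depends on what the context remembered from the earlier one.

```cpp
#include <cassert>
#include <cstdint>
#include <string>
#include <string_view>

// Toy streaming run-length encoder standing in for a codec context.
// It is meant for letter-only input; output looks like "3a3b2c".
struct rle_context {
    char run_byte = 0;
    std::uint32_t run_len = 0;

    // Starts a new session, discarding any run still held in the context.
    void reset() { run_len = 0; }

    // Consumes one chunk. Completed runs are appended to `out`; the trailing
    // run stays in the context because the next chunk may extend it.
    void feed(std::string_view chunk, std::string& out) {
        for (char c : chunk) {
            if (run_len > 0 && c == run_byte) {
                ++run_len;
                continue;
            }
            flush(out);
            run_byte = c;
            run_len = 1;
        }
    }

    // Emits the pending run, if any, ending the stream.
    void flush(std::string& out) {
        if (run_len == 0) {
            return;
        }
        out += std::to_string(run_len);
        out += run_byte;
        run_len = 0;
    }
};
```

Feeding "aaab" and then "bbcc" through one context gives "3a3b2c", the same as feeding "aaabbbcc" at once. If another operation resets and uses the same context between the two chunks, the first operation ends up with "3a2b2c", which is the kind of corruption exclusive access prevents.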
If we had many big RPCs, then I would favor not interleaving them, to get best cache locality.
Well, the point of this change is explicitly giving up on cache locality in favor of responsiveness, right? I.e., once you put suspension points in here you expect to lose some/most of the cache when suspended, regardless of whether the next task that runs involves compression or not. Though you are right that if all tasks are similar in size and large, interleaving serves little purpose.
I believe you need to keep the workspace around for the entire [de]compression operation. The context carries state from one ZSTD_compressStream2 call to the next (both settings and [de]compression state), so the output is streaming but not independent: the [de]compression of later chunks depends on what happened in the earlier ones (absent some kind of explicit flush operation). If that's right, you do need exclusive access to the workspace for the duration of the [de]compression. Maybe @ballard26 can confirm one way or the other.
You're right, my eyes were skimming over the ZSTD_CCtx_reset etc. at the outer part of the function; that necessitates the mutex.
So that brings me back to my big vs. small RPCs thing: maybe if we had multiple workspaces the right number would be two, one for small RPCs to whizz through in one synchronous step, and one for the big RPCs that hit the yielding path and need the mutex.
We already do have two workspaces though! One for the new version and one for the old sync version. So maybe something neat naturally falls out here when we unify the two: one potentially-async class, with two workspaces, one for the jobs that are actually de-facto synchronous because they're small, and one for the big jobs that will include sleeps.
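A minimal sketch of that routing idea, under stated assumptions: `workspace_router` and `sync_threshold` are hypothetical, the 128KiB cutoff is a placeholder rather than a measured value, and the sketch assumes a single cooperative scheduler per shard, so a small job that never yields cannot overlap with another small job on its workspace.

```cpp
#include <cassert>
#include <cstddef>

// Hypothetical cutoff: jobs at or below this size are assumed to finish in
// one synchronous step without reaching a yield point.
constexpr std::size_t sync_threshold = 128 * 1024;

enum class admit {
    small_sync,      // use the small-job workspace, no lock needed
    large_async,     // took the large-job workspace, may yield while holding it
    large_must_wait, // large-job workspace is held by a suspended job
};

// Tracks only the large-job workspace. The small-job workspace needs no
// tracking under the run-to-completion assumption stated above.
class workspace_router {
public:
    admit request(std::size_t input_size) {
        if (input_size <= sync_threshold) {
            return admit::small_sync;
        }
        if (_large_busy) {
            return admit::large_must_wait;
        }
        _large_busy = true;
        return admit::large_async;
    }

    // Called when a large job finishes and gives the workspace back.
    void release_large() {
        assert(_large_busy);
        _large_busy = false;
    }

private:
    bool _large_busy = false;
};
```

The property this is meant to show is that a small request is admitted immediately even while a large job is suspended, and only a second large job has to wait.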
LGTM, thanks for all the changes!
rptest.tests.schema_registry_test.test_delete_subject_version test failure is https://github.com/redpanda-data/redpanda/issues/6903 .
Is this worth backporting?
Might be a lot of effort (it's a large-ish PR and might not be an easy backport), and honestly I would rather we recommend that users upgrade, @BenPope
I understand. This isn't in any released version, so that advice will have to wait until at least v23.1.1.
Ah I see what you mean, it isn't backported even to v22.3.x. Yeah, that we might want to do. @ballard26 ?
/backport v22.3.x