Add an asynchronous zstd stream compression class

Open ballard26 opened this issue 3 years ago • 9 comments

Cover letter

This PR adds a new Zstd streaming interface async_stream_zstd that is different from the existing stream_zstd in a few ways.

First, all compression/decompression methods return futures, which lets Seastar preempt them when they are close to hitting the reactor stall timeout. This interface change is the main justification for a new class instead of modifying the existing one: a lot of non-futurized code in v/kafka expects compress/uncompress to be blocking calls that return the actual result right away. Hopefully we'll be able to migrate users of stream_zstd to async_stream_zstd in time.

Second, compress in stream_zstd currently allocates one large contiguous buffer sized for the entire expected compressed data block. This has been shown to cause OOM errors when the data block is too large (see #5566). Instead of allocating a large contiguous buffer for the compression output, async_stream_zstd outputs one Zstd block at a time (128KiB max size) and appends each block to an iobuf. This change should probably be ported over to stream_zstd as well, in case any Kafka API messages prove to be large enough to cause OOM errors.
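
To make the block-at-a-time idea concrete, here is a minimal, stdlib-only sketch. It is not the code in this PR: `fragmented_buffer` is a hypothetical stand-in for iobuf, `encode_in_blocks` is a made-up name, and the "codec" just copies bytes through where a real implementation would call the Zstd streaming API. The only point is that the scratch buffer stays bounded by the block size while the output grows as a list of small fragments.

```cpp
#include <algorithm>
#include <cassert>
#include <cstddef>
#include <string>
#include <vector>

// Hypothetical stand-in for an iobuf: a list of separately allocated fragments.
struct fragmented_buffer {
    std::vector<std::string> fragments;

    void append(const char* data, std::size_t len) {
        fragments.emplace_back(data, len);
    }

    std::size_t size_bytes() const {
        std::size_t total = 0;
        for (const auto& f : fragments) {
            total += f.size();
        }
        return total;
    }
};

// Upper bound on one output block, matching the 128KiB figure above.
constexpr std::size_t max_block_size = 128 * 1024;

// Placeholder codec (assumption): copies the input through unchanged, one
// block at a time. A real implementation would run the Zstd streaming
// compressor where std::copy is called.
inline fragmented_buffer encode_in_blocks(
  const std::string& input, std::size_t block_size = max_block_size) {
    assert(block_size > 0);
    fragmented_buffer out;
    // Reused for every block, so its size depends on block_size only,
    // never on the size of the input.
    std::vector<char> scratch(block_size);
    for (std::size_t off = 0; off < input.size(); off += block_size) {
        const std::size_t n = std::min(block_size, input.size() - off);
        std::copy(input.data() + off, input.data() + off + n, scratch.begin());
        out.append(scratch.data(), n);
    }
    return out;
}
```

With a 300KiB input this produces three fragments (128KiB, 128KiB, 44KiB), and no single allocation is larger than one block.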

Fixes #5116, #5566

Release notes

  • none

ballard26 avatar Jul 22 '22 03:07 ballard26

Is the gist to use thread-local storage of 2mb as the scratch space? Can the cover letter have more deets?

emaxerrno avatar Jul 22 '22 04:07 emaxerrno

Is the gist to use thread-local storage of 2mb as the scratch space? Can the cover letter have more deets?

Updated the cover letter to explain what's going on in the PR. Sorry for the delay! This new class will allocate about 4mb of scratch space for compression/decompression on each thread at the start of RP: 2mb for decompression and 2mb for compression. By doing this we can guarantee that Zstd won't internally allocate any more space during any compression/decompression operations.

ballard26 avatar Jul 22 '22 21:07 ballard26

Results of performance testing vs the existing stream_zstd class are as follows:

test                                      iterations      median         mad         min         max      allocs       tasks        inst
streaming_zstd_1mb.compress                     6385   130.290us     1.513us   126.861us   137.173us      16.000       0.000         0.0
streaming_zstd_1mb.uncompress                   4185    78.486us     2.100us    75.342us    80.586us     108.616       0.000         0.0
streaming_zstd_10mb.compress                     653     1.118ms    24.680us     1.041ms     1.142ms     942.830       0.000         0.0
streaming_zstd_10mb.uncompress                   429   844.631us    80.656us   760.340us   976.986us    2095.301       0.000         0.0
async_stream_zstd.1mb_compress                  7616   100.022us   262.374ns    99.547us   100.598us     148.278       2.082         0.0
async_stream_zstd.1mb_uncompress                4804    73.661us   108.821ns    73.542us    73.819us     312.587       6.156         0.0
async_stream_zstd.10mb_compress                  772   979.403us     1.092us   978.266us   985.029us    2176.762      56.239         0.0
async_stream_zstd.10mb_uncompress                490   728.405us   720.304ns   726.690us   731.632us    4089.282     109.756         0.0

Some of the conclusions that can be made are:

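async_stream_zstd is faster in compression than stream_zstd: the median drops from 130.3us to 100.0us (about 23%) at 1mb and from 1.118ms to 979.4us (about 12%) at 10mb. This is probably due to using a statically allocated workspace vs. the dynamically allocated one that stream_zstd uses.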

In all tests async_stream_zstd has more allocations than stream_zstd. This is because async_stream_zstd allocates lots of small fragments to store the results of compressions and decompressions. If this is seen as an issue, it can be mitigated by having iobuf create larger fragments when appending new data.

Beyond this, stream_zstd causes several seastar reactor stalls during the benchmark, while async_stream_zstd does not cause any.

ballard26 avatar Jul 27 '22 20:07 ballard26

@ballard26 we should make this the default in our internal RPC

emaxerrno avatar Jul 27 '22 22:07 emaxerrno

/ci-repeat 5

ballard26 avatar Oct 12 '22 23:10 ballard26

I was just re-reading the cover page and understand now that the static allocation of the workspace is new with this change.

One concern I have is that this may not really reduce the bottleneck, only hide the symptom. Before, a large [de]compression would cause reactor stalls because compression was synchronous. Now, a large compression won't cause reactor stalls, because it can be suspended, but while it is suspended any other task trying to use the compressor (not at all unlikely given how prevalent RPC calls are) will itself have to wait for the workspace to be available. So large [de]compressions will cause "stalls" but not reported as reactor stalls.

One possibility is to allocate at least a "few" workspaces so that more than one [de]compression can be in progress at once. I was discussing this with @ballard26 but it isn't clear what the right number would be. Perhaps it can scale with the shard memory, like 1 + shard_memory / 500M or something.
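
As a rough illustration of that scaling rule, here is a small sketch. The names are hypothetical, "500M" is read as 500 MB (an assumption, it could equally mean 500 MiB), and the formula itself is only the tentative suggestion from this comment, not anything the PR implements.

```cpp
#include <cstddef>

// Assumption: "500M" in the comment above means 500 * 10^6 bytes.
constexpr std::size_t bytes_per_extra_workspace = 500ULL * 1000 * 1000;

// One workspace is always available; each full 500 MB of shard memory
// adds one more (integer division rounds down).
constexpr std::size_t workspace_count(std::size_t shard_memory_bytes) {
    return 1 + shard_memory_bytes / bytes_per_extra_workspace;
}
```

Under these assumptions a shard with 2 GB of memory would get 5 workspaces, and any shard with less than 500 MB would keep exactly 1.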

@dotnwat and @jcsp your thoughts appreciated.

travisdowns avatar Oct 17 '22 23:10 travisdowns

Now, a large compression won't cause reactor stalls, because it can be suspended, but while it is suspended any other task trying to use the compressor (not at all unlikely given how prevalent RPC calls are) will itself have to wait for the workspace to be available

The way I'm reading it (admittedly a little hastily), the code in the PR doesn't actually use the workspace across a scheduling point: by the time we get to a yield point in do_compress, the workspace has already been copied out into the output iobuf. So the mutex isn't strictly necessary to protect the buffer, and we have a choice about whether to allow async compressions to interleave.

If we had many big RPCs, then I would favor not interleaving them, to get best cache locality. But I think the situation we actually have is more like many small RPCs, and occasional big ones (e.g. our monolithic node health reports).

So maybe the behavior should be that any RPC which can complete its work without sleeping just proceeds without the mutex, and the mutex is only taken for "big" RPCs that will have yield points.

jcsp avatar Oct 18 '22 11:10 jcsp

The way I'm reading it (admittedly a little hastily), the code in the PR doesn't actually use the workspace across a scheduling point: by the time we get to a yield point in do_compress, the workspace has already been copied out into the output iobuf.

I believe you need to keep the workspace around for the entire [de]compression operation: the context carries state from one ZSTD_compressStream2 call to the next (both settings and [de]compression state). The output is streaming but not independent: the [de]compression of later chunks depends on what happened in the earlier ones (without some kind of explicit flush operation). If that's right, you do need exclusive access to the workspace for the duration of the [de]compression. Maybe @ballard26 can confirm one way or the other.

If we had many big RPCs, then I would favor not interleaving them, to get best cache locality.

Well, the point of this change is explicitly giving up on cache locality in favor of responsiveness, right? I.e., once you put suspension points in here you expect to lose some/most of the cache when suspended, regardless of whether the next task that runs involves compression or not. Though you are right that if all tasks are large and similar in size, interleaving serves little purpose.

travisdowns avatar Oct 19 '22 19:10 travisdowns

I believe you need to keep the workspace around for the entire [de]compression operation: the context carries state from one ZSTD_compressStream2 call to the next (both settings and [de]compression state). The output is streaming but not independent: the [de]compression of later chunks depends on what happened in the earlier ones (without some kind of explicit flush operation). If that's right, you do need exclusive access to the workspace for the duration of the [de]compression. Maybe @ballard26 can confirm one way or the other.

You're right, my eyes were skimming over the ZSTD_CCtx_reset etc at the outer part of the function, that necessitates the mutex.

So that brings me back to my big vs. small RPCs thing: maybe if we had multiple workspaces the right number would be two, one for small RPCs to whizz through in one synchronous step, and one for the big RPCs that hit the yielding path and need the mutex.

We already do have two workspaces though! One for the new version and one for the old sync version. So maybe something neat naturally falls out here when we unify the two: one potentially-async class, with two workspaces, one for the jobs that are actually de-facto synchronous because they're small, and one for the big jobs that will include sleeps.
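
A minimal sketch of how that routing decision could look, purely as an illustration. Every name here is hypothetical, and the 128KiB threshold is an assumption borrowed from the block size in the cover letter; nothing in this thread says where the small/big cutoff should sit.

```cpp
#include <cstddef>

// Two hypothetical workspaces: one for jobs that finish in a single
// synchronous step, one for jobs that will hit yield points.
enum class workspace_kind { sync_small, async_large };

// Assumed cutoff: inputs up to one 128KiB block are treated as "small".
constexpr std::size_t yield_threshold_bytes = 128 * 1024;

constexpr workspace_kind pick_workspace(
  std::size_t input_bytes, std::size_t threshold = yield_threshold_bytes) {
    return input_bytes <= threshold ? workspace_kind::sync_small
                                    : workspace_kind::async_large;
}

// A job that never yields cannot be interleaved with another job on the
// same shard, so only the yielding path needs to serialize on a mutex.
constexpr bool needs_mutex(workspace_kind kind) {
    return kind == workspace_kind::async_large;
}
```

The design choice being illustrated is that small jobs never wait behind a suspended large job, because the two paths do not share a workspace.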

jcsp avatar Oct 20 '22 21:10 jcsp

LGTM, thanks for all the changes!

travisdowns avatar Dec 01 '22 19:12 travisdowns

rptest.tests.schema_registry_test.test_delete_subject_version test failure is https://github.com/redpanda-data/redpanda/issues/6903 .

ballard26 avatar Dec 01 '22 22:12 ballard26

Is this worth backporting?

BenPope avatar Dec 19 '22 11:12 BenPope

Might be a lot of effort (it's a large-ish PR and might not be an easy backport), and honestly I would prefer that we recommend users upgrade, @BenPope

piyushredpanda avatar Dec 19 '22 14:12 piyushredpanda

I understand. This isn't in any released version, so that advice will have to wait until at least v23.1.1.

BenPope avatar Dec 19 '22 14:12 BenPope

Ah I see what you mean, it isn't backported even to v22.3.x. Yeah, that we might want to do. @ballard26 ?

piyushredpanda avatar Dec 19 '22 14:12 piyushredpanda

/backport v22.3.x

ballard26 avatar Dec 19 '22 20:12 ballard26