
feat: Make the compression encoder pool size configurable via MaxBufferedCompressionEncoders

Open HenryCaiHaiying opened this issue 1 year ago • 18 comments

This PR addresses issue: https://github.com/IBM/sarama/issues/2965

For Zstd compression, the pool size of encoders (controlled by the MaxBufferedEncoders parameter) is currently hard-coded to 1. Under high concurrency this causes an excessive number of large encoder objects to be created, which in turn causes GC performance issues.

This PR introduces a new config parameter, MaxBufferedCompressionEncoders (default value 1), and modifies the call paths to pass it through message/record_batch down to the compressor/encoder.
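Here is a minimal sketch of what such a pool amounts to. It assumes a simple mutex-guarded free list, and the names `encoderPool` and `newFn` are hypothetical (this is not sarama's actual code). The generic `T` stands in for `*zstd.Encoder`. The pool hands out an idle encoder when it has one, allocates on demand otherwise, and keeps at most `maxIdle` encoders warm, so a value of 1 matches the current hard-coded behavior.

```go
package main

import "sync"

// encoderPool is a hypothetical sketch, not sarama's code: it keeps at most
// maxIdle warm encoders. T stands in for *zstd.Encoder.
type encoderPool[T any] struct {
	mu      sync.Mutex
	idle    []T
	maxIdle int      // the role MaxBufferedCompressionEncoders would play
	newFn   func() T // allocates a new encoder when none is idle
}

func newEncoderPool[T any](maxIdle int, newFn func() T) *encoderPool[T] {
	if maxIdle < 1 {
		maxIdle = 1 // fall back to the proposed default of 1
	}
	return &encoderPool[T]{maxIdle: maxIdle, newFn: newFn}
}

// Get reuses an idle encoder if there is one, otherwise allocates a new one.
func (p *encoderPool[T]) Get() T {
	p.mu.Lock()
	if n := len(p.idle); n > 0 {
		enc := p.idle[n-1]
		var zero T
		p.idle[n-1] = zero // drop the slice's reference to the handed-out encoder
		p.idle = p.idle[:n-1]
		p.mu.Unlock()
		return enc
	}
	p.mu.Unlock()
	return p.newFn() // allocate outside the lock
}

// Put keeps the encoder if fewer than maxIdle are idle; otherwise it is
// dropped and left for the GC to reclaim.
func (p *encoderPool[T]) Put(enc T) {
	p.mu.Lock()
	defer p.mu.Unlock()
	if len(p.idle) < p.maxIdle {
		p.idle = append(p.idle, enc)
	}
}

// Idle reports how many warm encoders are currently kept.
func (p *encoderPool[T]) Idle() int {
	p.mu.Lock()
	defer p.mu.Unlock()
	return len(p.idle)
}
```

Suppose the number of goroutines compressing at the same time stays at or below `maxIdle`. In that case, once the pool has warmed up, `Get` reuses returned encoders instead of allocating new ones, and that reuse is where the GC savings would come from.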

HenryCaiHaiying, Aug 21 '24 01:08

@dnwe, can you check whether you like the idea of making the encoder pool size configurable?

HenryCaiHaiying, Aug 21 '24 01:08

@HenryCaiHaiying thanks for the PR. On a general note, I think we would want naming similar to the stdlib's, i.e. MaxIdleEncoders rather than MaxBufferedEncoders. However, I also wonder whether @klauspost (or @rtreffer) has any thoughts on the proposed usage of zstd here?

We previously merged https://github.com/IBM/sarama/pull/2375, which tried to avoid the memory cost (on machines with many CPU cores) of zstd maintaining an encoder per core. Instead, it passed zstd.WithEncoderConcurrency(1) to zstd, kept a single reusable encoder around internally, and allocated any additional encoders on demand. With this PR we would provide a config parameter to keep more than one warm encoder in our custom pool. But should we instead just pass this value through to WithEncoderConcurrency and remove our custom code?

dnwe, Aug 21 '24 19:08

Sorry for the wall of text. I'll go over:

  • history of the original PR
  • the issue described
  • encoder concurrency
  • the solution space (IMHO)

History of the original PR

The problem was that we were constrained by storage and network transfer. We were also scaling to larger machines (48 or 96 cores iirc). Compression level was set to high (minimize network / storage). Zstd was used as a better gzip.

As a result we ended up with gigabytes of preallocated memory: encoders for all 96 cores with 32MB of state each, while only 2-5 were in use. Combined with a GOMEMLIMIT, this led to a GC death spiral.
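As a back-of-the-envelope check, assume one preallocated encoder per core at roughly 32MB of state each, with at most 5 actually in use:

```latex
\begin{aligned}
96 \times 32\,\text{MB} &= 3072\,\text{MB} \approx 3\,\text{GB} && \text{preallocated} \\
(96 - 5) \times 32\,\text{MB} &= 2912\,\text{MB} && \text{idle while only 5 encoders are in use}
\end{aligned}
```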

Messages were small, but we used a custom partitioner that sent less than 1MB to each partition in a round-robin manner. It batched messages as efficiently as possible, which kept in-flight requests low while increasing compression ratios (even 1MB is rather small for a compression algorithm like zstd). I can highly recommend such a partitioner if messages don't require ordering or predictable partition assignment. I was told that the Java libraries have a partitioner like this. I could probably contribute something like it.
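Here is a rough sketch of the core logic of such a partitioner. It is written against a hypothetical standalone API rather than sarama's partitioner interface, and `byteBudgetPartitioner` and its `budget` field are illustrative names. It keeps assigning messages to one partition until the next message would push that partition past a byte budget just under 1MB. It then moves on to the next partition round-robin.

```go
package main

import "sync"

// byteBudgetPartitioner is a hypothetical sketch: it sticks to one partition
// until roughly budget bytes have been assigned to it, then moves to the
// next partition in round-robin order.
type byteBudgetPartitioner struct {
	mu       sync.Mutex
	budget   int   // bytes per partition before switching, e.g. just under 1MB
	current  int32 // partition currently being filled
	assigned int   // bytes assigned to the current partition so far
}

// Partition returns the partition for a message of size bytes.
func (p *byteBudgetPartitioner) Partition(size int, numPartitions int32) int32 {
	p.mu.Lock()
	defer p.mu.Unlock()
	// Switch once this message would exceed the budget, but always accept at
	// least one message per partition, even if it is larger than the budget.
	if p.assigned > 0 && p.assigned+size > p.budget {
		p.current = (p.current + 1) % numPartitions
		p.assigned = 0
	}
	if p.current >= numPartitions { // the partition count may have shrunk
		p.current = 0
	}
	p.assigned += size
	return p.current
}
```

With a budget of 1000 bytes and 400-byte messages over 3 partitions, two messages land on each partition before it moves on. Placement depends only on arrival order and size, so this only suits messages without ordering or key-based placement requirements.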

It worked: these changes massively reduced network and storage requirements, and both memory consumption and pprof results looked good.

Reported issue

With 2-4KB of allocations per message, the messages are likely 1KB or smaller. That is roughly 1000x worse than the ~1MB batches the original PR was tuned for. Batching also reduces how often an encoder is requested, and our higher compression settings meant we held each encoder longer.

I think the raised issue is valid: this is the worst-case scenario, the exact opposite end of the spectrum. It combines speed-optimized compression, small messages, and high concurrency.

Encoder concurrency

WithEncoderConcurrency makes a single encoder use multiple goroutines. There are 2 problems with this:

  1. We have very small payloads (1MB max), so the speedup will likely be low (or even nonexistent)
  2. We have high concurrency before we hit the encoder

So running n encoders, each without internal concurrency, should give the best results. This is not backed by a benchmark, though.

Solution space

Making the pool size configurable is one option.

Back in the day I was thinking of a pool with expiry. For example, keep a []struct{Encoder, time.Time} (used as a FIFO queue), and on each enqueue/dequeue of an encoder check for old encoders (e.g. >=1s or >=5s). This would likely remove the need for any configuration, and the cleanup can be done inline before each enqueue and after each dequeue. It is unlikely to cause the described GC pressure if instances are kept for at least 1s, while still allowing the pool to shrink to 1.

My personal preference for this case would probably be:

  1. Better batching (if possible) - this is a completely different approach though
  2. A configuration free approach
  3. Config setting

Complexity probably goes the opposite way :see_no_evil:

I hope this is somewhat helpful.

rtreffer, Aug 21 '24 20:08

Thanks @rtreffer for the history flashback and detailed analysis. I think our case is a bit different from yours. We have 500 Kafka brokers downstream, so at any point while the process runs there are probably hundreds of goroutines (one per broker) compressing and producing messages. Our producer batch size is also very small (only a few KB), and we couldn't make it bigger without significantly re-architecting our product. So we need either a configurable pool size or a smart pool that shrinks when objects go unused for a long time. I think the smart pool idea is good, but it would probably take some effort to get right. It would likely still need a configurable parameter too (e.g. the decay timeout).

I also agree with not using encoder_options.WithEncoderConcurrency(n), because we only call EncodeAll() once for each encoder object (unless we change sarama's zstd code to create only one singleton encoder object).

HenryCaiHaiying, Aug 21 '24 22:08

@puellanivis Are you still expecting me to make some code changes? I think I've addressed all the review comments.

HenryCaiHaiying, Aug 23 '24 17:08

@puellanivis Are you still expecting me to make some code changes? I think I've addressed all the review comments.

Nope. I don’t see anything pressing to address. I just also cannot give an approval, or resolve the issues I already opened. 😂

puellanivis, Aug 24 '24 01:08

So who can approve this PR?

HenryCaiHaiying, Aug 24 '24 02:08

So... in the original PR I wrongly assumed that GOMAXPROCS would be the upper bound on the number of zstd encoders.

I just wrote a test case that shows this assumption is wrong. I assume this changed in 2020, when goroutines became asynchronously preemptible (Go 1.14).

See https://github.com/rtreffer/sarama/commit/2bad38e9b93426393fa9e5e03af2e09ede68298d

  1. Set GOMAXPROCS to 1
  2. Wait for 500 goroutines creating a 1kb message each
  3. Let each goroutine check out an encoder and keep track of the number of checked out encoders
  4. Encode the buffer
  5. Return the encoder, decrement the checked out encoders
  6. Return the results, find the maximum number of encoders
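Steps 1-6 can be approximated with a standard-library-only harness. This sketch rests on a few assumptions. `maxConcurrentCheckouts` is a hypothetical name, and rather than checking encoders out of sarama's pool, it counts how many goroutines are inside the encode step at once. That count is what the number of checked-out encoders measures.

```go
package main

import (
	"runtime"
	"sync"
	"sync/atomic"
)

// maxConcurrentCheckouts is a hypothetical harness following steps 1-6:
// it returns the peak number of goroutines simultaneously "holding an
// encoder", i.e. inside encode.
func maxConcurrentCheckouts(goroutines, procs int, encode func([]byte) []byte) int64 {
	prev := runtime.GOMAXPROCS(procs) // step 1
	defer runtime.GOMAXPROCS(prev)

	var inUse, peak int64
	var ready, done sync.WaitGroup
	start := make(chan struct{})
	ready.Add(goroutines)
	done.Add(goroutines)
	for i := 0; i < goroutines; i++ {
		go func() {
			defer done.Done()
			msg := make([]byte, 1024) // a 1KB message
			ready.Done()
			<-start // step 2: wait until every goroutine exists

			n := atomic.AddInt64(&inUse, 1) // step 3: check out, track count
			for {
				old := atomic.LoadInt64(&peak)
				if n <= old || atomic.CompareAndSwapInt64(&peak, old, n) {
					break
				}
			}
			_ = encode(msg)             // step 4: encode the buffer
			atomic.AddInt64(&inUse, -1) // step 5: return the encoder
		}()
	}
	ready.Wait()
	close(start)
	done.Wait()
	return atomic.LoadInt64(&peak) // step 6: the maximum observed
}
```

With a trivial encode function the peak may well stay at 1. A CPU-heavy encoder such as zstd's EncodeAll presumably gives the scheduler more chances to preempt a goroutine mid-encode, and each preemption lets another goroutine check out an additional encoder even at GOMAXPROCS=1.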

At GOMAXPROCS=1 there shouldn't be 15 or 16 encoders, yet the test observes that many. Note that the test is a bit flaky. Still, it shows there is a bug at the other end of the spectrum too, and there's a good chance the original issue is affected by it as well.

rtreffer, Aug 24 '24 14:08

@rtreffer Yes, the maximum number of encoders created tracks the maximum number of goroutines, not the number of CPU cores. My laptop has 10 CPU cores, and if I create 30 goroutines, the number of encoders created can also reach 30.

HenryCaiHaiying, Aug 24 '24 16:08

Sorry. I didn't have time to go through the PR before now.

It seems you are vastly overcomplicating things. There is no need to buffer encoders, since you are only using them for EncodeAll. EncodeAll can be called concurrently, and the encoder will allow a predefined number of concurrent calls.

When creating the Encoder, simply use WithEncoderConcurrency to set the maximum number of concurrent EncodeAll calls to allow. So a simple params -> *Encoder mapping is all you need.

The encoder will then keep this number of internal encoders around between calls.

klauspost, Aug 28 '24 13:08

@klauspost if we go with the encoder pool in your library, we would also need to change the Sarama client to use a singleton Encoder object. We would still need to make the concurrency number configurable in the Sarama client and pass it down to your library. It seems @rtreffer has a different approach that makes the pool smarter: the pool size is upper-bounded by GOMAXPROCS, the pool can grow/shrink based on usage, and there is no need to configure the pool size. See https://github.com/rtreffer/sarama/commit/23168b5f68474ff129c3fa89d8d12cfdb8ac766a

HenryCaiHaiying, Sep 04 '24 05:09

@HenryCaiHaiying You can do whatever you want. This just smells like over-engineering to me.

You never need more than 1 instance per config. You can use the same instance for all your encodes. Setting the concurrency limit does exactly what you are trying to do.

If you for whatever reason want to change the concurrency, you can just create a new instance, do an atomic replace and start using that.
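The atomic replace can be expressed with the standard library's `atomic.Pointer` (Go 1.19+). In this sketch, `swappable` is a hypothetical wrapper, and in practice `T` would be `zstd.Encoder`. Whether a replaced encoder should also be closed is not covered here.

```go
package main

import "sync/atomic"

// swappable is a hypothetical holder for the one shared instance that every
// caller loads right before each use.
type swappable[T any] struct {
	p atomic.Pointer[T]
}

// Load returns the current instance (nil until the first Replace).
func (s *swappable[T]) Load() *T { return s.p.Load() }

// Replace atomically installs a newly built instance and returns the old one.
// Calls already running on the old instance finish normally; calls that Load
// afterwards see the replacement.
func (s *swappable[T]) Replace(next *T) *T { return s.p.Swap(next) }
```

Changing the concurrency then means building a new encoder with the new option and calling `Replace`. The old one becomes garbage once in-flight calls drop their references.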

klauspost, Sep 04 '24 14:09

@klauspost unfortunately the concurrency setting does not have the desired outcome. The main problem is the greedy allocation in zstd/encoder.go#L87-L90

The problem in the past was that on high core count machines (e.g. 96 core machines) the encoder would pre-allocate 96 instances, which can add up to gigabytes of memory. This is particularly problematic in multi-tenancy setups (kubernetes). 96 cores is currently the second largest general purpose size on latest generation AWS instances.

The main feature here is lazy allocations of encoders to avoid issues on setups where GOMAXPROCS is high but the actual parallelism is low. I would be very happy if you could show how to reach that with less effort from our end. I do agree that in the case of a config setting we could just turn it into documented behavior.

However, my design goals are:

  1. Low memory consumption for processes that use sarama with low parallelism (currently OK)
  2. Deterministic maximum memory consumption for processes that use high concurrency (currently broken)
  3. Reuse of encoders to avoid wasteful allocations (works only for low concurrency)

This would avoid any config tweaks. I finally opened an alternative PR for that: #2979 // CC @HenryCaiHaiying

rtreffer, Sep 07 '24 14:09

the encoder would pre-allocate 96 instances, which can add up to gigabytes of memory.

Let's get the numbers: WithEncoderConcurrency(96) and WithLowerEncoderMem(true):

Allocs are for creating the Encoder and running 96 encodes to warm up the encoders:

    zstd_test.go:80: TestWriterMemUsage/enc-all-lower/level-fastest: Memory Used: 405MB, 3121 allocs
    zstd_test.go:80: TestWriterMemUsage/enc-all-lower/level-default: Memory Used: 886MB, 3100 allocs
    zstd_test.go:80: TestWriterMemUsage/enc-all-lower/level-better: Memory Used: 1157MB, 1373 allocs
    zstd_test.go:80: TestWriterMemUsage/enc-all-lower/level-best: Memory Used: 4061MB, 1418 allocs

Pretty much all of this is window allocation, which scales with the window size. For example, with WithWindowSize(1<<20):

    zstd_test.go:80: TestWriterMemUsage/enc-all-lower/level-fastest: Memory Used: 117MB, 3115 allocs
    zstd_test.go:80: TestWriterMemUsage/enc-all-lower/level-default: Memory Used: 214MB, 3100 allocs
    zstd_test.go:80: TestWriterMemUsage/enc-all-lower/level-better: Memory Used: 485MB, 1368 allocs
    zstd_test.go:80: TestWriterMemUsage/enc-all-lower/level-best: Memory Used: 3389MB, 1425 allocs

The compression loss would be small, and all payloads < 1MB will be unaffected.

EncodeAll on inputs smaller than one block (64K/128K) takes a shortcut that allocates no history. Example with 50KB encodes:

    zstd_test.go:80: TestWriterMemUsage/enc-all-lower/level-fastest: Memory Used: 26MB, 1294 allocs
    zstd_test.go:80: TestWriterMemUsage/enc-all-lower/level-default: Memory Used: 123MB, 1279 allocs
    
(This shortcut is not implemented for better/best, so their numbers are the same as above.)

In principle this could be applied to all EncodeAll operations, but it will require a bit of "plumbing" to work.

Limiting the window size and capping the concurrency are easy mitigations. Here are the static allocs with WithEncoderConcurrency(32), WithLowerEncoderMem(true), WithWindowSize(1<<20) and encoding 10MB payloads:

    zstd_test.go:80: TestWriterMemUsage/enc-all-lower/level-fastest: Memory Used: 25MB, 1067 allocs
    zstd_test.go:80: TestWriterMemUsage/enc-all-lower/level-default: Memory Used: 58MB, 1045 allocs
    zstd_test.go:80: TestWriterMemUsage/enc-all-lower/level-better: Memory Used: 151MB, 473 allocs
    zstd_test.go:80: TestWriterMemUsage/enc-all-lower/level-best: Memory Used: 1139MB, 511 allocs

klauspost, Sep 10 '24 10:09

So for payloads < 1MB, are WithLowerEncoderMem(true) and WithWindowSize(1<<20) recommended? Any downside of using those flags?

HenryCaiHaiying, Sep 11 '24 04:09

Any downside of using those flags?

None. Even with payloads >1MB the difference will be minimal. It will also reduce memory usage on decompression as a bonus.

"LowerEncoderMem" will do more lazy allocations and over-allocate the window, so operations longer than the window are done with fewer copies. So it is mostly a very minor speed difference.

klauspost, Sep 11 '24 08:09

So 3GB-4GB on 96 cores without setting the concurrency is roughly in line with what I was seeing. Setting the compression level to best cut our bills >50% with no downside after the memory consumption fix. Some vendors charge for bytes stored and transferred, so better compression and batching is straight up :moneybag:

Capping the window size at 1MB makes sense regardless of the implementation. 1MB is Kafka's default maximum message size, and raising that limit is annoying because it must be configured on the brokers and on all clients, so most installations won't. That's a really useful call-out.

I see this discussion as mostly about trade-offs between:

  • code complexity / simplicity
  • configuration parameters
  • memory consumption

I lean toward the library adapting to usage, but I am fine with any other direction too. I would love to see a decision on that front, though, given that the bug report shows a real issue.

rtreffer, Sep 15 '24 16:09

I think I am fine with either direction (a smart pool or a configurable number of encoder instances per config). But it seems we would also need to update the default encoder window size (and/or use the LowerEncoderMem setting). Even if we go with the simple setting for the number of encoders, we might also want to update the code to pass that setting to WithEncoderConcurrency().

HenryCaiHaiying, Sep 16 '24 05:09

Thank you for your contribution! However, this pull request has not had any activity in the past 90 days and will be closed in 30 days if no updates occur. If you believe the changes are still valid then please verify your branch has no conflicts with main and rebase if needed. If you are awaiting a (re-)review then please let us know.

github-actions[bot], Dec 15 '24 06:12