sarama
feat: Make the compression encoder pool size configurable via MaxBufferedCompressionEncoders
This PR addresses issue: https://github.com/IBM/sarama/issues/2965
For Zstd compression, the pool size of encoders (controlled by the MaxBufferedEncoders parameter) is currently hard-coded to 1. Under high concurrency this causes an excessive number of large encoder objects to be created, which leads to GC performance issues.
This PR introduces a new config parameter, MaxBufferedCompressionEncoders (default value 1), and modifies the calling paths to pass it through message/record_batch down to the compressor/encoder.
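A minimal sketch of the idea, assuming a buffered-channel pool of the kind the PR describes: `idlePool`, `newIdlePool` and the `encoder` type are hypothetical stand-ins (not sarama's actual code, and there is no zstd import), and `maxIdle` plays the role of MaxBufferedCompressionEncoders. It shows why a cap of 1 forces re-allocation under bursty concurrency.

```go
package main

import (
	"fmt"
	"sync/atomic"
)

// encoder is a hypothetical stand-in for an expensive object such as *zstd.Encoder.
type encoder struct{ id int64 }

// created counts how many encoders have ever been allocated.
var created atomic.Int64

func newEncoder() *encoder { return &encoder{id: created.Add(1)} }

// idlePool keeps at most maxIdle unused encoders around; maxIdle plays the
// role of the proposed MaxBufferedCompressionEncoders setting.
type idlePool struct{ idle chan *encoder }

func newIdlePool(maxIdle int) *idlePool {
	if maxIdle < 1 {
		maxIdle = 1 // the PR's default
	}
	return &idlePool{idle: make(chan *encoder, maxIdle)}
}

// get reuses an idle encoder if one is available, otherwise allocates a new one.
func (p *idlePool) get() *encoder {
	select {
	case e := <-p.idle:
		return e
	default:
		return newEncoder()
	}
}

// put keeps e for reuse if there is room; otherwise e is dropped for the GC.
func (p *idlePool) put(e *encoder) {
	select {
	case p.idle <- e:
	default:
	}
}

func main() {
	for _, maxIdle := range []int{1, 4} {
		before := created.Load()
		p := newIdlePool(maxIdle)
		// Simulate a burst of 4 concurrent compressions, repeated twice.
		for round := 0; round < 2; round++ {
			held := make([]*encoder, 4)
			for i := range held {
				held[i] = p.get()
			}
			for _, e := range held {
				p.put(e)
			}
		}
		fmt.Printf("maxIdle=%d: %d encoders allocated\n", maxIdle, created.Load()-before)
	}
}
```

Run as-is, this prints `maxIdle=1: 7 encoders allocated` and `maxIdle=4: 4 encoders allocated`: with one idle slot, three of the four encoders from the first burst are dropped and have to be allocated again.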
@dnwe Could you check whether you like the idea of making the encoder pool size configurable?
@HenryCaiHaiying thanks for the PR. On a general note, I think we would want naming similar to the stdlib, e.g. MaxIdleEncoders rather than MaxBufferedEncoders. However, I also wonder whether @klauspost (or @rtreffer) had any thoughts on the proposed usage of zstd here?
We previously merged https://github.com/IBM/sarama/pull/2375, which tried to avoid the memory cost (on machines with many CPU cores) of zstd maintaining an encoder per core. Instead it passed zstd.WithEncoderConcurrency(1) to zstd, kept a single reusable encoder around internally, and allocated the rest on demand. Now with this PR we'd provide a config parameter to keep more than 1 warm encoder around in our custom pool, but should we just be passing this value through to WithEncoderConcurrency instead and removing our custom code?
I am sorry for the wall of text. I'll go over:
- history of the original PR
- the issue described
- encoder concurrency
- the solution space (IMHO)
History of the original PR
The problem was that we were constrained by storage and network transfer. We were also scaling to larger machines (48 or 96 cores iirc). Compression level was set to high (minimize network / storage). Zstd was used as a better gzip.
With that we got gigabytes of preallocated memory (encoders for all 96 cores with 32MB of state each, while only 2-5 were in use). Coupled with a GOMEMLIMIT, we ran into a GC death spiral.
Messages were small, but we used a custom partitioner that would send less than 1 MB to each partition in a round-robin manner, batching up messages as efficiently as possible, keeping in-flight requests low, while increasing compression ratios (even 1MB is rather small for a compression algorithm like zstd). I can highly recommend such a partitioner if messages don't require ordering or predictable partition assignment. I was told that the Java libraries have a partitioner like this. I could probably contribute something like it.
It worked: these changes massively dropped network and storage requirements. And memory consumption as well as pprof results were good.
Reported issue
With 2-4KB of allocations per message, the messages are likely 1KB or less, i.e. roughly 1000x smaller than the batches the original PR was dealing with. Batching also reduces how often an encoder is requested, and higher compression settings meant we were using each encoder for longer.
I think the raised issue is valid: this is the worst case scenario. It is the exact opposite side of the spectrum. Speed optimized compression, small messages, high concurrency.
Encoder concurrency
WithEncoderConcurrency makes a single encoder use multiple goroutines. There are 2 problems with this:
- We have very small payloads (1MB max), meaning the speedup will likely be low (it might even be nonexistent)
- We have high concurrency before we hit the encoder
So running n encoders with no concurrency should provide the best results.
This is not backed by a benchmark though.
Solution space
Making the pool size configurable is one option.
Back in the day I was thinking of a pool with expiry, e.g. keep a []struct{Encoder, time.Time} (used as a FIFO queue) and on each enqueue / dequeue of an encoder check for old (e.g. >=1s or >=5s) encoders. This would likely remove the need for any configuration, and the cleanup work can be done inline before each enqueue and after each dequeue. We are unlikely to see the described GC pressure if instances are kept for at least 1s, while the pool can still shrink to 1.
My personal preference for this case would probably be
- Better batching (if possible) - this is a completely different approach though
- A configuration free approach
- Config setting
Complexity goes probably the opposite way :see_no_evil:
I hope this is somewhat helpful.
Thanks @rtreffer for the history and the detailed analysis. I think our case is a bit different from yours: we have 500 Kafka brokers downstream, so at any time during the process run there are probably hundreds of goroutines (one per broker) compressing and producing messages. Our producer batch size is also very small (only a few KB), and we couldn't make it bigger without significantly re-architecting our product. So we need either a configurable parameter for the pool size or a pool smart enough to shrink when some objects haven't been used for a long time. I think the smart pool idea is good, but it would probably need some effort to get right, and it probably still needs a configurable parameter (e.g. the decay timeout).
I also agree with you on not using encoder_options.WithEncoderConcurrency(n), because we only call EncodeAll() once per encoder object (unless we change sarama's zstd code to create only one singleton encoder object).
@puellanivis Are you still expecting me to make some code changes? I think I've addressed all the review comments.
Nope. I don’t see anything pressing to address. I just also cannot give an approval, or resolve the issues I already opened. 😂
So who can approve this PR?
So... In the original PR I wrongly assumed that we would be tied to GOMAXPROCS as the upper bound for zstd encoders.
I just wrote a test case that shows this assumption is wrong. I assume this changed in 2020, when Go 1.14 made goroutines asynchronously preemptible.
See https://github.com/rtreffer/sarama/commit/2bad38e9b93426393fa9e5e03af2e09ede68298d
- Set GOMAXPROCS to 1
- Wait until 500 goroutines have each created a 1KB message
- Let each goroutine check out an encoder and keep track of the number of checked out encoders
- Encode the buffer
- Return the encoder, decrement the checked out encoders
- Return the results, find the maximum number of encoders
At GOMAXPROCS=1 there shouldn't be 15 or 16 encoders. Note that the test is a bit flaky. But there is a bug on the other side of the spectrum, too, and there's a good chance the original issue is affected by it as well.
@rtreffer Yes, the max number of encoders being created corresponds to the max number of goroutines, not the number of CPU cores. On my laptop I have 10 CPU cores; if I create 30 goroutines, the number of encoders being created can also go up to 30.
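The test steps listed above can be approximated without zstd at all: in this hypothetical sketch, an atomic counter stands in for checking out an encoder, and a CPU-bound loop over the 1KB message stands in for encoding it. The peak it prints varies from run to run, so there is no fixed expected output.

```go
package main

import (
	"fmt"
	"runtime"
	"sync"
	"sync/atomic"
)

// work is a CPU-bound stand-in for EncodeAll on a 1KB buffer.
func work(buf []byte) byte {
	var x byte
	for r := 0; r < 1000; r++ {
		for i := range buf {
			x ^= buf[i] + byte(r)
		}
	}
	return x
}

// peakCheckedOut follows the listed steps with a counter in place of a real
// encoder pool and returns the maximum number checked out at the same time.
func peakCheckedOut(workers int) int64 {
	var checkedOut, peak atomic.Int64
	var ready, done sync.WaitGroup
	start := make(chan struct{})
	ready.Add(workers)
	done.Add(workers)
	for i := 0; i < workers; i++ {
		go func() {
			defer done.Done()
			msg := make([]byte, 1024) // each goroutine creates a 1KB message
			ready.Done()
			<-start                // wait until all goroutines are ready
			n := checkedOut.Add(1) // "check out" an encoder
			for {
				p := peak.Load()
				if n <= p || peak.CompareAndSwap(p, n) {
					break
				}
			}
			work(msg)          // encode the buffer
			checkedOut.Add(-1) // return the encoder
		}()
	}
	ready.Wait()
	close(start)
	done.Wait()
	return peak.Load()
}

func main() {
	prev := runtime.GOMAXPROCS(1)
	defer runtime.GOMAXPROCS(prev)
	fmt.Println("peak encoders checked out at GOMAXPROCS=1:", peakCheckedOut(500))
}
```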
Sorry. I didn't have time to go through the PR before now.
It seems you are vastly overcomplicating things. There is no need to buffer encoders, since you are only using them for EncodeAll. This can be called concurrently, and the encoder will allow for a predefined number of concurrent calls.
When creating the Encoder, simply use WithEncoderConcurrency to set the maximum number of concurrent EncodeAll calls to allow. So a simple params -> *Encoder mapping is all you need.
The encoder will then keep this number of internal encoders around between calls.
@klauspost if we go with the encoder pool in your library, we would also need to change the Sarama client to use a singleton Encoder object, and we would still need to make this concurrency number configurable in the Sarama client and pass it down to your library. @rtreffer has a different approach that tries to make the pool smarter (the pool size is upper-bounded by GOMAXPROCS and the pool can grow/shrink based on usage, so there is no need to configure the pool size): https://github.com/rtreffer/sarama/commit/23168b5f68474ff129c3fa89d8d12cfdb8ac766a
@HenryCaiHaiying You can do whatever you want. This just smells like over-engineering to me.
You never need more than 1 instance per config. You can use the same instance for all your encodes. Setting the concurrency limit does exactly what you are trying to do.
If you for whatever reason want to change the concurrency, you can just create a new instance, do an atomic replace and start using that.
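The atomic-replace suggestion can be sketched with Go's `sync/atomic.Pointer` (Go 1.19+). `sharedEncoder` is a hypothetical stand-in that only records its concurrency; with a real `*zstd.Encoder` you would also have to decide when it is safe to close the replaced instance, since in-flight calls may still be using it.

```go
package main

import (
	"fmt"
	"sync/atomic"
)

// sharedEncoder is a hypothetical stand-in for a *zstd.Encoder created with
// WithEncoderConcurrency(concurrency).
type sharedEncoder struct{ concurrency int }

func newSharedEncoder(concurrency int) *sharedEncoder {
	return &sharedEncoder{concurrency: concurrency}
}

// current holds the encoder every producer goroutine should use right now.
var current atomic.Pointer[sharedEncoder]

func init() { current.Store(newSharedEncoder(1)) }

// setConcurrency builds a fresh encoder and swaps it in atomically. Callers
// that already loaded the old pointer finish with it; later loads see the new
// one. The old instance is simply dropped here.
func setConcurrency(n int) {
	current.Store(newSharedEncoder(n))
}

func main() {
	fmt.Println("concurrency:", current.Load().concurrency)
	setConcurrency(8)
	fmt.Println("concurrency:", current.Load().concurrency)
}
```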
@klauspost unfortunately the concurrency setting does not have the desired outcome. The main problem is the greedy allocation in zstd/encoder.go#L87-L90
The problem in the past was that on high core count machines (e.g. 96 core machines) the encoder would pre-allocate 96 instances, which can add up to gigabytes of memory. This is particularly problematic in multi-tenancy setups (Kubernetes). 96 cores is currently the second largest general purpose size on latest generation AWS instances.
The main feature here is lazy allocations of encoders to avoid issues on setups where GOMAXPROCS is high but the actual parallelism is low. I would be very happy if you could show how to reach that with less effort from our end. I do agree that in the case of a config setting we could just turn it into documented behavior.
However, my design goals are:
- Low memory consumption for processes that use sarama with low parallelism (currently OK)
- Deterministic maximum memory consumption for processes that use high concurrency (currently broken)
- Reuse of encoders to avoid wasteful allocations (works only for low concurrency)
This would avoid any config tweaks. I finally opened an alternative PR for that: #2979 // CC @HenryCaiHaiying
the encoder would pre-allocate 96 instances, which can add up to gigabytes of memory.
Let's get the numbers: WithEncoderConcurrency(96) and WithLowerEncoderMem(true):
Allocs are for creating the Encoder and running 96 encodes to warm up the encoders:
zstd_test.go:80: TestWriterMemUsage/enc-all-lower/level-fastest: Memory Used: 405MB, 3121 allocs
zstd_test.go:80: TestWriterMemUsage/enc-all-lower/level-default: Memory Used: 886MB, 3100 allocs
zstd_test.go:80: TestWriterMemUsage/enc-all-lower/level-better: Memory Used: 1157MB, 1373 allocs
zstd_test.go:80: TestWriterMemUsage/enc-all-lower/level-best: Memory Used: 4061MB, 1418 allocs
Pretty much all of this is allocating a window, based on the Window Size. For example setting WithWindowSize(1<<20):
zstd_test.go:80: TestWriterMemUsage/enc-all-lower/level-fastest: Memory Used: 117MB, 3115 allocs
zstd_test.go:80: TestWriterMemUsage/enc-all-lower/level-default: Memory Used: 214MB, 3100 allocs
zstd_test.go:80: TestWriterMemUsage/enc-all-lower/level-better: Memory Used: 485MB, 1368 allocs
zstd_test.go:80: TestWriterMemUsage/enc-all-lower/level-best: Memory Used: 3389MB, 1425 allocs
The compression loss would be small, and all payloads < 1MB will be unaffected.
EncodeAll with < 1 block (64K/128K) has a shortcut, so no history is allocated - example doing 50KB encodes:
zstd_test.go:80: TestWriterMemUsage/enc-all-lower/level-fastest: Memory Used: 26MB, 1294 allocs
zstd_test.go:80: TestWriterMemUsage/enc-all-lower/level-default: Memory Used: 123MB, 1279 allocs
(this is not implemented for better/best, so numbers are as above)
In principle this could be applied to all EncodeAll operations, but it will require a bit of "plumbing" to work.
Limiting the window size and capping the concurrency are easy mitigations. Here are the static allocs with WithEncoderConcurrency(32), WithLowerEncoderMem(true), WithWindowSize(1<<20) and encoding 10MB payloads:
zstd_test.go:80: TestWriterMemUsage/enc-all-lower/level-fastest: Memory Used: 25MB, 1067 allocs
zstd_test.go:80: TestWriterMemUsage/enc-all-lower/level-default: Memory Used: 58MB, 1045 allocs
zstd_test.go:80: TestWriterMemUsage/enc-all-lower/level-better: Memory Used: 151MB, 473 allocs
zstd_test.go:80: TestWriterMemUsage/enc-all-lower/level-best: Memory Used: 1139MB, 511 allocs
So for payload < 1MB, are WithLowerEncoderMem(true) and WithWindowSize(1<<20) recommended? Any downside of using those flags?
None. Even with payloads >1MB the difference will be minimal. It will also reduce memory usage on decompression as a bonus.
"LowerEncoderMem" will do more lazy allocations and over-allocate the window, so operations longer than the window are done with fewer copies; the net effect is mostly a very minor speed difference.
So 3GB-4GB on 96 cores without setting the concurrency is roughly in line with what I was seeing. Setting the compression level to best cut our bills >50% with no downside after the memory consumption fix. Some vendors charge for bytes stored and transferred, so better compression and batching is straight up :moneybag:
Capping the window size at 1MB should be done no matter the implementation, as this is the default max message size in Kafka, and going above this limit is annoying, so most installations won't (it needs to be configured on the servers and for all clients). That's a really useful call-out.
I see this discussion as mostly about trade-offs between
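The arithmetic behind the 1MB cap: 2^20 = 1,048,576 bytes, so a 1MB window already covers a message of 1,000,000 bytes. The hypothetical helper below picks the smallest power-of-two window that is at least a given maximum message size and clamps it to 1MB; the 1KB floor is an assumption about zstd's minimum window size.

```go
package main

import "fmt"

const (
	minWindow = 1 << 10 // assumed lower bound for a zstd window (1KB)
	maxWindow = 1 << 20 // the 1MB cap recommended above
)

// windowSizeFor is a hypothetical helper: it returns the smallest power of
// two that is at least maxMessageBytes, clamped to [minWindow, maxWindow].
func windowSizeFor(maxMessageBytes int) int {
	w := minWindow
	for w < maxMessageBytes && w < maxWindow {
		w <<= 1
	}
	return w
}

func main() {
	for _, n := range []int{50_000, 1_000_000, 8 << 20} {
		fmt.Printf("max message %d bytes -> window %d bytes\n", n, windowSizeFor(n))
	}
}
```

This maps 50,000 bytes to a 65,536-byte window and both 1,000,000 and 8,388,608 bytes to 1,048,576; messages above the cap are still compressed, just with back-references limited to the last 1MB.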
- code complexity / simplicity
- configuration parameters
- memory consumption
I am more on the side that the library should adapt to usage, but I am fine with any other direction, too. I would love for a decision on that front though given that the bug report shows a real issue.
I think I am fine with either direction (smart pool or number of encoder instances per config), but it seems we would also need to update the default config for the encoder window size (and/or use the LowerEncoderMem setting). Even if we go with the simple setting for the number of encoders, we might also want to update the code to pass that setting to WithEncoderConcurrency().
Thank you for your contribution! However, this pull request has not had any activity in the past 90 days and will be closed in 30 days if no updates occur. If you believe the changes are still valid then please verify your branch has no conflicts with main and rebase if needed. If you are awaiting a (re-)review then please let us know.