[Bug] Update netty allocator options to avoid oom when using EntryFilters
Search before reporting
- [x] I searched in the issues and found nothing similar.
Read release policy
- [x] I understand that unsupported versions don't get bug fixes. I will attempt to reproduce the issue on a supported version of Pulsar client and Pulsar broker.
User environment
Broker version: 3.0 LTS, 4.0 LTS
Broker Operating system and hardware type: 5.4.241-1-tlinux4 (a cloud vendor host Linux distribution, similar to CentOS)
Broker Java version: JDK 21
Client library type: Java
Client library version: apache-pulsar-3.0.4
Client Operating system and hardware type: 5.4.241-1-tlinux4
Client Java version: openjdk version "17.0.12" 2024-07-23 LTS
Issue Description
The brokers of one of the Pulsar instances we maintain (16C32G spec) frequently experience native memory OOM.
However, the memory usage shown by jcmd <broker-pid> VM.native_memory summary does not match the process RES usage reported by top. Typically, just before an OOM occurs, top shows memory usage of around 31GB, while the total memory usage in the jcmd <broker-pid> VM.native_memory summary is only about 20GB. Broker restarts due to OOM are abnormal, and we need to find out the reason.
Error messages
The process was killed by the Kubernetes OOM killer; kubectl describe pod prints the following in the Last State section: CGroup OOM encountered, victim process: java, pid: <broker-pid>
Reproducing the issue
I believe the OOM scenario is related to the broker frequently reading large messages from bookies and then quickly releasing them. This is more likely to occur in broadcast use cases (a large number of subscriptions with a high filter ratio). Below is one way to reproduce it:
- Use 1 broker with 8C16G and a 5Gbps NIC, plus 3 bookies; configure E:Qw:Qa = 2:2:1 to improve bookie message throughput
- Create approximately 1000 subscriptions, each with only one consumer, and add unique properties to each subscription
- Send a 1MB message every second, with each message's properties matching only one random subscription's properties among these 1000 subscriptions (see the client-side sketch after this list)
- Configure broker EntryFilters to ensure that each message will only be sent to one subscription that matches it, while all other subscriptions cannot receive the message
- Make sure broker ledger cache is always missed
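For reference, here is a minimal client-side sketch of the subscription and producer steps above, using the Pulsar Java client. The topic name, property key ("target-sub"), and payload size are placeholders chosen for illustration; ConsumerBuilder.subscriptionProperties is assumed to be available (Pulsar 2.10+), and the broker-side EntryFilter that matches the message property against the subscription properties is not shown here.

```java
import java.util.Map;
import java.util.concurrent.ThreadLocalRandom;
import org.apache.pulsar.client.api.Producer;
import org.apache.pulsar.client.api.PulsarClient;

public class BroadcastRepro {
    public static void main(String[] args) throws Exception {
        PulsarClient client = PulsarClient.builder()
                .serviceUrl("pulsar://localhost:6650") // placeholder service URL
                .build();
        String topic = "persistent://public/default/broadcast-test"; // placeholder topic

        // ~1000 subscriptions, one consumer each, each tagged with a unique subscription property
        for (int i = 0; i < 1000; i++) {
            client.newConsumer()
                    .topic(topic)
                    .subscriptionName("sub-" + i)
                    .subscriptionProperties(Map.of("target-sub", "sub-" + i))
                    .subscribe();
        }

        // One 1MB message per second, whose property matches only one random subscription
        Producer<byte[]> producer = client.newProducer().topic(topic).create();
        byte[] payload = new byte[1024 * 1024];
        while (true) {
            int target = ThreadLocalRandom.current().nextInt(1000);
            producer.newMessage()
                    .property("target-sub", "sub-" + target)
                    .value(payload)
                    .send();
            Thread.sleep(1000);
        }
    }
}
```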
Additional information
I believe this issue is mainly related to the memory allocation mechanism of Netty's ByteBuf allocator:
- When the message size is large and the filtering ratio is high, Pulsar's default configuration of dispatcherMaxReadSizeBytes=5MB and dispatcherMaxBatchSize=100 means a single readMoreEntries() batch can easily reach the maximum read size limit of 5MB. With the E:Qw:Qa = 2:2:1 mode, a single read operation only requests data from two channel eventLoop threads. Since the default Netty chunk size is 4MB (DEFAULT_PAGE_SIZE << DEFAULT_MAX_ORDER), the existing chunks in the Netty PoolArena easily run out of space, requiring new chunks to be allocated from native memory (see the arithmetic sketch after this list).
- The newly allocated chunks might be fully occupied by the current readMoreEntries() operation. Due to the high filtering ratio, it is very likely that all messages read in the batch are filtered out and not delivered to any consumer, so the entry ByteBufs are released in the broker almost at the same time.
- Netty's default thread-local cache size, maxCachedBufferCapacity, is 32KB. When a single ByteBuf exceeds that size it cannot be cached in the thread-local cache and is instead released directly back to the chunk. When all entries read in a batch are larger than 32KB and released at the same time, this triggers Netty's chunk recycle strategy, which frees the chunk and returns the memory to the OS. I also noticed in JFR (Java Flight Recorder) that Netty allocates and releases memory very often.
- This behavior might depend on the JDK runtime implementation, the Linux distribution, or the glibc malloc() implementation, but I believe it is a common problem. In my test environment, when the JDK allocates direct memory smaller than 64MB (i.e., each chunk allocation is 4MB), pmap -x <broker-pid> | sort -n -k3 shows that the OS always maps a memory segment of 61444 KB. When chunks are allocated and released too frequently (at a rate of maybe 20 times/s), the OS cannot clean up memory as fast as it is allocated, which results in the native memory size seen via top being significantly larger than in jcmd <broker-pid> VM.native_memory summary.
To solve this issue, maybe we can update the Netty-related options below:
-Dio.netty.allocator.maxOrder=13 -Dio.netty.allocator.maxCachedBufferCapacity=8388608
- The maxOrder parameter adjusts the maximum size of a single chunk. Raising the default from 4MB to 64MB increases the number of ByteBufs that can be stored in a single chunk, and a 64MB chunk can also accommodate the default maximum 5MB single message. Netty changed the default chunk size from 16MB to 4MB in version 4.1.76; the primary reason was that the Netty community believed most scenarios do not need very large chunks per PoolArena, as that could easily waste memory (see https://github.com/netty/netty/pull/12108: "If we don't need a lot of memory, and certainly not compared to the number of cores on a system, then this will take up more memory than necessary, since each chunk is 16 MiB."). I think this is not very suitable for the Pulsar use case. In practice, the Logstash community has already reverted it back to the original default value of 16MB: https://github.com/elastic/logstash/issues/15765 (a quick way to verify the effective chunk size is shown after this list)
- Adjusting the maxCachedBufferCapacity parameter increases the maximum size of a single thread-local cached buffer from the default 32KB to 8MB. As a result, the ByteBuf for the default maximum 5MB message can now be cached in the thread-local cache, which extends the lifecycle of chunks. Additionally, a larger cache reduces how often the Pulsar eventGroup thread-local caches have to request memory from the PoolArena, which in turn decreases lock waiting time during concurrent allocation in the PoolArena.
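As mentioned in the list above, a minimal way to verify that the maxOrder setting took effect (assuming the flags are passed to the broker JVM at startup, e.g. via the broker's startup script) is to read the pooled allocator metrics inside the same JVM:

```java
import io.netty.buffer.PooledByteBufAllocator;
import io.netty.buffer.PooledByteBufAllocatorMetric;

public class AllocatorSettingsCheck {
    public static void main(String[] args) {
        PooledByteBufAllocatorMetric metric = PooledByteBufAllocator.DEFAULT.metric();
        // chunkSize() reflects pageSize << maxOrder, so it shows whether
        // -Dio.netty.allocator.maxOrder was picked up (4 MiB default, 64 MiB with maxOrder=13).
        System.out.println("chunkSize       = " + metric.chunkSize());
        System.out.println("numDirectArenas = " + metric.numDirectArenas());
    }
}
```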
After adjusting the Netty options mentioned above, I observed a significant reduction in Netty native memory allocation events in JFR (Java Flight Recorder), with the frequency dropping to about one allocation and deallocation every 1.5 minutes. Furthermore, the memory usage reported by jcmd <broker-pid> VM.native_memory summary is now almost the same as that displayed by the top command. With this configuration the operating system has enough time to reclaim native memory, and there have been no OOM events since the adjustment.
Are you willing to submit a PR?
- [x] I'm willing to submit a PR!
- When message size is large with a high filtering ratio, due to Pulsar's default configuration of dispatcherMaxReadSizeBytes=5MB and dispatcherMaxBatchSize=100, a single readMoreEntries() batch can easily reach the maximum read size limit of 5MB. With the current E:Qw:Qa = 2:2:1 mode, a single read operation only requests from two channel eventLoop threads. Since the default Netty chunk size is 4MB (DEFAULT_PAGE_SIZE << DEFAULT_MAX_ORDER), it's easy for the existing chunks in the Netty PoolArena to have insufficient space, requiring allocation of new chunks from native memory.
Thanks for bringing this up to discussion. Makes sense. I've been wondering why we haven't received more feedback about this. Increasing the default chunk size to 8MB (-Dio.netty.allocator.maxOrder=10) should be something to consider first since that would be the minimal change to mitigate the issue. I wouldn't increase maxCachedBufferCapacity at all.
When chunks are too frequently allocated and released (maybe at a rate 20times/s), I find that the OS memory cleanup speed cannot keep up with the allocation speed.
That's true. The problem is worse on the client side, especially if the client VM isn't configured in a way that lets Netty release buffers in an optimized way: https://pulsar.apache.org/docs/4.1.x/client-libraries-java-setup/#java-client-performance
Regarding direct memory OOM, there's also #24926 which contributes to this situation.
There's a blog post by the AutoMQ folks about PooledByteBufAllocator memory fragmentation: https://www.automq.com/blog/netty-based-streaming-systems-memory-fragmentation-and-oom-issues . I guess the blog post was written before AdaptiveByteBufAllocator became usable. It would be interesting to also see how AdaptiveByteBufAllocator behaves in Pulsar.
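For anyone who wants to experiment, here is a minimal sketch of exercising the adaptive allocator in isolation. It assumes a Netty version that ships io.netty.buffer.AdaptiveByteBufAllocator with a no-arg constructor; it is not how Pulsar wires its allocator, just a way to try the class directly.

```java
import io.netty.buffer.AdaptiveByteBufAllocator;
import io.netty.buffer.ByteBuf;

public class AdaptiveAllocatorDemo {
    public static void main(String[] args) {
        // Assumed: AdaptiveByteBufAllocator exposes a no-arg constructor in this Netty version.
        AdaptiveByteBufAllocator alloc = new AdaptiveByteBufAllocator();
        ByteBuf buf = alloc.directBuffer(5 * 1024 * 1024); // roughly a dispatcher read batch
        try {
            System.out.println("capacity=" + buf.capacity() + ", direct=" + buf.isDirect());
        } finally {
            buf.release();
        }
    }
}
```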
In Pulsar, the broker caching adds more of this fragmentation since it can hold on to a much larger buffer when an entry is cached. In Netty, a slice of a buffer will hold the parent buffer in memory until all slices have been released.
The mitigation in Pulsar is managedLedgerCacheCopyEntries=true, but that adds overhead since entries would get copied each time they are added to the cache.
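Here is a minimal sketch of the slice-retention behavior mentioned above, using plain Netty APIs (the 1 MiB and 16 KiB sizes are arbitrary, chosen only for illustration):

```java
import io.netty.buffer.ByteBuf;
import io.netty.buffer.PooledByteBufAllocator;

public class SliceRetentionDemo {
    public static void main(String[] args) {
        ByteBuf parent = PooledByteBufAllocator.DEFAULT.directBuffer(1024 * 1024);
        // A retained slice (e.g. a cached entry) adds a reference to the parent buffer.
        ByteBuf slice = parent.retainedSlice(0, 16 * 1024);

        parent.release(); // drops the parent refCnt from 2 to 1
        System.out.println("parent refCnt after release: " + parent.refCnt()); // 1 -> still pinned

        slice.release();  // only now does the refCnt reach 0 and the pooled memory become reusable
    }
}
```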
I'm trying to reproduce the issue, so I'd like to ask what you mean by this. Do you mean managedLedgerCacheSizeMB=0?
Make sure broker ledger cache is always missed
Lari thank you so much for your detailed reply.
I also support adjusting the chunk size to 8MB (-Dio.netty.allocator.maxOrder=10) as a first step. At the very least, for scenarios involving large message payloads (e.g., where each message is 4MB or larger), this change would prevent Netty from having to call the allocateHuge() method to request direct memory from the OS for every single ByteBuf allocation. Increasing maxOrder should benefit both message throughput and memory reuse.
Your later point about adjusting the client-side JVM startup parameters to improve throughput efficiency is also very meaningful. I will try enabling these parameters in some of our high-traffic client clusters to verify whether performance improves.
In our scenario, we have also observed the backpressure mentioned in your issue #24926. When a topic has many subscriptions, and each subscription has only one active consumer with a large receiver_queue_size (default: 1000 entries), slow network speeds can easily trigger the channel's high watermark (c.isWritable() = false). Since no other consumer channels are available for writing within the current subscription, the consumer's dispatcher will eventually write all entries from the read batch, one by one, into this single channel. This leads to increased direct memory usage. To mitigate this, we have reduced the client-side queue size (to 20) to lower direct memory consumption (see the sketch below). However, a long-term solution may require optimizing the trySendMessagesToConsumers logic for high-watermark scenarios.
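For completeness, lowering the receiver queue size is a plain Java client setting; a minimal sketch (topic, subscription name, and service URL are placeholders):

```java
import org.apache.pulsar.client.api.Consumer;
import org.apache.pulsar.client.api.PulsarClient;

public class SmallQueueConsumer {
    public static void main(String[] args) throws Exception {
        PulsarClient client = PulsarClient.builder()
                .serviceUrl("pulsar://localhost:6650") // placeholder service URL
                .build();
        Consumer<byte[]> consumer = client.newConsumer()
                .topic("persistent://public/default/broadcast-test") // placeholder topic
                .subscriptionName("sub-example")
                .receiverQueueSize(20) // down from the default of 1000 entries
                .subscribe();
        // ... consume as usual; a smaller queue bounds how much is buffered per consumer
    }
}
```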
Due to varying write speeds across different channels, the timing of byteBuf releases also differs. This situation is highly likely to cause memory fragmentation within chunks. In one of my clusters, each broker runs on a 16C32G spec, configured with a 12GB heap and 16GB of direct memory (-Xms12g -Xmx12g -XX:MaxDirectMemorySize=16g). The current Netty configuration uses -Dio.netty.allocator.maxOrder=13, -Dio.netty.allocator.numDirectArenas=8, and -Dio.netty.allocator.maxCachedBufferCapacity=8388608.
During production operation, some brokers maintain consistently high direct memory usage, typically ranging from 8–11GB. However, despite this high usage, after observing for a week I have not detected a clear upward trend in direct memory consumption, nor have I observed the memory leak issue described in the AutoMQ blog post. In the long term, it might be valuable to conduct a performance comparison or benchmark between the AdaptiveByteBufAllocator and the current PooledByteBufAllocator.
This Pulsar cluster runs only one Pulsar broker per pod. The configured 16GB of direct memory is dedicated exclusively to this single broker. Therefore, my primary concern isn't the reclamation speed of the chunks holding the byteBufs. Instead, I aim to maximize the chunk reuse rate to avoid frequent allocation and deallocation of native memory by Netty from the OS. I probably won't enable the managedLedgerCacheCopyEntries=true configuration currently.
I'm trying to reproduce the issue, so I'd like to ask what you mean by this. Do you mean managedLedgerCacheSizeMB=0?
Make sure broker ledger cache is always missed
@Technoboy- Thank you very much for your reply.
I am currently using the default managedLedgerCache settings from broker.conf without any modifications.
To achieve "broker ledger cache is always missed", I think we need to ensure that subscriptions are not all started at once, for example by starting a new subscription every 10 seconds. As long as there is a message backlog in the current subscription, the ledger cache is generally not hit. The relevant managedLedgerCache configurations I use are as follows:
# Amount of memory to use for caching data payload in managed ledger. This memory
# is allocated from JVM direct memory and it's shared across all the topics
# running in the same broker. By default, uses 1/5th of available direct memory
managedLedgerCacheSizeMB=
# Whether we should make a copy of the entry payloads when inserting in cache
managedLedgerCacheCopyEntries=false
# Threshold to which bring down the cache level when eviction is triggered
managedLedgerCacheEvictionWatermark=0.9
# Configure the cache eviction frequency for the managed ledger cache (evictions/sec)
managedLedgerCacheEvictionFrequency=100.0
# All entries that have stayed in cache for more than the configured time, will be evicted
managedLedgerCacheEvictionTimeThresholdMillis=1000
# Configure the threshold (in number of entries) from where a cursor should be considered 'backlogged'
# and thus should be set as inactive.
managedLedgerCursorBackloggedThreshold=1000
# Rate limit the amount of writes per second generated by consumer acking the messages
managedLedgerDefaultMarkDeleteRateLimit=1.0
I'm not entirely sure whether the current issue of frequent memory allocation and deallocation leading to inconsistent physical memory usage is related to glibc or to the Linux distribution version. However, based on my observations across different clusters, this problem appears more likely to occur with older JDK versions. If you want to reproduce this issue, I would recommend first trying with Broker v2.10 and JDK 8 in your Linux environment. If the problem can be reproduced, you could then upgrade the broker to the currently maintained v4.x LTS release for further observation.
Broker Operating system and hardware type: 5.4.241-1-tlinux4 (cloud vendor host Linux distributions, acted like CentOS)
Since this is glibc based, have you considered tuning glibc parameters? #24692 mentions some of the options. One other detail that is relevant in Linux is the Transparent Huge Pages configuration. It's necessary to tune the Linux defaults. Some details in another context (ZGC tuning) at https://github.com/gunnarmorling/discussions.morling.dev/discussions/335#discussioncomment-14441044 .
For OOM kills, one reason could be native memory heap fragmentation, and tuning the parameters could help with that. It's usually necessary to at least set /sys/kernel/mm/transparent_hugepage/enabled to madvise, or add the transparent_hugepage=madvise boot parameter, to get reasonable settings. In addition to that, setting MALLOC_ARENA_MAX=4 reduces heap fragmentation with some tradeoffs. Many Java-based systems recommend setting MALLOC_ARENA_MAX=4 on glibc.
Thanks Lari. This is a good approach to optimizing the native memory recycling strategy. Since it involves modifications to Linux kernel parameters, I need to schedule time to implement and validate these changes. If the parameter adjustments bring a significant improvement, I will provide feedback in this issue.
echo madvise | sudo tee /sys/kernel/mm/transparent_hugepage/enabled
echo advise | sudo tee /sys/kernel/mm/transparent_hugepage/shmem_enabled
echo defer | sudo tee /sys/kernel/mm/transparent_hugepage/defrag
echo 1 | sudo tee /sys/kernel/mm/transparent_hugepage/khugepaged/defrag
The current Linux configuration on my broker is as follows:
# cat /sys/kernel/mm/transparent_hugepage/enabled
always [madvise] never
# cat /sys/kernel/mm/transparent_hugepage/shmem_enabled
always within_size advise [never] deny force
# cat /sys/kernel/mm/transparent_hugepage/defrag
always defer defer+madvise [madvise] never
# cat /sys/kernel/mm/transparent_hugepage/khugepaged/defrag
1