[websocket] Replace blocking logic to avoid OOM
Motivation
I encountered an OOM issue in pulsar-websocket because users mistakenly sent negative acks very frequently.
In the heap dump, many org.apache.pulsar.websocket.ConsumerHandler$1 instances are referenced from ConsumerHandler:
https://github.com/apache/pulsar/blob/acac76e9799bc31e388c593aa553711c6ad734aa/pulsar-websocket/src/main/java/org/apache/pulsar/websocket/ConsumerHandler.java#L169-L187
Modifications
I added a new configuration parameter, webSocketMaxOutgoingFrames, to use Jetty's MaxOutgoingFrames feature:
https://github.com/eclipse/jetty.project/commit/71df3b57eef08040203337f6d83ff28adc86d7c7
With this limit, the number of pending outgoing frames can no longer grow without bound.
When the number of outgoing frames has reached the maximum and another send is attempted, Jetty calls writeFailed on the callback of that new send.
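To make the mechanism concrete, here is a minimal, self-contained simulation of a sender that caps its outgoing frame queue and fails the new send's callback once the cap is reached. It is a hypothetical model, not Jetty's code: the class `BoundedOutgoingFrames`, its `flushOne` method, and the local `WriteCallback` interface (which only mirrors the `writeSuccess`/`writeFailed` names of Jetty's callback) are assumptions for illustration.

```java
import java.util.ArrayDeque;
import java.util.Deque;
import java.util.concurrent.atomic.AtomicInteger;

/**
 * Hypothetical simulation (not Jetty's implementation) of a sender that caps the
 * number of queued outgoing frames and fails the new send's callback at the cap.
 */
public class BoundedOutgoingFrames {

    /** Hypothetical callback mirroring the writeSuccess/writeFailed names used by Jetty. */
    interface WriteCallback {
        void writeSuccess();

        void writeFailed(Throwable cause);
    }

    /** A queued frame together with the callback to complete when it is flushed. */
    private static final class Entry {
        final String frame;
        final WriteCallback callback;

        Entry(String frame, WriteCallback callback) {
            this.frame = frame;
            this.callback = callback;
        }
    }

    private final int maxOutgoingFrames; // <= 0 means "no limit" in this sketch
    private final Deque<Entry> queue = new ArrayDeque<>();

    public BoundedOutgoingFrames(int maxOutgoingFrames) {
        this.maxOutgoingFrames = maxOutgoingFrames;
    }

    /** Queues the frame, or fails its callback immediately when the queue is full. */
    public synchronized void send(String frame, WriteCallback callback) {
        if (maxOutgoingFrames > 0 && queue.size() >= maxOutgoingFrames) {
            callback.writeFailed(new IllegalStateException("maxOutgoingFrames reached"));
            return;
        }
        queue.addLast(new Entry(frame, callback));
    }

    /** Simulates the network writing the oldest frame; returns false if nothing was queued. */
    public synchronized boolean flushOne() {
        Entry entry = queue.pollFirst();
        if (entry == null) {
            return false;
        }
        entry.callback.writeSuccess();
        return true;
    }

    public synchronized int pendingFrames() {
        return queue.size();
    }

    public static void main(String[] args) {
        BoundedOutgoingFrames sender = new BoundedOutgoingFrames(2);
        AtomicInteger succeeded = new AtomicInteger();
        AtomicInteger failed = new AtomicInteger();
        WriteCallback callback = new WriteCallback() {
            @Override
            public void writeSuccess() {
                succeeded.incrementAndGet();
            }

            @Override
            public void writeFailed(Throwable cause) {
                failed.incrementAndGet();
            }
        };
        for (int i = 0; i < 5; i++) {
            sender.send("frame-" + i, callback);
        }
        // Only two frames fit; the remaining three sends fail fast instead of piling up.
        System.out.println("pending=" + sender.pendingFrames() + " failed=" + failed.get());
        while (sender.flushOne()) {
            // drain the queue
        }
        System.out.println("succeeded=" + succeeded.get());
    }
}
```

With a cap of 2 and five sends, `main` prints `pending=2 failed=3` and then `succeeded=2`: excess sends fail fast instead of growing the queue.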
However, ConsumerHandler$1#writeFailed calls ScheduledExecutorService#execute without any limit, so this change alone cannot reduce the number of ConsumerHandler$1 instances: https://github.com/apache/pulsar/blob/acac76e9799bc31e388c593aa553711c6ad734aa/pulsar-websocket/src/main/java/org/apache/pulsar/websocket/ConsumerHandler.java#L176
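The sketch below illustrates this problem under simplified assumptions: a recording `Executor` stands in for the `ScheduledExecutorService`, and `UnguardedRetry`/`GuardedRetry` are hypothetical classes, not Pulsar code. An unguarded `writeFailed` queues one retry task per failure, while a flag checked with `AtomicBoolean#compareAndSet` keeps at most one retry pending. This is only one way to bound the scheduling; it is not necessarily how this PR implements it.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Executor;
import java.util.concurrent.atomic.AtomicBoolean;

/**
 * Hypothetical sketch: compares a writeFailed handler that always resubmits a
 * retry task with one that keeps at most one retry scheduled at a time.
 */
public class RetryScheduling {

    /** Resubmits on every failure, so N failures queue N retry tasks. */
    static final class UnguardedRetry {
        private final Executor executor;

        UnguardedRetry(Executor executor) {
            this.executor = executor;
        }

        void writeFailed(Throwable cause) {
            executor.execute(this::receiveMessage);
        }

        void receiveMessage() {
            // The real handler would request the next message from the consumer here.
        }
    }

    /** Schedules a retry only if none is already pending. */
    static final class GuardedRetry {
        private final Executor executor;
        private final AtomicBoolean retryScheduled = new AtomicBoolean(false);

        GuardedRetry(Executor executor) {
            this.executor = executor;
        }

        void writeFailed(Throwable cause) {
            if (retryScheduled.compareAndSet(false, true)) {
                executor.execute(this::receiveMessage);
            }
        }

        void receiveMessage() {
            retryScheduled.set(false); // a later failure may now schedule a new retry
            // The real handler would request the next message from the consumer here.
        }
    }

    public static void main(String[] args) {
        // Recording executors queue tasks without running them, so the backlog is visible.
        List<Runnable> unguardedQueue = new ArrayList<>();
        List<Runnable> guardedQueue = new ArrayList<>();
        UnguardedRetry unguarded = new UnguardedRetry(unguardedQueue::add);
        GuardedRetry guarded = new GuardedRetry(guardedQueue::add);

        for (int i = 0; i < 1000; i++) {
            Throwable failure = new IllegalStateException("maxOutgoingFrames reached");
            unguarded.writeFailed(failure);
            guarded.writeFailed(failure);
        }
        System.out.println("unguarded backlog=" + unguardedQueue.size()); // 1000
        System.out.println("guarded backlog=" + guardedQueue.size()); // 1
    }
}
```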
I changed the pendingMessages feature from counting the number of requests to tracking the actual message IDs (this is a breaking change). The reason is that ConsumerHandler currently decrements pendingMessages on every request, even when the user sends repeated negative acks for the same message ID.
I also changed ConsumerHandler$1#writeFailed so that it calls ScheduledExecutorService#execute only when no other ConsumerHandler#receiveMessage call is in progress.
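A minimal sketch of counting pending messages by ID rather than by request, assuming plain string message IDs (the real handler works with Pulsar's `MessageId`). `PendingMessageTracker`, `tryDeliver`, and `release` are hypothetical names. Because `release` removes the ID from a set, repeated negative acks for the same message free only one slot, whereas a plain request counter keeps decrementing.

```java
import java.util.HashSet;
import java.util.Set;
import java.util.concurrent.atomic.AtomicInteger;

/**
 * Hypothetical sketch (not Pulsar's ConsumerHandler): counts pending messages by ID,
 * so repeated acks or negative acks for the same ID release at most one slot.
 * Message IDs are plain strings here for simplicity.
 */
public class PendingMessageTracker {
    private final int maxPendingMessages;
    private final Set<String> pendingIds = new HashSet<>(); // guarded by "this"

    public PendingMessageTracker(int maxPendingMessages) {
        this.maxPendingMessages = maxPendingMessages;
    }

    /** Records a delivered message; returns false if the pending limit is already reached. */
    public synchronized boolean tryDeliver(String messageId) {
        if (pendingIds.contains(messageId)) {
            return true; // already counted as pending (e.g. a redelivery)
        }
        if (pendingIds.size() >= maxPendingMessages) {
            return false;
        }
        pendingIds.add(messageId);
        return true;
    }

    /** Handles an ack or negative ack; only the first one for a pending ID frees a slot. */
    public synchronized boolean release(String messageId) {
        return pendingIds.remove(messageId);
    }

    public synchronized int pendingCount() {
        return pendingIds.size();
    }

    public static void main(String[] args) {
        PendingMessageTracker tracker = new PendingMessageTracker(2);
        tracker.tryDeliver("1:0");
        tracker.tryDeliver("1:1");
        // The client mistakenly negative-acks the same message three times.
        for (int i = 0; i < 3; i++) {
            tracker.release("1:0");
        }
        System.out.println("id-based pending=" + tracker.pendingCount()); // 1

        // A counter decremented once per request has no notion of duplicates.
        AtomicInteger requestCounter = new AtomicInteger(2);
        for (int i = 0; i < 3; i++) {
            requestCounter.decrementAndGet();
        }
        System.out.println("request-counter pending=" + requestCounter.get()); // -1
    }
}
```

In this model, three negative acks for `1:0` leave one message pending in the ID-based tracker, while the request counter drops to -1 and under-reports how many messages are actually outstanding.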
Verifying this change
- [x] Make sure that the change passes the CI checks.
This change added tests and can be verified as follows:
- Added unit tests for ConsumerHandler
Does this pull request potentially affect one of the following parts:
- Dependencies (does it add or upgrade a dependency): (yes)
  - in test scope
- The public API: (no)
- The schema: (no)
- The default values of configurations: (yes)
- The wire protocol: (no)
- The rest endpoints: (no)
- The admin cli options: (no)
- Anything that affects deployment: (no)
Documentation
Check the box below or label this PR directly (if you have committer privilege).
Need to update docs?
- [x] doc