pulsar
[fix][broker][branch-3.1] Avoid PublishRateLimiter use an already closed RateLimiter
Motivation
We found the following error logs on the broker when using ResourceGroupPublishLimiter. The root cause is that the tryAcquire method races with the replaceLimiters method, so publishRateLimiter can end up using an already closed RateLimiter. PrecisePublishLimiter has the same issue.
[pulsar-io-4-54] WARN org.apache.pulsar.broker.service.ServerCnx - [/127.0.0.6:59173] Got exception java.lang.IllegalArgumentException: Rate limiter is already shutdown
at com.google.common.base.Preconditions.checkArgument(Preconditions.java:145)
at org.apache.pulsar.common.util.RateLimiter.tryAcquire(RateLimiter.java:176)
at org.apache.pulsar.broker.resourcegroup.ResourceGroupPublishLimiter.tryAcquire(ResourceGroupPublishLimiter.java:138)
at org.apache.pulsar.broker.service.AbstractTopic.isResourceGroupPublishRateExceeded(AbstractTopic.java:1036)
at org.apache.pulsar.broker.service.ServerCnx.startSendOperation(ServerCnx.java:2611)
at org.apache.pulsar.broker.service.ServerCnx.handleSend(ServerCnx.java:1508)
at org.apache.pulsar.common.protocol.PulsarDecoder.channelRead(PulsarDecoder.java:207)
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:444)
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:420)
at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:412)
at io.netty.handler.flow.FlowControlHandler.dequeue(FlowControlHandler.java:200)
at io.netty.handler.flow.FlowControlHandler.channelRead(FlowControlHandler.java:162)
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:442)
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:420)
at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:412)
at io.netty.handler.codec.ByteToMessageDecoder.fireChannelRead(ByteToMessageDecoder.java:346)
at io.netty.handler.codec.ByteToMessageDecoder.fireChannelRead(ByteToMessageDecoder.java:333)
at io.netty.handler.codec.ByteToMessageDecoder.callDecode(ByteToMessageDecoder.java:454)
at io.netty.handler.codec.ByteToMessageDecoder.channelRead(ByteToMessageDecoder.java:290)
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:444)
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:420)
at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:412)
at io.netty.channel.DefaultChannelPipeline$HeadContext.channelRead(DefaultChannelPipeline.java:1410)
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:440)
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:420)
at io.netty.channel.DefaultChannelPipeline.fireChannelRead(DefaultChannelPipeline.java:919)
at io.netty.channel.epoll.AbstractEpollStreamChannel$EpollStreamUnsafe.epollInReady(AbstractEpollStreamChannel.java:800)
at io.netty.channel.epoll.EpollEventLoop.processReady(EpollEventLoop.java:499)
at io.netty.channel.epoll.EpollEventLoop.run(EpollEventLoop.java:397)
at io.netty.util.concurrent.SingleThreadEventExecutor$4.run(SingleThreadEventExecutor.java:997)
at io.netty.util.internal.ThreadExecutorMap$2.run(ThreadExecutorMap.java:74)
at io.netty.util.concurrent.FastThreadLocalRunnable.run(FastThreadLocalRunnable.java:30)
at java.base/java.lang.Thread.run(Thread.java:829)
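To make the interleaving behind this stack trace concrete, here is a minimal sketch that replays it deterministically in a single thread. Everything in it is hypothetical and is not the Pulsar source: `ClosableLimiter`, `LimiterHolder`, and `RaceDemo` are stand-in names, and the assumption that a replaceLimiters-style update installs a new limiter and then closes the old one is ours.

```java
import java.util.concurrent.atomic.AtomicReference;

// Hypothetical stand-in for a limiter that rejects calls after close(),
// mirroring the "Rate limiter is already shutdown" check seen in the trace.
final class ClosableLimiter {
    private volatile boolean closed;

    boolean tryAcquire(long permits) {
        if (closed) {
            throw new IllegalArgumentException("Rate limiter is already shutdown");
        }
        return true; // permit accounting omitted in this sketch
    }

    void close() {
        closed = true;
    }
}

// Hypothetical holder. Assumption: an update swaps in a new limiter
// and then closes the previous one.
final class LimiterHolder {
    private final AtomicReference<ClosableLimiter> current =
            new AtomicReference<>(new ClosableLimiter());

    ClosableLimiter current() {
        return current.get();
    }

    void replace() {
        ClosableLimiter old = current.getAndSet(new ClosableLimiter());
        old.close();
    }
}

final class RaceDemo {
    // Replays the unlucky ordering step by step: the publisher reads the
    // limiter, the updater replaces it, then the publisher uses its stale copy.
    static boolean staleReferenceThrows() {
        LimiterHolder holder = new LimiterHolder();
        ClosableLimiter seenByPublisher = holder.current();
        holder.replace();
        try {
            seenByPublisher.tryAcquire(1);
            return false;
        } catch (IllegalArgumentException e) {
            return true;
        }
    }
}
```

In a real broker the two steps would run on different threads, so the failure would only show up when the update happens between the read and the call.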
Modifications
If the current RateLimiter is already shut down, tryAcquire now returns true and logs at info level instead of throwing. Because PIP-322 refactored Pulsar rate limiting in version 3.2, only versions before 3.2 need this fix.
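The behavior described above can be sketched roughly as follows. This is an illustration under stated assumptions, not the patched Pulsar class: `LenientRateLimiter` is a hypothetical name, the permit accounting is a simplified fixed budget with no refill, and `java.util.logging` stands in for whatever logger the broker uses.

```java
import java.util.concurrent.atomic.AtomicLong;
import java.util.logging.Logger;

// Hypothetical limiter: once closed, tryAcquire lets the caller through
// and logs at info level instead of throwing.
final class LenientRateLimiter {
    private static final Logger LOG =
            Logger.getLogger(LenientRateLimiter.class.getName());

    private final AtomicLong remaining; // simplified budget, never refilled here
    private volatile boolean closed;

    LenientRateLimiter(long permits) {
        this.remaining = new AtomicLong(permits);
    }

    boolean tryAcquire(long permits) {
        if (closed) {
            // A replacement limiter is assumed to be taking over, so do not
            // fail the in-flight publish on the stale instance.
            LOG.info("Rate limiter is already shutdown, allowing the request");
            return true;
        }
        while (true) {
            long available = remaining.get();
            if (available < permits) {
                return false; // budget exhausted
            }
            if (remaining.compareAndSet(available, available - permits)) {
                return true;
            }
        }
    }

    void close() {
        closed = true;
    }
}
```

The trade-off in this sketch is that a publish arriving during the swap is not counted against any limit, which seems acceptable when the window is short and the new limiter takes over immediately afterwards.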
Verifying this change
- [x] Make sure that the change passes the CI checks.
Does this pull request potentially affect one of the following parts:
If the box was checked, please highlight the changes
- [ ] Dependencies (add or upgrade a dependency)
- [ ] The public API
- [ ] The schema
- [ ] The default values of configurations
- [ ] The threading model
- [ ] The binary protocol
- [ ] The REST endpoints
- [ ] The admin CLI options
- [ ] The metrics
- [ ] Anything that affects deployment
Documentation
- [ ] doc
- [ ] doc-required
- [x] doc-not-needed
- [ ] doc-complete
Matching PR in forked repository
PR in forked repository:
Thanks @coderzc - does this mean that when we get this error, rate limiting is or is not working? Are the affected producers still producing successfully, with or without the rate limit? Please advise.
@frankjkelly When this error is logged, the send operation fails temporarily due to a timeout. This happens while the rate limiter is being updated; once the update completes, it continues to work normally.
@coderzc A better fix is to remove the exception that is thrown when the rate limiter is closed. Simply return if it is closed. This could be logged at info level.
https://github.com/apache/pulsar/blob/82237d3684fe506bcb6426b3b23f413422e6e4fb/pulsar-common/src/main/java/org/apache/pulsar/common/util/RateLimiter.java#L178
@lhotari Good idea. If the current RateLimiter is already shut down, we now just return true and log at info level. PTAL~
Thanks for the clarification. Does that mean the client will retry, and if so, is that within milliseconds, seconds, or something else?
@frankjkelly I think the client does not retry automatically, and the user needs to resend the message manually if the send fails.
Hmmm @merlimat or @lhotari can you confirm? If this error requires the caller to catch and retry (as opposed to the client doing it internally) then that's a concern for adoption of the rate limiter (if the error occurs and the client retries as best it can that's OK).
I don't see anything special about rate limiters in message delivery and retries. The Pulsar client is designed to continue attempting to send messages until a potential send timeout occurs. It's also possible to set up an unlimited send timeout, allowing the client to retry indefinitely. This feature is detailed in the Pulsar documentation, available at https://pulsar.apache.org/docs/3.1.x/cookbooks-deduplication/#pulsar-clients (it's explained in the context of message deduplication). You can refer to the Javadocs for sendTimeout on ProducerBuilder.
It's crucial for messaging applications to be equipped to handle potential failures in message delivery, especially when data consistency is a key concern. Once the Pulsar client has acknowledged the message as sent by returning the message id, the responsibility for maintaining and ensuring the delivery of the message shifts to Pulsar. It's also necessary to verify that the message id is returned when using the asynchronous API (sendAsync).
If sending results in an error or the messaging application never receives a message id from the Pulsar client, it's the messaging application's responsibility to retry.
@frankjkelly, did I answer your question?
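As a rough illustration of the application-side responsibility described in this comment, the sketch below retries a send until a message id comes back or the attempts run out. It deliberately avoids the Pulsar client API: `SendRetry`, `sendWithRetry`, and the `Supplier` standing in for an asynchronous send are hypothetical names, and blocking on `join()` is a simplification for readability.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;
import java.util.function.Supplier;

final class SendRetry {
    // Calls the supplied asynchronous send up to maxAttempts times and
    // returns the first non-null message id. The supplier is a hypothetical
    // stand-in for something like a producer's async send call.
    static <T> T sendWithRetry(Supplier<CompletableFuture<T>> send, int maxAttempts) {
        if (maxAttempts < 1) {
            throw new IllegalArgumentException("maxAttempts must be at least 1");
        }
        RuntimeException last = null;
        for (int attempt = 1; attempt <= maxAttempts; attempt++) {
            try {
                T messageId = send.get().join(); // blocks until this attempt completes
                if (messageId != null) {
                    return messageId; // only a returned id counts as sent
                }
                last = new IllegalStateException("send completed without a message id");
            } catch (CompletionException e) {
                last = e; // the attempt failed, for example with a timeout
            }
        }
        throw last;
    }
}
```

A retry like this can resend a message whose earlier attempt actually reached the broker, which is presumably why the linked documentation discusses send timeouts in the context of deduplication.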
You did - Thanks @lhotari