[fix][broker][branch-3.1] Avoid PublishRateLimiter use an already closed RateLimiter

Open coderzc opened this issue 1 year ago • 8 comments
Motivation

We found the following error logs on the broker when using ResourceGroupPublishLimiter. The root cause is that the tryAcquire method races with the replaceLimiters method, so publishRateLimiter can end up using an already closed RateLimiter. PrecisePublishLimiter has the same issue.

[pulsar-io-4-54] WARN  org.apache.pulsar.broker.service.ServerCnx - [/127.0.0.6:59173] Got exception java.lang.IllegalArgumentException: Rate limiter is already shutdown
 at com.google.common.base.Preconditions.checkArgument(Preconditions.java:145)
 at org.apache.pulsar.common.util.RateLimiter.tryAcquire(RateLimiter.java:176)
 at org.apache.pulsar.broker.resourcegroup.ResourceGroupPublishLimiter.tryAcquire(ResourceGroupPublishLimiter.java:138)
 at org.apache.pulsar.broker.service.AbstractTopic.isResourceGroupPublishRateExceeded(AbstractTopic.java:1036)
 at org.apache.pulsar.broker.service.ServerCnx.startSendOperation(ServerCnx.java:2611)
 at org.apache.pulsar.broker.service.ServerCnx.handleSend(ServerCnx.java:1508)
 at org.apache.pulsar.common.protocol.PulsarDecoder.channelRead(PulsarDecoder.java:207)
 at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:444)
 at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:420)
 at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:412)
 at io.netty.handler.flow.FlowControlHandler.dequeue(FlowControlHandler.java:200)
 at io.netty.handler.flow.FlowControlHandler.channelRead(FlowControlHandler.java:162)
 at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:442)
 at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:420)
 at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:412)
 at io.netty.handler.codec.ByteToMessageDecoder.fireChannelRead(ByteToMessageDecoder.java:346)
 at io.netty.handler.codec.ByteToMessageDecoder.fireChannelRead(ByteToMessageDecoder.java:333)
 at io.netty.handler.codec.ByteToMessageDecoder.callDecode(ByteToMessageDecoder.java:454)
 at io.netty.handler.codec.ByteToMessageDecoder.channelRead(ByteToMessageDecoder.java:290)
 at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:444)
 at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:420)
 at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:412)
 at io.netty.channel.DefaultChannelPipeline$HeadContext.channelRead(DefaultChannelPipeline.java:1410)
 at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:440)
 at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:420)
 at io.netty.channel.DefaultChannelPipeline.fireChannelRead(DefaultChannelPipeline.java:919)
 at io.netty.channel.epoll.AbstractEpollStreamChannel$EpollStreamUnsafe.epollInReady(AbstractEpollStreamChannel.java:800)
 at io.netty.channel.epoll.EpollEventLoop.processReady(EpollEventLoop.java:499)
 at io.netty.channel.epoll.EpollEventLoop.run(EpollEventLoop.java:397)
 at io.netty.util.concurrent.SingleThreadEventExecutor$4.run(SingleThreadEventExecutor.java:997)
 at io.netty.util.internal.ThreadExecutorMap$2.run(ThreadExecutorMap.java:74)
 at io.netty.util.concurrent.FastThreadLocalRunnable.run(FastThreadLocalRunnable.java:30)
 at java.base/java.lang.Thread.run(Thread.java:829)

Modifications

If the current RateLimiter is already shut down, we return true and log at info level instead of throwing. Because PIP-322 refactored Pulsar rate limiting in version 3.2, only versions before 3.2 need this fix.
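The change in behaviour can be sketched roughly as follows. This is a minimal, self-contained illustration and not the actual Pulsar code: SketchRateLimiter, tryAcquireStrict, and the permit bookkeeping are hypothetical names made up for the example, and the assumption that replacing the limiters closes the old one is inferred from the description above.

```java
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLong;

/** Hypothetical, simplified stand-in for a permit-based limiter; not Pulsar's RateLimiter. */
class SketchRateLimiter implements AutoCloseable {
    private final AtomicLong availablePermits;
    private final AtomicBoolean closed = new AtomicBoolean(false);

    SketchRateLimiter(long permits) {
        this.availablePermits = new AtomicLong(permits);
    }

    /** Behaviour before the fix: a closed limiter throws, which surfaces as a failed send. */
    boolean tryAcquireStrict(long permits) {
        if (closed.get()) {
            throw new IllegalArgumentException("Rate limiter is already shutdown");
        }
        return take(permits);
    }

    /** Behaviour after the fix: a closed limiter lets the call through instead of throwing. */
    boolean tryAcquire(long permits) {
        if (closed.get()) {
            // The PR logs this at info level; printing stands in for the logger here.
            System.out.println("Rate limiter is already shutdown, passing acquire through");
            return true;
        }
        return take(permits);
    }

    /** Atomically subtracts permits, but only when enough are left. */
    private boolean take(long permits) {
        while (true) {
            long current = availablePermits.get();
            if (current < permits) {
                return false;
            }
            if (availablePermits.compareAndSet(current, current - permits)) {
                return true;
            }
        }
    }

    @Override
    public void close() {
        closed.set(true);
    }
}

class SketchRateLimiterDemo {
    public static void main(String[] args) {
        SketchRateLimiter limiter = new SketchRateLimiter(1);
        System.out.println(limiter.tryAcquire(1)); // a permit is available
        System.out.println(limiter.tryAcquire(1)); // no permits left
        limiter.close();                           // assumed: what a limiter swap does to the old instance
        System.out.println(limiter.tryAcquire(1)); // closed limiter passes the call through
    }
}
```

The trade-off in this sketch is that a publish racing with a limiter swap is briefly not rate limited, rather than failing with an exception.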

Verifying this change

  • [x] Make sure that the change passes the CI checks.

Does this pull request potentially affect one of the following parts:

If the box was checked, please highlight the changes

  • [ ] Dependencies (add or upgrade a dependency)
  • [ ] The public API
  • [ ] The schema
  • [ ] The default values of configurations
  • [ ] The threading model
  • [ ] The binary protocol
  • [ ] The REST endpoints
  • [ ] The admin CLI options
  • [ ] The metrics
  • [ ] Anything that affects deployment

Documentation

  • [ ] doc
  • [ ] doc-required
  • [x] doc-not-needed
  • [ ] doc-complete

Matching PR in forked repository

PR in forked repository:

coderzc avatar Feb 01 '24 02:02 coderzc

Thanks @coderzc. When we see this error, is rate limiting working or not? Are the affected producers still producing successfully, and is the rate limit applied to them? Please advise.

frankjkelly avatar Feb 01 '24 15:02 frankjkelly

@frankjkelly When this error is logged, the send operation fails temporarily with a timeout. This happens while the rate limiter is being updated. Once the update completes, publishing works normally again.

coderzc avatar Feb 05 '24 11:02 coderzc

@coderzc A better fix is to remove the exception that is thrown when the rate limiter is closed. Simply return if it is closed. This could be logged at info level.

https://github.com/apache/pulsar/blob/82237d3684fe506bcb6426b3b23f413422e6e4fb/pulsar-common/src/main/java/org/apache/pulsar/common/util/RateLimiter.java#L178

@lhotari Good idea. If the current RateLimiter is already shut down, we now just return true and log at info level. PTAL~

coderzc avatar Feb 05 '24 12:02 coderzc

Thanks for the clarification. Does that mean the client will retry, and if so, does that happen within milliseconds, seconds, or something else?

frankjkelly avatar Feb 05 '24 14:02 frankjkelly

@frankjkelly I think the client does not retry automatically, so the user needs to resend the message manually if the send fails.

coderzc avatar Feb 09 '24 10:02 coderzc

Hmmm @merlimat or @lhotari can you confirm? If this error requires the caller to catch and retry (as opposed to the client doing it internally) then that's a concern for adoption of the rate limiter (if the error occurs and the client retries as best it can that's OK).

frankjkelly avatar Feb 09 '24 13:02 frankjkelly

I don't see anything special about rate limiters in message delivery and retries. The Pulsar client is designed to continue attempting to send messages until a potential send timeout occurs. It's also possible to set up an unlimited send timeout, allowing the client to retry indefinitely. This feature is detailed in the Pulsar documentation, available at https://pulsar.apache.org/docs/3.1.x/cookbooks-deduplication/#pulsar-clients (it's explained in the context of message deduplication). You can refer to the Javadocs for sendTimeout on ProducerBuilder.

It's crucial for messaging applications to be equipped to handle potential failures in message delivery, especially when data consistency is a key concern. Once the Pulsar client has acknowledged the message as sent by returning the message id, the responsibility for maintaining and ensuring the delivery of the message shifts to Pulsar. It's also necessary to verify that the message id is returned when using the asynchronous API (sendAsync).

If sending results in an error or the messaging application never receives a message id from the Pulsar client, it's the messaging application's responsibility to retry.
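As a rough illustration of that application-level responsibility, here is a minimal sketch of a bounded retry around an asynchronous send. It deliberately does not use the Pulsar client: sendWithRetry and the Supplier standing in for a call such as producer.sendAsync(...) are hypothetical names for this example, and a real application would also need to decide on backoff and on how to handle duplicates caused by retries.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Supplier;

class SendRetrySketch {
    /**
     * Calls the asynchronous send up to maxAttempts times and completes with the
     * first message id that comes back. 'send' is a hypothetical stand-in for an
     * asynchronous producer call.
     */
    static <T> CompletableFuture<T> sendWithRetry(Supplier<CompletableFuture<T>> send, int maxAttempts) {
        CompletableFuture<T> result = new CompletableFuture<>();
        attempt(send, maxAttempts, result);
        return result;
    }

    private static <T> void attempt(Supplier<CompletableFuture<T>> send, int attemptsLeft,
                                    CompletableFuture<T> result) {
        send.get().whenComplete((messageId, error) -> {
            if (error == null && messageId != null) {
                // A message id came back, so the send was acknowledged.
                result.complete(messageId);
            } else if (attemptsLeft > 1) {
                // Error or no id: the application retries.
                attempt(send, attemptsLeft - 1, result);
            } else {
                // Out of attempts: surface the failure to the caller.
                result.completeExceptionally(error != null
                        ? error
                        : new IllegalStateException("no message id returned"));
            }
        });
    }

    public static void main(String[] args) {
        AtomicInteger calls = new AtomicInteger();
        // Fake sender: fails twice, then returns a made-up message id.
        Supplier<CompletableFuture<String>> flakySend = () -> {
            if (calls.incrementAndGet() < 3) {
                return CompletableFuture.failedFuture(new RuntimeException("send timed out"));
            }
            return CompletableFuture.completedFuture("msg-1");
        };
        System.out.println(sendWithRetry(flakySend, 5).join());
    }
}
```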

@frankjkelly, did I answer your question?

lhotari avatar Feb 09 '24 16:02 lhotari

You did - Thanks @lhotari

frankjkelly avatar Feb 09 '24 20:02 frankjkelly