pulsar icon indicating copy to clipboard operation
pulsar copied to clipboard

[feat][pip] PIP-426: Enable Consumer Throttling and Accurate Unacknowledged Message Tracking for Exclusive and Failover Subscriptions

Open berg223 opened this issue 7 months ago • 0 comments

Fixes https://github.com/apache/pulsar/issues/24159

Main Issue: https://github.com/apache/pulsar/issues/24159

Motivation

  1. I have tried to fix the issue by https://github.com/apache/pulsar/pull/24376. However, that PR doesn't support features about cumulative ack, batching mode, and transaction. So I want to improve it further.
  2. I have found another issue that flowcontrol of exclusive or failover consumer is not work. Since the issues are highly correlated, I want to fix them at the same time.
We can reproduce flow control issue by this unit test
    @Test(timeOut = 30000)
    public void testMaxUnackedMessagesOnExclusiveConsumer() throws Exception {
        final String topicName = testTopic + System.currentTimeMillis();
        final String subscriberName = "test-sub-exclusive" + System.currentTimeMillis();
        final int unackMsgAllowed = 100;
        final int receiverQueueSize = 10;
        final int totalProducedMsgs = 300;

        ConsumerBuilder<String> consumerBuilder = pulsarClient.newConsumer(Schema.STRING).topic(topicName)
                .subscriptionName(subscriberName).receiverQueueSize(receiverQueueSize)
                .ackTimeout(1, TimeUnit.MINUTES)
                .subscriptionType(SubscriptionType.Exclusive);
        @Cleanup
        Consumer<String> consumer = consumerBuilder.subscribe();
        // 1) Produced Messages
        @Cleanup
        Producer<String> producer = pulsarClient.newProducer(Schema.STRING).topic(topicName).create();
        for (int i = 0; i < totalProducedMsgs; i++) {
            String message = "my-message-" + i;
            producer.send(message);
        }
        // 2) Unlimited, so all messages can be consumed
        int count = 0;
        List<Message<String>> list = new ArrayList<>(totalProducedMsgs);
        for (int i = 0; i < totalProducedMsgs; i++) {
            Message<String> message = consumer.receive(1, TimeUnit.SECONDS);
            if (message == null) {
                break;
            }
            count++;
            list.add(message);
        }
        assertEquals(count, totalProducedMsgs);
        list.forEach(message -> {
            try {
                consumer.acknowledge(message);
            } catch (PulsarClientException e) {
            }
        });
        // 3) Set restrictions, so only part of the data can be consumed
        waitCacheInit(topicName);
        admin.topics().setMaxUnackedMessagesOnConsumer(topicName, unackMsgAllowed);
        Awaitility.await().untilAsserted(()
                -> assertNotNull(admin.topics().getMaxUnackedMessagesOnConsumer(topicName)));
        assertEquals(admin.topics().getMaxUnackedMessagesOnConsumer(topicName).intValue(), unackMsgAllowed);
        // 4) consumer can only consume 100 messages
        for (int i = 0; i < totalProducedMsgs; i++) {
            String message = "my-message-" + i;
            producer.send(message);
        }
        int consumerCounter = 0;
        Message<String> message = null;
        for (int i = 0; i < totalProducedMsgs; i++) {
            try {
                Message<String> msg = consumer.receive(500, TimeUnit.MILLISECONDS);
                if (msg == null) {
                    break;
                }
                message = msg;
                ++consumerCounter;
            } catch (PulsarClientException e) {
                break;
            }
        }
        assertEquals(consumerCounter, unackMsgAllowed);
        consumer.acknowledgeCumulative(message.getMessageId());
        consumerCounter = 0;
        for (int i = 0; i < totalProducedMsgs - unackMsgAllowed; i++) {
            try {
                message = consumer.receive(500, TimeUnit.MILLISECONDS);
                if (message == null) {
                    break;
                }
                ++consumerCounter;
            } catch (PulsarClientException e) {
                break;
            }
        }
        assertEquals(consumerCounter, unackMsgAllowed);
    }

Modifications

  • Reused and extended the pendingAcks mechanism in the Consumer class to support exclusive and failover subscriptions.
  • Removed the restriction that pendingAcks only works in individual ack mode.
  • Reused PendingAckHandleImpl for transaction support after removing the dependency on Subscription.isIndividualAckMode.

Verifying this change

  • [X] Make sure that the change passes the CI checks.

This change is already covered by existing tests, such as (please describe tests).

  • MaxUnackedMessagesTest#testMaxUnackedMessagesOnConsumer

This change added tests and can be verified as follows:

  • MaxUnackedMessagesTest#testMaxUnackedMessagesOnExclusiveConsumer
  • MaxUnackedMessagesTest#testMaxUnackedMessagesOnFailOverConsumer
  • PersistentDispatcherSingleActiveConsumerTest#testUnackedMessages
  • PersistentDispatcherSingleActiveConsumerTest#testUnackedMessagesWithTransaction

Does this pull request potentially affect one of the following parts:

If the box was checked, please highlight the changes

  • [ ] Dependencies (add or upgrade a dependency)
  • [ ] The public API
  • [ ] The schema
  • [ ] The default values of configurations
  • [ ] The threading model
  • [ ] The binary protocol
  • [ ] The REST endpoints
  • [ ] The admin CLI options
  • [ ] The metrics
  • [ ] Anything that affects deployment

Documentation

  • [ ] doc
  • [ ] doc-required
  • [X] doc-not-needed
  • [ ] doc-complete

Matching PR in forked repository

PR in forked repository:

berg223 avatar Jun 08 '25 19:06 berg223