pulsar icon indicating copy to clipboard operation
pulsar copied to clipboard

[Bug] unacked message count is zero when using exclusive subscription.

Open nodece opened this issue 8 months ago • 3 comments

Search before asking

  • [x] I searched in the issues and found nothing similar.

Read release policy

  • [x] I understand that unsupported versions don't get bug fixes. I will attempt to reproduce the issue on a supported version of Pulsar client and Pulsar broker.

Version

Pulsar: master

Minimal reproduce step

@Test
    public void testUnackedMessages() throws Exception {
        String topicName = "testUnackedMessages";
        String subscriptionName = "my-sub";
        String serviceUrl = "pulsar://localhost:6650";

        @Cleanup PulsarClient pulsarClient = PulsarClient.builder().serviceUrl(serviceUrl).build();

        @Cleanup Consumer<byte[]> consumer =
                pulsarClient.newConsumer().topic(topicName).subscriptionName(subscriptionName)
                        .subscriptionType(SubscriptionType.Exclusive).messageListener((c, message) -> {
                            // noop
                        }).subscribe();

        @Cleanup Producer<byte[]> producer = pulsarClient.newProducer().topic(topicName).create();
        producer.send("1".getBytes());

        TopicStats stats = admin.topics().getStats(topicName);
        assertThat(stats.getSubscriptions().get(subscriptionName).getUnackedMessages()).isNotEqualTo(0);
    }

What did you expect to see?

unackedMessages is not equal to 0.

What did you see instead?

unackedMessages is equal to 0.

Anything else?

No response

Are you willing to submit a PR?

  • [x] I'm willing to submit a PR!

nodece avatar Apr 08 '25 09:04 nodece

It seems that exclusive subscriptions lack support for counting unacknowledged messages.

 if (Subscription.isIndividualAckMode(subType)) {
            if (dispatcher instanceof AbstractPersistentDispatcherMultipleConsumers) {
                AbstractPersistentDispatcherMultipleConsumers d =
                        (AbstractPersistentDispatcherMultipleConsumers) dispatcher;
                subStats.unackedMessages = d.getTotalUnackedMessages();
   ....
            }
        }

https://github.com/apache/pulsar/blob/master/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentSubscription.java#L1332

    static boolean isIndividualAckMode(SubType subType) {
        return SubType.Shared.equals(subType) || SubType.Key_Shared.equals(subType);
    }

https://github.com/apache/pulsar/blob/master/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Subscription.java#L134 #

fretory avatar Apr 14 '25 03:04 fretory

You are right. By the way, we should also implement org.apache.pulsar.broker.service.persistent.PersistentDispatcherSingleActiveConsumer#addUnAckedMessages.

nodece avatar Apr 14 '25 04:04 nodece

I have submit a PR #24376 to fix it

berg223 avatar Jun 03 '25 14:06 berg223