pulsar
pulsar copied to clipboard
[Bug] unacked message count is zero when using exclusive subscription.
Search before asking
- [x] I searched in the issues and found nothing similar.
Read release policy
- [x] I understand that unsupported versions don't get bug fixes. I will attempt to reproduce the issue on a supported version of Pulsar client and Pulsar broker.
Version
Pulsar: master
Minimal reproduce step
@Test
public void testUnackedMessages() throws Exception {
String topicName = "testUnackedMessages";
String subscriptionName = "my-sub";
String serviceUrl = "pulsar://localhost:6650";
@Cleanup PulsarClient pulsarClient = PulsarClient.builder().serviceUrl(serviceUrl).build();
@Cleanup Consumer<byte[]> consumer =
pulsarClient.newConsumer().topic(topicName).subscriptionName(subscriptionName)
.subscriptionType(SubscriptionType.Exclusive).messageListener((c, message) -> {
// noop
}).subscribe();
@Cleanup Producer<byte[]> producer = pulsarClient.newProducer().topic(topicName).create();
producer.send("1".getBytes());
TopicStats stats = admin.topics().getStats(topicName);
assertThat(stats.getSubscriptions().get(subscriptionName).getUnackedMessages()).isNotEqualTo(0);
}
What did you expect to see?
unackedMessages is not equal to 0.
What did you see instead?
unackedMessages is equal to 0.
Anything else?
No response
Are you willing to submit a PR?
- [x] I'm willing to submit a PR!
It seems that exclusive subscriptions lack support for counting unacknowledged messages.
if (Subscription.isIndividualAckMode(subType)) {
if (dispatcher instanceof AbstractPersistentDispatcherMultipleConsumers) {
AbstractPersistentDispatcherMultipleConsumers d =
(AbstractPersistentDispatcherMultipleConsumers) dispatcher;
subStats.unackedMessages = d.getTotalUnackedMessages();
....
}
}
static boolean isIndividualAckMode(SubType subType) {
return SubType.Shared.equals(subType) || SubType.Key_Shared.equals(subType);
}
You are right. By the way, we should also implement org.apache.pulsar.broker.service.persistent.PersistentDispatcherSingleActiveConsumer#addUnAckedMessages.
I have submit a PR #24376 to fix it