[fix][broker] Fix incorrect reduction of the broker's unacked message count
Issue: when reducing a consumer's unackedCount, the broker uses the wrong consumer.
Steps to reproduce:
for (int i = 0; i < 5; i++) {
    producer.newMessage().value(("Hello Pulsar - " + i).getBytes()).sendAsync();
}
// consumer-1 receives the 5 batched messages
List<MessageId> list = new ArrayList<>();
for (int i = 0; i < 5; i++) {
    list.add(consumer1.receive().getMessageId());
}
// consumer-1 redelivers the batch
consumer1.negativeAcknowledge(list.get(0));
// consumer-2 receives the messages that consumer-1 redelivered
for (int i = 0; i < 5; i++) {
    consumer2.receive().getMessageId();
}
// consumer-1 acks two messages in the batch
consumer1.acknowledge(list.get(1));
consumer1.acknowledge(list.get(2));
// consumer-2 redelivers the rest of the messages
consumer2.negativeAcknowledge(list.get(1));
// closing consumer-1 redelivers the remaining messages to consumer-2
consumer1.close();
// consumer-2 can receive the remaining 3 messages
for (int i = 0; i < 3; i++) {
    consumer2.acknowledge(consumer2.receive().getMessageId());
}
// consumer-2 receives nothing more: all messages in the batch have been acked
Message<byte[]> message = consumer2.receive(1, TimeUnit.SECONDS);
assertNull(message);
// consumer-2's unacked message count should be 0
Awaitility.await().until(() -> getPulsar().getBrokerService().getTopic(topicName, false)
        .get().get().getSubscription(subName).getConsumers().get(0).getUnackedMessages() == 0);
With the current code, getUnackedMessages() returns 2 instead of 0.
Motivation
Look up the correct consumer and reduce that consumer's unacked message count.
Modifications
Change the method getAckedCountForBatchIndexLevelEnabled to use ownerConsumer when checking the pendingAck messages.
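The effect of checking the owner consumer can be illustrated with a small standalone model. This is only a sketch under simplifying assumptions, not the broker code: `ToyConsumer`, `ackedCount`, and `remainingUnacked` are hypothetical names, the pending-ack bookkeeping is reduced to a plain map, and the scenario is shortened to one redelivery followed by two late acks from the previous consumer.

```java
import java.util.HashMap;
import java.util.Map;

public class UnackedCountSketch {

    /** Hypothetical stand-in for a broker-side consumer; not Pulsar's Consumer class. */
    static final class ToyConsumer {
        // entryId -> batch size of the entry currently dispatched to this consumer
        final Map<Long, Integer> pendingAcks = new HashMap<>();
        int unackedMessages;

        void dispatch(long entryId, int batchSize) {
            pendingAcks.put(entryId, batchSize);
            unackedMessages += batchSize;
        }
    }

    /**
     * Returns how many messages an ack of one batch index releases.
     * The ack only counts if the checked consumer still holds the entry.
     */
    static int ackedCount(ToyConsumer checked, long entryId) {
        return checked.pendingAcks.containsKey(entryId) ? 1 : 0;
    }

    /** Simulates: entry redelivered from c1 to c2, c1 acks 2 indexes, c2 acks 3. */
    public static int remainingUnacked(boolean checkOwner) {
        ToyConsumer c1 = new ToyConsumer();
        ToyConsumer c2 = new ToyConsumer();
        long entryId = 0L;

        c1.dispatch(entryId, 5);
        // Redelivery: c1 gives up the entry and c2 becomes its owner.
        c1.unackedMessages -= c1.pendingAcks.remove(entryId);
        c2.dispatch(entryId, 5);

        // Two late acks arrive through c1 while c2 owns the entry.
        for (int i = 0; i < 2; i++) {
            ToyConsumer checked = checkOwner ? c2 : c1;
            c2.unackedMessages -= ackedCount(checked, entryId);
        }
        // c2 acks the remaining three indexes itself.
        for (int i = 0; i < 3; i++) {
            c2.unackedMessages -= ackedCount(c2, entryId);
        }
        return c2.unackedMessages;
    }

    public static void main(String[] args) {
        System.out.println("check receiving consumer: " + remainingUnacked(false));
        System.out.println("check owner consumer:     " + remainingUnacked(true));
    }
}
```

In this model, checking the receiving consumer's pending acks leaves the owner with 2 unacked messages, while checking the owner's pending acks brings the count to 0.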
Verifying this change
Added a test for this case.
Does this pull request potentially affect one of the following parts:
If yes was chosen, please highlight the changes
- Dependencies (does it add or upgrade a dependency): (no)
- The public API: (no)
- The schema: (no)
- The default values of configurations: (no)
- The wire protocol: (no)
- The rest endpoints: (no)
- The admin cli options: (no)
- Anything that affects deployment: (no)
Documentation
- Does this pull request introduce a new feature? (yes)
- If yes, how is the feature documented? (not applicable / docs / JavaDocs / not documented)
- If a feature is not applicable for documentation, explain why?
- If a feature is not documented yet in this PR, please create a followup issue for adding the documentation
@congbobo184 Would it be possible to describe the impact of the bug that was fixed by this PR? The current description describes implementation level details of the issue. However, most Pulsar users don't understand the implementation level, and therefore it would be useful if the externally observable behavior could be explained. What externally observable bug was fixed by this PR?
Hi @congbobo184, would you like to help cherry-pick this PR to branch-2.9?
@lhotari This PR fixes some cases where consumer unackedCount may become a negative number.