[Fix][Broker] Filter Invalid Messages During Individual Acknowledgment
Motivation
When a client sends an individual acknowledgment for a message that is no longer in the pending acknowledgment list (e.g., because it was deleted or has expired), the method getAckOwnerConsumerAndBatchSize incorrectly returns a non-null value. Downstream logic then processes the acknowledgment as if the message were valid, which leads to unnecessary operations and potential inconsistencies.
This PR ensures that invalid message acknowledgments are filtered out early, preventing unnecessary processing and improving system robustness.
Modifications
1. Updated getAckOwnerConsumerAndBatchSize: Now returns null when the specified ledger/entry ID is not found in the pending acknowledgment list.
2. Enhanced acknowledgment handling: Added null-checks in individualAckNormal and individualAckWithTransaction to skip invalid message IDs and log warnings.
3. Added validation for empty acknowledgment lists: If no valid messages remain after filtering, the acknowledgment process completes early with a logged warning.
4. Refined documentation: Updated the JavaDoc for removePendingAcks to accurately reflect the current logic introduced in a previous optimization: https://github.com/apache/pulsar/pull/23072
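
For reviewers, the filtering flow described in items 1 to 3 can be sketched roughly as follows. This is a simplified, standalone illustration and not the broker code: the class AckFilterSketch, the records Position and AckOwner, and the methods addPending, getAckOwnerAndBatchSize, filterValid, and individualAck are all hypothetical names, and a plain HashMap stands in for the pending acknowledgment list. It assumes Java 17 for records.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical sketch only; none of these types are Pulsar's actual classes.
final class AckFilterSketch {

    /** A message position identified by ledger ID and entry ID. */
    record Position(long ledgerId, long entryId) {}

    /** Stand-in for the owner-consumer / batch-size pair that the lookup returns. */
    record AckOwner(String consumerName, int batchSize) {}

    // Stand-in for the pending acknowledgment list.
    private final Map<Position, AckOwner> pendingAcks = new HashMap<>();

    void addPending(Position position, AckOwner owner) {
        pendingAcks.put(position, owner);
    }

    /** Returns null when the position is not in the pending list (item 1). */
    AckOwner getAckOwnerAndBatchSize(Position position) {
        return pendingAcks.get(position);
    }

    /** Keeps only positions that are still pending and warns about the rest (item 2). */
    List<Position> filterValid(List<Position> requested) {
        List<Position> valid = new ArrayList<>();
        for (Position p : requested) {
            if (getAckOwnerAndBatchSize(p) == null) {
                System.err.println("WARN: skipping ack for unknown position " + p);
                continue;
            }
            valid.add(p);
        }
        return valid;
    }

    /** Returns the number of messages acknowledged; 0 means the call completed early (item 3). */
    int individualAck(List<Position> requested) {
        List<Position> valid = filterValid(requested);
        if (valid.isEmpty()) {
            System.err.println("WARN: no valid positions to acknowledge, completing early");
            return 0;
        }
        int acked = 0;
        for (Position p : valid) {
            // remove() can return null if the same position was requested twice.
            AckOwner owner = pendingAcks.remove(p);
            if (owner != null) {
                acked += owner.batchSize();
            }
        }
        return acked;
    }
}
```

In this sketch, acknowledging a mix of known and unknown positions only touches the known ones, and a request that contains only unknown positions returns before any removal is attempted.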
Verifying this change
- [x] Make sure that the change passes the CI checks.
Does this pull request potentially affect one of the following parts:
If the box was checked, please highlight the changes
- [ ] Dependencies (add or upgrade a dependency)
- [ ] The public API
- [ ] The schema
- [ ] The default values of configurations
- [ ] The threading model
- [ ] The binary protocol
- [ ] The REST endpoints
- [ ] The admin CLI options
- [ ] The metrics
- [ ] Anything that affects deployment
Documentation
- [ ] doc
- [ ] doc-required
- [x] doc-not-needed
- [ ] doc-complete
Matching PR in forked repository
PR in forked repository: https://github.com/3pacccccc/pulsar/pull/33