[improve][pip] PIP-423: Add a new admin API to acknowledge a single message
Motivation
Operators and SREs occasionally need to intervene in a topic's backlog to handle problematic messages or adapt to changing business requirements. For instance:
- Cancelling Scheduled Actions: A delayed message representing a future task (e.g., a scheduled report or a notification) may become obsolete. The most efficient way to handle this is to prevent its delivery entirely.
- Removing Backlogs: A specific message in a backlog might have a malformed payload that causes consumer applications to crash repeatedly. Removing this single message without affecting valid messages around it is a critical operational capability.
- Manual Business Logic Correction: An event may have been sent that is later determined to be invalid due to external factors. An administrator needs a precise tool to remove this specific event from a subscription's queue.
The existing skip-messages API is a blunt instrument, ill-suited for these precise, targeted operations. This proposal introduces a more fine-grained administrative tool that acknowledges a single message by its unique MessageId. This provides a robust and reliable way to remove any individual message, delayed or not, from a subscription's backlog.
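To make the contrast concrete, here is a minimal, self-contained sketch of the two removal styles against a toy in-memory backlog. It is not the proposed admin API and does not use any Pulsar classes: `BacklogSketch`, `Backlog`, `skip`, `acknowledge`, and the `ledgerId:entryId` string ids are all hypothetical names chosen for illustration only.

```java
import java.util.ArrayList;
import java.util.List;

public class BacklogSketch {

    // Toy model of one subscription's unacknowledged backlog, in delivery order.
    // Ids are plain "ledgerId:entryId" strings (an assumption, not Pulsar's MessageId).
    static final class Backlog {
        private final List<String> pending = new ArrayList<>();

        void add(String id) {
            pending.add(id);
        }

        // Skip-style removal: drops the first n pending messages, valid or not.
        void skip(int n) {
            pending.subList(0, Math.min(n, pending.size())).clear();
        }

        // Targeted removal: acknowledges exactly one message by id.
        // Returns false if that id is not pending.
        boolean acknowledge(String id) {
            return pending.remove(id);
        }

        // Returns a copy so callers cannot mutate the backlog directly.
        List<String> pending() {
            return new ArrayList<>(pending);
        }
    }

    // Builds a backlog holding messages 1:0 through 1:4.
    static Backlog sample() {
        Backlog backlog = new Backlog();
        for (int entry = 0; entry < 5; entry++) {
            backlog.add("1:" + entry);
        }
        return backlog;
    }

    public static void main(String[] args) {
        String bad = "1:2"; // the one message we want gone

        Backlog skipped = sample();
        skipped.skip(3); // reaching 1:2 also discards 1:0 and 1:1
        System.out.println("after skip: " + skipped.pending());

        Backlog acked = sample();
        acked.acknowledge(bad); // only 1:2 is removed
        System.out.println("after ack:  " + acked.pending());
    }
}
```

In this toy model, skipping far enough to remove `1:2` also discards the two valid messages ahead of it, while acknowledging by id leaves the other four messages pending.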
Implementation PR: https://github.com/apache/pulsar/pull/23907
Does this pull request potentially affect one of the following parts:
If the box was checked, please highlight the changes
- [ ] Dependencies (add or upgrade a dependency)
- [x] The public API
- [ ] The schema
- [ ] The default values of configurations
- [ ] The threading model
- [ ] The binary protocol
- [ ] The REST endpoints
- [ ] The admin CLI options
- [ ] The metrics
- [ ] Anything that affects deployment
Documentation
- [ ] doc
- [ ] doc-required
- [x] doc-not-needed
- [ ] doc-complete
Matching PR in forked repository
PR in forked repository: https://github.com/Denovo1998/pulsar/pull/8
There are currently some issues with this implementation. (Note that this PIP only discusses the BucketDelayedDeliveryTracker implementation.)
If a delayed message in the LastMutableBucket has not yet been flushed to a bookie and a failure occurs, the data in the LastMutableBucket is lost. This has no lasting impact, because after a restart messages are read again from the MarkDelete position and the bucket is rebuilt.
The delayed message deletion command is different. If the LastMutableBucket has not yet been successfully persisted (sealBucketAndAsyncPersistent) and the broker crashes, the command is lost, and rebuilding the bucket from the MarkDelete position will not restore it. We also cannot wait for the seal-bucket condition to be triggered before reporting that the cancel command succeeded, because we do not know how long that will take.
This part is important and needs to be resolved. I will think about how to solve it, and everyone is welcome to discuss it with me.
I have some possible solutions; let me test them first.
The document has been updated: we will implement delayed message cancellation by acknowledging the message.