[improve][broker] Implement delayed message cancellation in Pulsar
Fixes #xyz
Main Issue: #23149
PIP: https://github.com/apache/pulsar/pull/24370
Motivation
In dynamic event-driven architectures, the relevance and timing of messages are crucial. Apache Pulsar currently supports delayed message delivery, which is beneficial for scheduling future processing. However, once scheduled, there is no native support to modify or cancel these messages based on changing circumstances. This limitation poses challenges in situations where messages become obsolete before their scheduled delivery or when system conditions change, necessitating the early processing or cancellation of delayed messages.
At the moment, the only way to cancel a delayed message is on the consumer side: the consumer keeps the messages to be cancelled in its own memory and discards them when they arrive. This becomes fragile when the delay is long, because that state has to be held in memory for the whole delay period.
The idea of this PR is to send a MARK message that cancels a target message. The MARK message is itself delayed so that it is delivered one to two windows before the target message, where the window size is `delayedDeliveryTickTimeMillis`.
When the MARK message arrives, it is stored in a map. When the target message becomes due, it is filtered out instead of being dispatched, and both messages are then acknowledged.
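The timing rule above can be sketched as simple arithmetic. This is only an illustration, not code from the PR: the class and method names are hypothetical, the 1000 ms tick is an example value, and it assumes "one to two windows before" refers to the MARK message's delivery time relative to the target's delivery time.

```java
// Hypothetical helper, not part of Pulsar: computes the delivery window for a MARK message.
final class MarkTimingSketch {

    /** Earliest MARK delivery time: two tick windows before the target is due. */
    static long earliestMarkDeliverAt(long targetDeliverAtMillis, long tickTimeMillis) {
        return targetDeliverAtMillis - 2 * tickTimeMillis;
    }

    /** Latest MARK delivery time: one tick window before the target is due. */
    static long latestMarkDeliverAt(long targetDeliverAtMillis, long tickTimeMillis) {
        return targetDeliverAtMillis - tickTimeMillis;
    }

    /** True if a MARK sent now can still be delivered at least one window before the target. */
    static boolean canStillCancel(long nowMillis, long targetDeliverAtMillis, long tickTimeMillis) {
        return nowMillis <= latestMarkDeliverAt(targetDeliverAtMillis, tickTimeMillis);
    }

    public static void main(String[] args) {
        long tick = 1_000L;     // example value for delayedDeliveryTickTimeMillis
        long target = 60_000L;  // target message is due at t = 60 s
        System.out.println(earliestMarkDeliverAt(target, tick)); // 58000
        System.out.println(latestMarkDeliverAt(target, tick));   // 59000
        System.out.println(canStillCancel(59_500L, target, tick)); // false
    }
}
```

Under this reading, a cancellation requested less than one window before the target is due can no longer be guaranteed to take effect.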
Modifications
The following changes were introduced in Pulsar’s message dispatching components to enable delayed message cancellation:
- Extended AbstractBaseDispatcher with the ability to recognize and handle cancellation properties in message metadata.
- Added constants for identifying cancellation-specific properties in message metadata.
The key implementation detail involves checking each message for a cancellation marker as it becomes eligible for dispatch. If a message is marked for cancellation, it is dropped from the dispatch queue and its associated resources are cleaned up immediately.
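A minimal, self-contained sketch of this check is shown below. It is not the code in `AbstractBaseDispatcher`: the `Msg` type, the property key `example.cancel.target`, and the method names are all hypothetical stand-ins, and messages are identified by plain strings rather than by real message IDs.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical model of the dispatch-time filter; none of these names come from Pulsar.
final class CancellationFilterSketch {

    /** Assumed property key a MARK message uses to name the message it cancels. */
    static final String CANCEL_TARGET_KEY = "example.cancel.target";

    /** Stand-in for an entry together with its metadata properties. */
    static final class Msg {
        final String id;
        final Map<String, String> properties;

        Msg(String id, Map<String, String> properties) {
            this.id = id;
            this.properties = properties;
        }
    }

    // target message id -> id of the MARK message that cancelled it
    private final Map<String, String> pendingCancellations = new HashMap<>();
    private final List<String> acknowledged = new ArrayList<>();

    /** Returns only the messages that should actually be dispatched to consumers. */
    List<Msg> filter(List<Msg> eligible) {
        List<Msg> toDispatch = new ArrayList<>();
        for (Msg m : eligible) {
            String target = m.properties.get(CANCEL_TARGET_KEY);
            if (target != null) {
                // A MARK message: remember the cancellation, never dispatch the MARK itself.
                pendingCancellations.put(target, m.id);
                continue;
            }
            String markId = pendingCancellations.remove(m.id);
            if (markId != null) {
                // A cancelled target: drop it and acknowledge both messages.
                acknowledged.add(markId);
                acknowledged.add(m.id);
            } else {
                toDispatch.add(m);
            }
        }
        return toDispatch;
    }

    /** Ids acknowledged by the filter, in the order they were acknowledged. */
    List<String> acknowledgedIds() {
        return acknowledged;
    }
}
```

The sketch only works if the MARK message passes through the filter before its target does, which is why the MARK has to be scheduled ahead of the target.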
Verifying this change
- [x] Make sure that the change passes the CI checks.
(Please pick either of the following options)
This change is a trivial rework / code cleanup without any test coverage.
(or)
This change is already covered by existing tests, such as (please describe tests).
(or)
This change added tests and can be verified as follows:
(example:)
- Added integration tests for end-to-end deployment with large payloads (10MB)
- Extended integration test for recovery after broker failure
Does this pull request potentially affect one of the following parts:
If the box was checked, please highlight the changes
- [ ] Dependencies (add or upgrade a dependency)
- [ ] The public API
- [ ] The schema
- [ ] The default values of configurations
- [ ] The threading model
- [ ] The binary protocol
- [ ] The REST endpoints
- [ ] The admin CLI options
- [ ] The metrics
- [ ] Anything that affects deployment
Documentation
- [ ] doc
- [ ] doc-required
- [x] doc-not-needed
- [ ] doc-complete
Matching PR in forked repository
PR in forked repository:
@Denovo1998 Please add the following content to your PR description and select a checkbox:
- [ ] `doc` <!-- Your PR contains doc changes -->
- [ ] `doc-required` <!-- Your PR changes impact docs and you will update later -->
- [ ] `doc-not-needed` <!-- Your PR changes do not impact docs -->
- [ ] `doc-complete` <!-- Docs have been already added -->
If the cancellation messages are stored in the topic, cancellation only works once the DelayedDeliveryTracker has "indexed" both the delayed messages and the cancellation messages. For example, the InMemoryDelayedDeliveryTracker keeps its state only in memory. With cancellation commands stored in the topic, the tracker state would have to be caught up before any scheduled message could be delivered. This is just a first thought about the possible impact of supporting cancellation. Because of such performance-impacting details, it's more likely that this type of cancellation support would have to be enabled explicitly for a namespace or topic.
Not directly related, but contains some details about the current delayed delivery tracker solution: #23912 .
For the BucketDelayedDeliveryTracker, it could be feasible to add cancellation support since it keeps persistent state. However, the challenge remains that cancellation commands might not yet have been processed when delayed message delivery decisions are made. It seems that some sort of RPC would be needed, and storing the cancellation commands directly in the BucketDelayedDeliveryTracker persistence solution could be feasible. For the InMemoryDelayedDeliveryTracker, I don't think it would be feasible to add cancellation support at all: based on my initial analysis, it doesn't make sense to store the cancellation state in the topic itself, because of the problems with unprocessed cancellation commands while making delivery decisions.
@lhotari Thank you very much, this is very useful.
I initially implemented delayed message cancellation in the Dispatcher rather than the DelayedDeliveryTracker because using MARK messages appeared straightforward, and I assumed minimal storage overhead since not all delayed messages require cancellation. Determining the optimal timing for sending MARK messages also presented significant design challenges. However, as you rightly pointed out, storing MARK messages directly in topics was fundamentally flawed from the start, and I overlooked geo-replication implications.
Additionally, I realized that the current PR's MARK messages affect the entire topic rather than specific subscriptions. Cancellation commands should ideally be subscription-scoped: a cancellation command issued for a subscription should act only on that subscription.
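To make the scoping concrete, here is a rough sketch of per-subscription cancellation state. It is not taken from the PR or from any existing tracker: the class, the method names, and the `ledgerId:entryId` string key are assumptions made purely for illustration.

```java
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical bookkeeping: cancellations are recorded per subscription, not per topic.
final class SubscriptionScopedCancellationSketch {

    // subscription name -> cancelled positions, encoded as "ledgerId:entryId"
    private final Map<String, Set<String>> cancelledBySubscription = new ConcurrentHashMap<>();

    private static String key(long ledgerId, long entryId) {
        return ledgerId + ":" + entryId;
    }

    /** Records that the given position is cancelled for one subscription only. */
    void cancel(String subscription, long ledgerId, long entryId) {
        cancelledBySubscription
                .computeIfAbsent(subscription, s -> ConcurrentHashMap.newKeySet())
                .add(key(ledgerId, entryId));
    }

    /**
     * Returns true (and clears the record) if the position was cancelled for this
     * subscription; other subscriptions are unaffected.
     */
    boolean consumeCancellation(String subscription, long ledgerId, long entryId) {
        Set<String> cancelled = cancelledBySubscription.get(subscription);
        return cancelled != null && cancelled.remove(key(ledgerId, entryId));
    }
}
```

With this shape, cancelling position 1:5 for `sub-a` leaves the same message deliverable to `sub-b`.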
I will now focus on implementing delayed message cancellation within the BucketDelayedDeliveryTracker and plan to propose a PIP in the near future.
First of all, what do you mean by cancelling a delayed message? Do you want to delete the message in the broker so that it is never dispatched, or cancel the delay and dispatch it immediately?
As a message queue, Pulsar does not support message modification, but if you do want to withdraw a produced message, you can try the transaction feature or the topic compaction feature. Frankly speaking, we may run into issues when combining the delayed message feature with transactions or topic compaction.
It is also reasonable for Pulsar not to support modifying the delay time or cancelling a delayed message, just as we can't withdraw a message that has been produced. There are already too many features in Pulsar and we can't integrate every feature; most of these so-called requirements should be implemented on the user's side. The complexity of Pulsar is already pretty high, so we need to balance the benefit of this minority demand against the risk of breaking existing logic and adding complexity to the project. Combining different Pulsar features can result in lots of problems. IMO, what Pulsar needs most is not edge features that only a minority would use, but reliability, stability, and simplicity. We need to decrease the complexity of the project.
This is a big new feature that impacts the core logic. A PIP may be needed to get more opinions from the community.
Yes. It's very interesting and challenging for me. I will start the PIP process later.