[improve][broker] Support consumer side delayed messages
Fixes #xyz
Main Issue: #xyz
PIP: #xyz
Motivation
When consumers encounter transient failures while processing messages, a common requirement is to retry processing after a certain delay. Before this feature, developers typically resorted to:
- Using Retry/Dead Letter Topics (DLQ): Consumers would republish failed messages to a dedicated retry topic. This approach suffered from write amplification (message produced again) and read amplification (additional consumption from the retry topic), and increased architectural complexity.
- Application-Level Custom Delay Logic: Implementing delay logic using external components (e.g., databases, scheduling frameworks) significantly increased system complexity and external dependencies.
These existing solutions introduced varying degrees of overhead, complexity, or inflexibility. This change aims to provide a native, efficient mechanism for consumer-side delayed message redelivery.
Modifications
This change introduces the capability for Pulsar consumers to negatively acknowledge (Nack) messages with a specified custom delay. The key modifications include:
- Client API Extension:
  - New methods `negativeAcknowledge(MessageId messageId, long delay, TimeUnit unit)` and `negativeAcknowledge(Message<?> message, long delay, TimeUnit unit)` have been added to the `org.apache.pulsar.client.api.Consumer` interface.
- Protocol Enhancement:
  - The `CommandRedeliverUnacknowledgedMessages` protobuf command has been augmented with an optional `delay_at_time` (`uint64`) field. This field carries the absolute timestamp at which the message is expected to be redelivered.
- Broker-Side Core Logic Adjustments:
  - `ServerCnx`: Updated to recognize and process the `delay_at_time` field in `CommandRedeliverUnacknowledgedMessages`.
  - `Consumer`/`Subscription`/`Dispatcher`:
    - Relevant interfaces and implementations have been extended to handle redelivery requests with a `delayAtTime`.
    - `PersistentDispatcherMultipleConsumers` (and its classic variant) now leverages the topic-level `DelayedDeliveryTracker` to manage Nacked messages with a specified delay. The message's position and the target `delayAtTime` are added to the tracker, which triggers redelivery when the specified time is reached.
    - This feature is primarily effective for `Shared` and `Key_Shared` subscription types, where it provides precise delay semantics.
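A minimal sketch of how the relative `(delay, unit)` arguments of the new client methods could be turned into the absolute `delay_at_time` value carried by the protocol command. It assumes that `delay_at_time` is an epoch-millisecond timestamp computed from the client's clock; the class and helper names (`DelayAtTimeExample`, `toDelayAtTime`) are hypothetical and not part of the Pulsar client API.

```java
import java.util.concurrent.TimeUnit;

// Hypothetical helper, not part of the Pulsar client API. Assumes delay_at_time
// is an absolute epoch-millisecond timestamp computed from the client clock.
class DelayAtTimeExample {

    // Converts the relative (delay, unit) pair passed to
    // negativeAcknowledge(message, delay, unit) into an absolute delay_at_time.
    static long toDelayAtTime(long nowMillis, long delay, TimeUnit unit) {
        if (delay < 0) {
            throw new IllegalArgumentException("delay must be non-negative: " + delay);
        }
        // TimeUnit.toMillis saturates at Long.MAX_VALUE instead of overflowing.
        long delayMillis = unit.toMillis(delay);
        // Clamp the sum as well, so a very large delay cannot wrap to a past time.
        return delayMillis > Long.MAX_VALUE - nowMillis
                ? Long.MAX_VALUE
                : nowMillis + delayMillis;
    }

    public static void main(String[] args) {
        long now = System.currentTimeMillis();
        long delayAtTime = toDelayAtTime(now, 30, TimeUnit.SECONDS);
        System.out.println(delayAtTime - now); // prints 30000
    }
}
```

From application code, the new API itself would be called as `consumer.negativeAcknowledge(msg, 30, TimeUnit.SECONDS)`. One consequence of sending an absolute timestamp rather than the relative delay is that the redelivery time depends on the client and broker clocks being reasonably in sync.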
With these modifications, consumers can directly request the broker to redeliver a message after a specific delay, leading to:
- Elimination of write and read amplification: No need to republish messages to other topics.
- Simplified application architecture: Removes the need for external delay mechanisms or complex application-level logic.
- Fine-grained control: Allows consumers to dynamically set retry delays per message based on specific conditions.
- Enhanced resource efficiency: Leverages the broker's internal delayed message delivery mechanism.
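The delayed-nack path in `PersistentDispatcherMultipleConsumers` reduces to two operations on the tracker: record a position together with its `delayAtTime`, and later hand back every position whose time has arrived. The sketch below models only that contract with an in-memory priority queue ordered by `delayAtTime`. It is hypothetical: it does not mirror the real `DelayedDeliveryTracker` interface or its bucket-based implementation, and the names (`DelayedNackTrackerSketch`, `addMessage`, `pollDue`) are illustrative.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.PriorityQueue;

// Hypothetical sketch: models only the "record a position with a deadline,
// later return every position whose deadline has passed" contract. It is not
// the real DelayedDeliveryTracker interface or its bucket-based implementation.
class DelayedNackTrackerSketch {

    // A message position in the managed ledger, identified by (ledgerId, entryId).
    static final class Position {
        final long ledgerId;
        final long entryId;

        Position(long ledgerId, long entryId) {
            this.ledgerId = ledgerId;
            this.entryId = entryId;
        }

        @Override
        public String toString() {
            return ledgerId + ":" + entryId;
        }
    }

    // A tracked position together with the absolute time it becomes due.
    private static final class Tracked {
        final Position position;
        final long delayAtTime;

        Tracked(Position position, long delayAtTime) {
            this.position = position;
            this.delayAtTime = delayAtTime;
        }
    }

    // Min-heap ordered by delayAtTime, so the earliest deadline is always at the head.
    private final PriorityQueue<Tracked> queue =
            new PriorityQueue<>((a, b) -> Long.compare(a.delayAtTime, b.delayAtTime));

    // Called when a redelivery request arrives with a delayAtTime.
    void addMessage(Position position, long delayAtTime) {
        queue.add(new Tracked(position, delayAtTime));
    }

    // Called by the dispatcher before reading: removes and returns, in deadline
    // order, every position whose delayAtTime is at or before nowMillis.
    List<Position> pollDue(long nowMillis) {
        List<Position> due = new ArrayList<>();
        while (!queue.isEmpty() && queue.peek().delayAtTime <= nowMillis) {
            due.add(queue.poll().position);
        }
        return due;
    }

    public static void main(String[] args) {
        DelayedNackTrackerSketch tracker = new DelayedNackTrackerSketch();
        tracker.addMessage(new Position(1, 10), 5_000L);
        tracker.addMessage(new Position(1, 11), 2_000L);
        System.out.println(tracker.pollDue(1_000L)); // prints []
        System.out.println(tracker.pollDue(3_000L)); // prints [1:11]
        System.out.println(tracker.pollDue(9_000L)); // prints [1:10]
    }
}
```

A priority queue gives logarithmic insertion and constant-time access to the earliest deadline, which is all the "redeliver when the specified time is reached" behavior needs. A production tracker must also bound memory and survive broker restarts, which is the concern raised about `BucketDelayedDeliveryTracker` at the end of this description.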
Verifying this change
- [x] Make sure that the change passes the CI checks.
Does this pull request potentially affect one of the following parts:
If the box was checked, please highlight the changes
- [ ] Dependencies (add or upgrade a dependency)
- [x] The public API
- [ ] The schema
- [ ] The default values of configurations
- [ ] The threading model
- [x] The binary protocol
- [ ] The REST endpoints
- [ ] The admin CLI options
- [ ] The metrics
- [ ] Anything that affects deployment
Documentation
- [ ] `doc`
- [x] `doc-required`
- [ ] `doc-not-needed`
- [ ] `doc-complete`
Matching PR in forked repository
PR in forked repository:
There currently seem to be some issues with this implementation. (Note: this PIP only discusses the `BucketDelayedDeliveryTracker` implementation.)
If a delayed message in the `LastMutableBucket` has not yet been persisted to the bookies when a failure occurs, the data in the `LastMutableBucket` is lost. This has no lasting impact, however: for producer-side delayed messages the delivery time is stored in the message metadata, so after a restart the messages are read again from the mark-delete position onward and the bucket is rebuilt. This is also why data in a bucket can be deleted as soon as it has been read, without requiring a client ack.
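Consumer-side delays are different: the delay is carried only by the `CommandRedeliverUnacknowledgedMessages` command and is not part of the stored message, so it cannot be recovered by re-reading from the mark-delete position. If a consumer sends a command to add a delayed message and the broker crashes before the entry in the `LastMutableBucket` has been persisted (`sealBucketAndAsyncPersistent`), the command is lost. We also cannot defer returning success for the command until the bucket-seal condition is triggered, because we do not know how long that will take.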
This problem is important and must be resolved. I will think about how to solve it, and everyone is welcome to discuss it with me.
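The recovery asymmetry described above can be illustrated with a small model. It assumes, for simplicity, that the topic ledger is a list of entries whose producer-side delivery time (if any) is stored with the entry, that the `LastMutableBucket` is a plain in-memory map from entry ID to delivery time, and that a crash wipes that map before it is sealed and persisted. All names (`DelayIndexRecoverySketch`, `LedgerEntry`, `rebuildIndex`) are hypothetical; this is not the `BucketDelayedDeliveryTracker` code.

```java
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Hypothetical model of delay-index recovery. A "ledger" is a list of entries,
// and the not-yet-persisted LastMutableBucket is a plain map entryId -> deliverAt
// that is lost on a crash. Not the BucketDelayedDeliveryTracker code.
class DelayIndexRecoverySketch {

    // A ledger entry. producerDeliverAt is null unless the producer asked for
    // delayed delivery; in that case the time is stored with the message itself.
    static final class LedgerEntry {
        final long entryId;
        final Long producerDeliverAt;

        LedgerEntry(long entryId, Long producerDeliverAt) {
            this.entryId = entryId;
            this.producerDeliverAt = producerDeliverAt;
        }
    }

    // Rebuilds the delay index after a restart by re-reading every entry after
    // the mark-delete position. Only delays stored in the ledger can be recovered.
    static Map<Long, Long> rebuildIndex(List<LedgerEntry> ledger, long markDeleteEntryId) {
        Map<Long, Long> index = new TreeMap<>();
        for (LedgerEntry entry : ledger) {
            if (entry.entryId > markDeleteEntryId && entry.producerDeliverAt != null) {
                index.put(entry.entryId, entry.producerDeliverAt);
            }
        }
        return index;
    }

    public static void main(String[] args) {
        List<LedgerEntry> ledger = List.of(
                new LedgerEntry(1, null),    // ordinary message
                new LedgerEntry(2, 5_000L),  // producer-side delayed message
                new LedgerEntry(3, null));   // ordinary message, later nacked with a delay
        long markDelete = 0;

        Map<Long, Long> index = rebuildIndex(ledger, markDelete);
        // The consumer's delayed nack for entry 3 exists only in the command,
        // so it is recorded in the in-memory index and nowhere else.
        index.put(3L, 9_000L);
        System.out.println(index); // prints {2=5000, 3=9000}

        // Simulated crash before the bucket is sealed and persisted: the map is
        // gone and the index is rebuilt from the ledger alone.
        Map<Long, Long> recovered = rebuildIndex(ledger, markDelete);
        // Entry 3 is still after the mark-delete position and would be re-read,
        // but nothing in the ledger records its requested delay.
        System.out.println(recovered); // prints {2=5000}
    }
}
```

In the model, the producer-side delay for entry 2 survives the simulated crash because it can be read back from the ledger, while the consumer-side delay for entry 3 does not.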