[Bug][client] Delayed messages can be acked before reaching the delivery time
Search before asking
- [X] I searched in the issues and found nothing similar.
Read release policy
- [X] I understand that unsupported versions don't get bug fixes. I will attempt to reproduce the issue on a supported version of Pulsar client and Pulsar broker.
Version
master
Minimal reproduce step
I found this while trying to implement a way to abort (cancel) delayed messages. Here is the test code.
@Test(timeOut = 20000)
public void testAbortDelayedMessage() throws Exception {
    final String topic = "persistent://my-property/my-ns/testAbortDelayedMessage";
    Consumer<byte[]> consumer = pulsarClient.newConsumer().topic(topic)
            .subscriptionName("my-subscription")
            .subscriptionType(SubscriptionType.Shared)
            .subscribe();
    Producer<byte[]> producer = pulsarClient.newProducer().topic(topic).enableBatching(false).create();
    admin.topicPolicies().setDelayedDeliveryPolicy(topic,
            DelayedDeliveryPolicies.builder()
                    .active(true)
                    .tickTime(100L)
                    .maxDeliveryDelayInMillis(5000)
                    .build());
    MessageId msgId = producer.newMessage()
            .value("Test delayed message".getBytes())
            .deliverAfter(5, TimeUnit.SECONDS)
            .send();
    log.warn("Message sent with ID: " + msgId);
    producer.flush();
    // Ack the delayed message by ID before it has been delivered to the consumer.
    consumer.acknowledge(msgId);
    // Poll until the test times out; the acked message never arrives.
    while (true) {
        Message<byte[]> msg = consumer.receive(100, TimeUnit.MILLISECONDS);
        if (msg != null) {
            log.warn("Received message: " + new String(msg.getData()));
            consumer.acknowledge(msg.getMessageId());
        }
        log.warn("Waiting for messages...");
    }
}
What did you expect to see?
The delayed message should still be delivered and consumed normally, because the consumer has not received it yet.
What did you see instead?
After the consumer proactively acks the delayed message by its message ID, that message is never delivered to the consumer.
Anything else?
Is there any problem with acking a message this way?
Are you willing to submit a PR?
- [ ] I'm willing to submit a PR!
First, I don't think it's a bug. This is incorrect usage of the messaging system: a message should be acked after it is consumed, not before.
Second, fixing this "issue" would come with extra cost. Before the broker acks a message, it would have to read the entry from BK, decode the message header, and determine whether it is a delayed message that has reached its delivery time. I don't think that is acceptable.
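To make the trade-off above concrete, here is a minimal sketch of why an early ack suppresses delivery. It is a toy in-memory model, not Pulsar's broker code: the class `ToyDelayedSubscription` and its methods are hypothetical names, positions stand in for message IDs, and a map stands in for entries stored in BK. The point it illustrates is that `ack` works on the position alone, so checking the delivery time would require an extra lookup of the stored entry.

```java
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.TreeMap;

/** Toy model of a subscription with delayed delivery; NOT Pulsar's actual implementation. */
class ToyDelayedSubscription {
    // position -> deliverAt timestamp (ms); stands in for entries stored in BookKeeper
    private final TreeMap<Long, Long> deliverAtByPosition = new TreeMap<>();
    // positions already acknowledged on this subscription
    private final Set<Long> acked = new HashSet<>();
    private long nextPosition = 0;

    /** Stores a message and returns its position (playing the role of a message ID). */
    long publish(long deliverAtMillis) {
        long position = nextPosition++;
        deliverAtByPosition.put(position, deliverAtMillis);
        return position;
    }

    /** Acks by position only: the stored entry is not read, so the delivery time is never checked. */
    void ack(long position) {
        acked.add(position);
    }

    /** Returns the positions that are due at nowMillis and have not been acked. */
    List<Long> dispatch(long nowMillis) {
        List<Long> due = new ArrayList<>();
        for (Map.Entry<Long, Long> e : deliverAtByPosition.entrySet()) {
            if (e.getValue() <= nowMillis && !acked.contains(e.getKey())) {
                due.add(e.getKey());
            }
        }
        return due;
    }

    public static void main(String[] args) {
        ToyDelayedSubscription sub = new ToyDelayedSubscription();
        long early = sub.publish(5_000);  // position 0, due at t = 5s
        long normal = sub.publish(5_000); // position 1, due at t = 5s
        sub.ack(early);                   // acked at t = 0, before the delivery time
        System.out.println(sub.dispatch(1_000)); // prints []
        System.out.println(sub.dispatch(5_000)); // prints [1]
    }
}
```

In this model, rejecting the early ack would mean `ack` has to consult `deliverAtByPosition` first, which is the analogue of the extra BK read described above.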
@dao-jun @coderzc @mattisonchao
If this is not a bug and there is no problem with using it this way, then changing the delay time of a delayed message can be implemented easily!
@HarryFQG https://github.com/apache/pulsar/discussions/20255 @liuhuagui https://github.com/apache/pulsar/discussions/18317 @hoswey https://github.com/apache/pulsar/issues/10042 @jaggerwang https://github.com/apache/pulsar/issues/21173
Here is a simple code example. With this approach, it is no longer necessary to store the messages whose delay time needs to change in Redis or a database and then filter them on the consumer side.
@Test
public void testDelayedMessageDelayTimeChange() throws Exception {
    final String topic = "persistent://my-property/my-ns/testDelayedMessageDelayTimeChange";
    final String TASK_ID_OF_DELAY_TIME_CHANGE = "__DEPRECATED_TASK_ID";
    final String DELAY_CHANGE_DEPRECATED_MESSAGE_ID = "__DELAY_CHANGE_DEPRECATED_MESSAGE_ID";
    final String DELAY_CHANGE_NEW_MESSAGE_ID = "__DELAY_CHANGE_NEW_MESSAGE_ID";
    Map<String, MessageId> receivedMessageIdMap = new ConcurrentHashMap<>();
    // Written from sendAsync callbacks, so it must be thread-safe.
    Map<String, MessageId> sendMessageMap = new ConcurrentHashMap<>();
    // ============================================================================================================
    Consumer<byte[]> consumer = pulsarClient.newConsumer().topic(topic)
            .subscriptionName("my-subscription")
            .subscriptionType(SubscriptionType.Shared)
            .messageListener((MessageListener<byte[]>) (c, receivedMessage) -> {
                MessageId newTaskMessageId = receivedMessage.getMessageId();
                String taskId = receivedMessage.getKey();
                if (receivedMessage.hasProperty(DELAY_CHANGE_DEPRECATED_MESSAGE_ID)) {
                    String deprecatedMessage = receivedMessage.getProperty(DELAY_CHANGE_DEPRECATED_MESSAGE_ID);
                    byte[] bytes = deprecatedMessage.getBytes(UTF_8);
                    try {
                        if (receivedMessage.hasProperty(DELAY_CHANGE_NEW_MESSAGE_ID)) {
                            // Marker message: it only cancels the old message and points at the new one.
                            newTaskMessageId = MessageId.fromByteArray(receivedMessage.
                                    getProperty(DELAY_CHANGE_NEW_MESSAGE_ID).getBytes(UTF_8));
                            MessageId deprecatedMessageId = MessageId.fromByteArray(bytes);
                            log.warn("Received deprecated message with taskId {} , oldMessageId {} , newMessageId {}",
                                    receivedMessage.getProperty(TASK_ID_OF_DELAY_TIME_CHANGE),
                                    deprecatedMessageId, newTaskMessageId);
                            c.acknowledge(deprecatedMessageId);
                            // Also ack the marker message itself so it is not redelivered.
                            c.acknowledge(receivedMessage.getMessageId());
                        } else {
                            // Replacement message: cancel the old message, then process and ack this one.
                            MessageId deprecatedMessageId = MessageId.fromByteArray(bytes);
                            log.warn("Received deprecated message with taskId {} , oldMessageId {} , newMessageId {}",
                                    receivedMessage.getProperty(TASK_ID_OF_DELAY_TIME_CHANGE),
                                    deprecatedMessageId, newTaskMessageId);
                            c.acknowledge(deprecatedMessageId);
                            receivedMessageIdMap.putIfAbsent(taskId, newTaskMessageId);
                            c.acknowledge(newTaskMessageId);
                        }
                    } catch (IOException e) {
                        log.error("Failed to decode or ack the deprecated message ID", e);
                    }
                } else {
                    // Ordinary task message.
                    receivedMessageIdMap.putIfAbsent(taskId, newTaskMessageId);
                    c.acknowledgeAsync(newTaskMessageId);
                }
            })
            .subscribe();
    admin.topicPolicies().setDelayedDeliveryPolicy(topic,
            DelayedDeliveryPolicies.builder()
                    .active(true)
                    .tickTime(100L)
                    .maxDeliveryDelayInMillis(5000)
                    .build());
    // ============================================================================================================
    Producer<byte[]> producer = pulsarClient.newProducer().topic(topic).enableBatching(false).create();
    final int taskNum = 10;
    for (int i = 0; i < taskNum; i++) {
        final String taskId = "task" + i;
        producer.newMessage()
                .key(taskId)
                .value(("Delayed message with task" + i).getBytes())
                .deliverAfter(3, TimeUnit.SECONDS)
                .sendAsync().thenAccept(messageId -> {
                    sendMessageMap.put(taskId, messageId);
                });
    }
    producer.flush();
    // ===========================================================================================================
    String taskId = "task5";
    MessageId oldMessageId = sendMessageMap.get(taskId);
    // 1. Shorten the delay of task5 to 1 second, so that it triggers earlier.
    // deliverAfter: 3s -> 1s
    producer.newMessage()
            .key(taskId)
            .value(("Delayed message with " + taskId).getBytes())
            .deliverAfter(1, TimeUnit.SECONDS)
            .property(TASK_ID_OF_DELAY_TIME_CHANGE, taskId)
            .property(DELAY_CHANGE_DEPRECATED_MESSAGE_ID, new String(sendMessageMap.get(taskId).toByteArray(), UTF_8))
            .sendAsync().thenAccept(messageId -> {
                log.warn("[Delayed message change] TaskId: {}, Old MessageId: {}, New MessageId: {}",
                        taskId, sendMessageMap.get(taskId), messageId);
                sendMessageMap.put(taskId, messageId);
            });
    producer.flush();
    // 2. Alternatively, extend the delay of task5.
    // deliverAfter: 3s -> 5s
    producer.newMessage()
            .key(taskId)
            .value(("Delayed message with " + taskId).getBytes())
            .deliverAfter(5, TimeUnit.SECONDS)
            .sendAsync().thenAccept(messageId -> {
                log.warn("[Delayed message change] TaskId: {}, Old MessageId: {}, New MessageId: {}",
                        taskId, sendMessageMap.get(taskId), messageId);
                sendMessageMap.put(taskId, messageId);
                // Send a special marker message with no business meaning, only to change the delay time.
                producer.newMessage()
                        .key("DEPRECATED_" + taskId)
                        .value(("DEPRECATED_" + taskId).getBytes(UTF_8))
                        .property(TASK_ID_OF_DELAY_TIME_CHANGE, taskId)
                        .property(DELAY_CHANGE_DEPRECATED_MESSAGE_ID, new String(oldMessageId.toByteArray(), UTF_8))
                        .property(DELAY_CHANGE_NEW_MESSAGE_ID, new String(messageId.toByteArray(), UTF_8))
                        .sendAsync();
            });
    producer.flush();
    try {
        Awaitility.await().atMost(Duration.ofSeconds(30)).until(() -> receivedMessageIdMap.containsKey(taskId) &&
                receivedMessageIdMap.size() == taskNum && !receivedMessageIdMap.get(taskId).equals(oldMessageId));
    } finally {
        consumer.close();
    }
}
Use only one of options 1 and 2 in the example.
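One caveat about the example: it carries message ID bytes in String properties by converting them to and from UTF-8 text. Serialized message IDs are binary, and arbitrary bytes are not guaranteed to survive a UTF-8 round trip. The sketch below, using only the JDK, compares that conversion with Base64. The class `MessageIdPropertyCodec` and the sample bytes are hypothetical stand-ins for illustration, not output captured from `MessageId.toByteArray()`.

```java
import java.nio.charset.StandardCharsets;
import java.util.Arrays;
import java.util.Base64;

/** Compares two ways of carrying raw ID bytes in a String-valued message property. */
class MessageIdPropertyCodec {
    /** Encodes raw bytes as Base64 text, which is safe to store in a String property. */
    static String encode(byte[] idBytes) {
        return Base64.getEncoder().encodeToString(idBytes);
    }

    /** Decodes a Base64 property value back into the original bytes. */
    static byte[] decode(String propertyValue) {
        return Base64.getDecoder().decode(propertyValue);
    }

    /** Returns true only if bytes -> UTF-8 String -> bytes gives back the same bytes. */
    static boolean survivesUtf8RoundTrip(byte[] idBytes) {
        String asText = new String(idBytes, StandardCharsets.UTF_8);
        return Arrays.equals(idBytes, asText.getBytes(StandardCharsets.UTF_8));
    }

    public static void main(String[] args) {
        // Hypothetical sample containing bytes that are not valid UTF-8 (0xC8 without a continuation, 0xFF).
        byte[] sample = {0x08, (byte) 0xC8, 0x01, 0x10, (byte) 0xFF};
        System.out.println(survivesUtf8RoundTrip(sample));                 // prints false
        System.out.println(Arrays.equals(sample, decode(encode(sample)))); // prints true
    }
}
```

If the property values were produced with `encode` on the producer side and read with `decode` before calling `MessageId.fromByteArray`, the ID bytes would arrive unchanged regardless of their content.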
If this is not a bug, the issue can be converted into a GitHub discussion.