[Bug] Delayed message consumers receive duplicate messages when the topic is unloaded
Search before asking
- [X] I searched in the issues and found nothing similar.
Version
2.7.5 / 2.8.4 / master
Minimal reproduce step
- Run the code for 20 minutes
public static void main(String[] args) {
    String url = "pulsar://10.98.242.210:6650";
    String t = "persistent://components/test/test_deliver";
    new Thread(() -> {
        try {
            PulsarClient client = PulsarClient.builder()
                    .serviceUrl(url)
                    .ioThreads(1)
                    .listenerThreads(2)
                    .build();
            Producer<String> producerA = client.newProducer(Schema.STRING)
                    .topic(t)
                    .producerName("test")
                    .create();
            Random random = new Random();
            while (true) {
                String message = String.valueOf(System.currentTimeMillis());
                producerA.newMessage(Schema.STRING)
                        .value(message)
                        .deliverAfter(random.nextInt(30), TimeUnit.MINUTES)
                        .send();
                Thread.sleep(10L);
            }
        } catch (Exception e) {
            e.printStackTrace();
        }
    }).start();

    Set<String> stringSet = new ConcurrentHashSet<>();
    try {
        PulsarClient client = PulsarClient.builder()
                .serviceUrl(url)
                .ioThreads(1)
                .listenerThreads(2)
                .build();
        Consumer<String> consumer = client.newConsumer(Schema.STRING)
                .topic(t)
                .subscriptionName("frozen")
                .subscriptionType(SubscriptionType.Shared)
                .subscribe();
        while (true) {
            Message<String> message = consumer.receive();
            if (stringSet.contains(message.getValue())) {
                log.info("duplicate messages thread={} message={}",
                        Thread.currentThread().getName(), message.getValue());
            } else {
                stringSet.add(message.getValue());
            }
            consumer.acknowledge(message);
        }
    } catch (Exception e) {
        e.printStackTrace();
    }
}
- After 20 minutes, execute the unload command:
bin/pulsar-admin topics unload persistent://components/test/test_deliver
- Check the consumer log: duplicate messages are reported.
What did you expect to see?
no duplicate messages
What did you see instead?
duplicate messages
Anything else?
No response
Are you willing to submit a PR?
- [ ] I'm willing to submit a PR!
cc @coderzc: since you've been working heavily on delayed messages recently, can you take a look at this issue and see whether it's a usage problem?
I think I understand what caused the issue. Try setting isAckReceiptEnabled(true) when creating the consumer:
Consumer<String> consumer = client.newConsumer(Schema.STRING)
        .topic(t)
        .subscriptionName("frozen")
        // enable ack receipt
        .isAckReceiptEnabled(true)
        .subscriptionType(SubscriptionType.Shared)
        .subscribe();
Then run your test again.
@dao-jun My client is pulsar-client-go. Is there a similar consumer option to isAckReceiptEnabled? Maybe AckWithResponse? Could you explain the reason for the duplicate consumption?
@dao-jun I tried it, but it did not solve the problem, although some parts of the backlog did seem to drain faster. Maybe consuming a message twice does not change the ack status of the first delivery? I will try another approach tomorrow.
package main

import (
    "context"

    "github.com/apache/pulsar-client-go/pulsar"
)

func test() {
    clientOps := pulsar.ClientOptions{
        URL: "pulsar://localhost:6650",
    }
    client, err := pulsar.NewClient(clientOps)
    if err != nil {
        return
    }
    defer client.Close()
    consumerOps := pulsar.ConsumerOptions{
        Topic:            "persistent://public/default/my-topic",
        SubscriptionName: "test",
        AckWithResponse:  true,
    }
    consumer, err := client.Subscribe(consumerOps)
    if err != nil {
        return
    }
    defer consumer.Close()
    for {
        msg, err := consumer.Receive(context.Background())
        if err != nil {
            return
        }
        // Do your business here
        consumer.Ack(msg)
    }
}
@SennoYuki
The problem is that Pulsar optimizes consumer.ack.
When you call consumer.ack, the ack request is not sent to the broker immediately; it is cached in the client. Once the cached ack requests reach a threshold, they are sent to the broker in a batch.
If you unload the topic before the cached ack requests have been sent to the broker, you will receive duplicated messages.
If you enable AckWithResponse, the consumer will wait until the broker has received the ack.
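For reference, a minimal Java sketch of the two client-side settings that affect this window (the service URL, topic, and subscription names are placeholders taken from the repro above): isAckReceiptEnabled makes acknowledge() wait for the broker's confirmation, and acknowledgmentGroupTime controls how long acks are grouped on the client before being flushed, so setting it to 0 disables grouping.

```java
import java.util.concurrent.TimeUnit;

import org.apache.pulsar.client.api.Consumer;
import org.apache.pulsar.client.api.Message;
import org.apache.pulsar.client.api.PulsarClient;
import org.apache.pulsar.client.api.Schema;
import org.apache.pulsar.client.api.SubscriptionType;

public class AckReceiptExample {
    public static void main(String[] args) throws Exception {
        PulsarClient client = PulsarClient.builder()
                .serviceUrl("pulsar://localhost:6650") // placeholder broker URL
                .build();

        Consumer<String> consumer = client.newConsumer(Schema.STRING)
                .topic("persistent://components/test/test_deliver")
                .subscriptionName("frozen")
                .subscriptionType(SubscriptionType.Shared)
                // wait for the broker's response to each acknowledgment
                // instead of treating acks as fire-and-forget
                .isAckReceiptEnabled(true)
                // flush acks immediately instead of grouping them on the client,
                // which shrinks the window of acks that can be lost on unload
                .acknowledgmentGroupTime(0, TimeUnit.MICROSECONDS)
                .subscribe();

        Message<String> msg = consumer.receive();
        // returns only after the broker has confirmed the ack
        consumer.acknowledge(msg);

        consumer.close();
        client.close();
    }
}
```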
@dao-jun I think that is only part of the reason and would only explain a small batch of duplicated messages. In my scenario the number of duplicated messages is huge; it can't explain 20M or more messages being consumed twice. Also, the topic stats show fewer than 1k unacked messages.
Even a few unacked messages have a chance to cause many duplicated messages; please give it a try @SennoYuki
OK @dao-jun, I will retry tomorrow. Hoping for a good result.
Mark:
The root cause is a conflict between cursor.readEntries and cursor.asyncReplayEntries.
@dao-jun I retried several times. It does not work.
I guess your topic has both non-delayed and delayed delivery messages; if your topic only had delayed delivery messages, it would work. As I noted above, the problem is a conflict between cursor.readEntries and cursor.asyncReplayEntries: one is used for delayed messages, the other for non-delayed delivery messages. We don't have a suitable solution for this yet. @SennoYuki
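To make the shape of that conflict easier to picture, here is a tiny, hypothetical Java model (invented names, not the actual broker or dispatcher logic): a forward read loop stands in for cursor.readEntries, and a replay of specific positions stands in for cursor.asyncReplayEntries. Because the two paths don't reconcile their state, the replayed entries are dispatched a second time.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Set;
import java.util.TreeSet;

// Hypothetical model only: two uncoordinated read paths over the same backlog.
public class TwoReadPathsModel {
    public static void main(String[] args) {
        List<String> backlog = Arrays.asList("e1", "e2", "e3", "e4", "e5");

        // Positions of delayed messages that became due and were queued for
        // replay (stand-in for cursor.asyncReplayEntries).
        Set<Integer> replaySet = new TreeSet<>(Arrays.asList(1, 3)); // e2, e4

        List<String> dispatched = new ArrayList<>();

        // Forward read path (stand-in for cursor.readEntries): reads every
        // entry from the current read position onward.
        for (int pos = 0; pos < backlog.size(); pos++) {
            dispatched.add(backlog.get(pos));
        }

        // Replay path: re-reads the queued positions. Nothing removed them
        // from the replay set when the forward read already covered them,
        // so they go out a second time.
        for (int pos : replaySet) {
            dispatched.add(backlog.get(pos));
        }

        // Prints [e1, e2, e3, e4, e5, e2, e4]: e2 and e4 are duplicated.
        System.out.println(dispatched);
    }
}
```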
@dao-jun I can confirm it does not contain any non-delayed delivery messages; in my program every message has a delay of more than 5 minutes. That's sad, since I chose Pulsar because it supports delayed messages. Is there any plan to fix this bug?