
[Bug] Delayed message delivery: consumer receives duplicate messages when the topic is unloaded

Open jdfrozen opened this issue 2 years ago • 16 comments

Search before asking

  • [X] I searched in the issues and found nothing similar.

Version

2.7.5 / 2.8.4 / master

Minimal reproduce step

  1. Run the following code for 20 minutes:
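import java.util.Random;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.TimeUnit;

import org.apache.pulsar.client.api.Consumer;
import org.apache.pulsar.client.api.Message;
import org.apache.pulsar.client.api.Producer;
import org.apache.pulsar.client.api.PulsarClient;
import org.apache.pulsar.client.api.Schema;
import org.apache.pulsar.client.api.SubscriptionType;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class DelayedDeliveryDuplicateRepro {
    private static final Logger log = LoggerFactory.getLogger(DelayedDeliveryDuplicateRepro.class);

    public static void main(String[] args) {
        String url = "pulsar://10.98.242.210:6650";
        String t = "persistent://components/test/test_deliver";

        // Producer thread: one message every ~10 ms, each delayed by a random 0-29 minutes.
        new Thread(() -> {
            try {
                PulsarClient client = PulsarClient.builder()
                        .serviceUrl(url)
                        .ioThreads(1)
                        .listenerThreads(2)
                        .build();
                Producer<String> producerA = client.newProducer(Schema.STRING)
                        .topic(t)
                        .producerName("test")
                        .create();
                Random random = new Random();
                while (true) {
                    // The payload is the send timestamp, so every value is expected to be unique.
                    String message = String.valueOf(System.currentTimeMillis());
                    producerA.newMessage()
                            .value(message)
                            .deliverAfter(random.nextInt(30), TimeUnit.MINUTES)
                            .send();
                    Thread.sleep(10L);
                }
            } catch (Exception e) {
                e.printStackTrace();
            }
        }).start();

        // Consumer: remembers every payload it has seen and logs any value received twice.
        Set<String> stringSet = ConcurrentHashMap.newKeySet();
        try {
            PulsarClient client = PulsarClient.builder()
                    .serviceUrl(url)
                    .ioThreads(1)
                    .listenerThreads(2)
                    .build();
            // Delayed delivery is not applied on Exclusive/Failover subscriptions, so use Shared.
            Consumer<String> consumer = client.newConsumer(Schema.STRING)
                    .topic(t)
                    .subscriptionName("frozen")
                    .subscriptionType(SubscriptionType.Shared)
                    .subscribe();
            while (true) {
                Message<String> message = consumer.receive();
                // Set.add returns false when the value was already present, i.e. a duplicate.
                if (!stringSet.add(message.getValue())) {
                    log.info("duplicate messages thread={} message={}", Thread.currentThread().getName(), message.getValue());
                }
                consumer.acknowledge(message);
            }
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
}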
  2. After 20 minutes, unload the topic:
bin/pulsar-admin topics unload persistent://components/test/test_deliver
  3. Check the consumer log: duplicate messages are reported.

What did you expect to see?

no duplicate messages

What did you see instead?

duplicate messages

Anything else?

No response

Are you willing to submit a PR?

  • [ ] I'm willing to submit a PR!

jdfrozen, Apr 14 '23 07:04

The issue had no activity for 30 days; marking it with the Stale label.

github-actions[bot], May 15 '23 01:05

cc @coderzc: since you've been working heavily on delayed messages recently, could you help check whether this is a usage issue?

tisonkun, Jun 15 '23 14:06

The issue had no activity for 30 days; marking it with the Stale label.

github-actions[bot], Jul 16 '23 02:07

I may understand what causes the issue. Try setting isAckReceiptEnabled to true when creating the consumer:

            Consumer consumer = client.newConsumer(Schema.STRING)
                .topic(t)
                .subscriptionName("frozen")
                // enable ack receipt
                .isAckReceiptEnabled(true)
                .subscriptionType(SubscriptionType.Shared)
                .subscribe();

Then try your test again.

dao-jun, May 21 '24 16:05

@dao-jun My client is pulsar-client-go. Is there a consumer option similar to isAckReceiptEnabled? Maybe AckWithResponse? Could you explain why messages are consumed more than once?

SennoYuki, May 22 '24 02:05

@dao-jun I tried it, but it did not solve the problem, although some parts of the backlog were skipped faster. Maybe consuming a message a second time cannot change the status recorded by the first consumption? I will try something else tomorrow.

SennoYuki, May 22 '24 06:05

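package main

import (
	"context"
	"log"

	"github.com/apache/pulsar-client-go/pulsar"
)

func test() {
	clientOps := pulsar.ClientOptions{
		URL: "pulsar://localhost:6650",
	}
	client, err := pulsar.NewClient(clientOps)
	if err != nil {
		log.Printf("create client: %v", err)
		return
	}
	defer client.Close()
	consumerOps := pulsar.ConsumerOptions{
		Topic:            "persistent://public/default/my-topic",
		SubscriptionName: "test",
		// Delayed delivery is not applied on Exclusive/Failover subscriptions.
		Type: pulsar.Shared,
		// Ack waits for the broker's response instead of only buffering the ack locally.
		AckWithResponse: true,
	}
	consumer, err := client.Subscribe(consumerOps)
	if err != nil {
		log.Printf("subscribe: %v", err)
		return
	}
	defer consumer.Close()
	for {
		msg, err := consumer.Receive(context.Background())
		if err != nil {
			log.Printf("receive: %v", err)
			return
		}
		// Do your business here
		// With AckWithResponse enabled, Ack reports the broker's error, if any.
		if err := consumer.Ack(msg); err != nil {
			log.Printf("ack %v: %v", msg.ID(), err)
		}
	}
}

func main() {
	test()
}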

@SennoYuki

dao-jun, May 22 '24 09:05

The problem is that Pulsar optimizes consumer.ack: when you call consumer.ack, the ack request is not sent to the broker immediately but is cached in the client. Once the cached ack requests reach a threshold, they are sent to the broker as a batch.

If you unload the topic before those cached ack requests have been sent to the broker, you will receive duplicate messages.
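To make the mechanism concrete, here is a toy Java model of client-side ack grouping. It is NOT Pulsar's client or broker code: the flush threshold of 4, the `Broker`/`Client` classes and the "wait for receipt" flag are all hypothetical. The model assumes the broker only records acks that were flushed to it, and that an unload discards whatever is still buffered in the client; the receipt mode is modelled as an ack that the broker records before the call returns.

```java
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

/** Toy model of client-side ack grouping (hypothetical, not Pulsar's implementation). */
class AckGroupingSketch {
    // Hypothetical flush threshold standing in for the client's real grouping limits.
    static final int FLUSH_THRESHOLD = 4;

    /** The "broker" only knows about acks that actually reached it. */
    static final class Broker {
        final Set<Long> acked = new HashSet<>();

        /** After an unload, every delivered message the broker never saw acked is sent again. */
        List<Long> redeliverAfterUnload(List<Long> delivered) {
            List<Long> again = new ArrayList<>();
            for (long id : delivered) {
                if (!acked.contains(id)) {
                    again.add(id);
                }
            }
            return again;
        }
    }

    static final class Client {
        final Broker broker;
        final boolean waitForReceipt;            // true: ack returns only after the broker recorded it
        final List<Long> pending = new ArrayList<>();

        Client(Broker broker, boolean waitForReceipt) {
            this.broker = broker;
            this.waitForReceipt = waitForReceipt;
        }

        void acknowledge(long id) {
            if (waitForReceipt) {
                broker.acked.add(id);            // modelled as a synchronous round trip
                return;
            }
            pending.add(id);                     // buffered locally in the client
            if (pending.size() >= FLUSH_THRESHOLD) {
                flush();
            }
        }

        void flush() {
            broker.acked.addAll(pending);
            pending.clear();
        }

        /** The connection is dropped: buffered acks never reach the broker. */
        void onUnload() {
            pending.clear();
        }
    }

    /** Delivers and acks ids 0..messages-1, unloads, and returns the ids redelivered afterwards. */
    static List<Long> simulate(int messages, boolean waitForReceipt) {
        Broker broker = new Broker();
        Client client = new Client(broker, waitForReceipt);
        List<Long> delivered = new ArrayList<>();
        for (long id = 0; id < messages; id++) {
            delivered.add(id);
            client.acknowledge(id);
        }
        client.onUnload();
        return broker.redeliverAfterUnload(delivered);
    }

    public static void main(String[] args) {
        System.out.println("grouped acks, redelivered: " + simulate(10, false));
        System.out.println("wait for receipt, redelivered: " + simulate(10, true));
    }
}
```

With 10 messages, the grouped mode flushes ids 0-3 and 4-7 but loses the buffered acks for 8 and 9, so `[8, 9]` is redelivered, while the receipt mode redelivers nothing. In this model the trade-off is that waiting for the broker turns every ack into a round trip instead of a batched request.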

dao-jun, May 22 '24 09:05

With AckWithResponse enabled, the consumer waits until the broker has received the ack.

dao-jun, May 22 '24 09:05

@dao-jun I think that only explains part of it, for small batches of duplicate messages. In my scenario the number of duplicates is huge: it cannot account for more than 20M messages being consumed twice. Also, in the topic stats the number of unacked messages is below 1k.

SennoYuki, May 22 '24 10:05

Even a few unacked messages can cause many duplicate messages; please give it a try, @SennoYuki.
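A toy Java model of how a handful of unacked messages could translate into many duplicates. It is NOT Pulsar's cursor implementation, and it rests on an assumption made for illustration only: after an unload, only the mark-delete position (the end of the contiguous acked prefix) survives, so acks for messages above the first unacked one are forgotten. Under that assumption, every already-acked message after the first "hole" is delivered again. With randomly chosen delays, as in the reproduction, acks presumably arrive far out of publish order, so such holes are common.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Set;
import java.util.TreeSet;

/**
 * Toy model (hypothetical, not Pulsar's cursor). ASSUMPTION: after an unload only
 * the mark-delete position survives, so acks above the first unacked message are lost.
 */
class MarkDeleteSketch {
    /** End of the contiguous acked prefix; -1 if message 0 itself is unacked. */
    static long markDeletePosition(Set<Long> acked) {
        long pos = -1;
        while (acked.contains(pos + 1)) {
            pos++;
        }
        return pos;
    }

    /** Messages the consumer already acked that are delivered again after the reload. */
    static List<Long> duplicatesAfterUnload(long total, Set<Long> acked) {
        List<Long> duplicates = new ArrayList<>();
        for (long id = markDeletePosition(acked) + 1; id < total; id++) {
            if (acked.contains(id)) {
                duplicates.add(id);
            }
        }
        return duplicates;
    }

    public static void main(String[] args) {
        long total = 1_000;
        Set<Long> acked = new TreeSet<>();
        for (long id = 0; id < total; id++) {
            if (id != 10) {          // exactly one message (id 10) is left unacked
                acked.add(id);
            }
        }
        System.out.println("mark-delete position: " + markDeletePosition(acked));
        System.out.println("duplicates after unload: " + duplicatesAfterUnload(total, acked).size());
    }
}
```

With 1,000 messages and only message 10 left unacked, the mark-delete position is 9, and under this model 989 messages that were already acked (ids 11 to 999) would be delivered again.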

dao-jun, May 22 '24 12:05

OK @dao-jun, I will retry tomorrow. Hoping for a good result.

SennoYuki, May 22 '24 14:05

Note: the root cause is the conflict between cursor.readEntries and cursor.asyncReplayEntries.

dao-jun, May 22 '24 20:05

@dao-jun I retried several times. It does not work.

SennoYuki, May 25 '24 03:05

I guess your topic has both non-delayed and delayed delivery messages; if your topic only had delayed delivery messages, it would work. As I noted before, the cause is the conflict between cursor.readEntries and cursor.asyncReplayEntries: one is used for delayed messages and the other for non-delayed messages. We don't have a suitable solution to the problem yet. @SennoYuki

dao-jun, May 26 '24 13:05

@dao-jun I can confirm the topic does not contain any non-delayed delivery messages: in my program, every message has a delay of more than 5 minutes. Sadly, I chose Pulsar precisely because it supports delayed messages. Is there any plan to fix this bug?

SennoYuki, May 27 '24 03:05