[Bug] Bundle unload can cause shared consumers to receive duplicate messages

Open mawenyu opened this issue 1 year ago • 4 comments

Search before asking

  • [X] I searched in the issues and found nothing similar.

Read release policy

  • [X] I understand that unsupported versions don't get bug fixes. I will attempt to reproduce the issue on a supported version of Pulsar client and Pulsar broker.

Version

3.0.4

Minimal reproduce step

  1. Run a two-node broker cluster.
  2. Start three client processes that consume one topic through the same Shared subscription.
  3. Set each consumer's receiver queue size to 5.
  4. Have each consumer spend 3 seconds handling a message before acknowledging it.
  5. Unload the topic from brokerA to brokerB.
  6. The three client processes may then receive messages that are already in the receiver queue of another consumer, so the same message ends up being handled by two consumers.

What did you expect to see?

Bundle unload should not cause message duplication.

What did you see instead?

Bundle unload causes unacked messages to be delivered to other consumers, which results in duplicated data.

Anything else?

I think that when a consumer reconnects, it should first tell the broker which messages it already holds, and the broker should redeliver these messages.

Are you willing to submit a PR?

  • [ ] I'm willing to submit a PR!

mawenyu avatar Jul 19 '24 15:07 mawenyu

The problem of message duplication is difficult to avoid. The best option for now is to make message handling idempotent on the client side.
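
As a rough illustration of client-side idempotence, here is a minimal sketch in plain Java. `IdempotentHandler`, `handleOnce`, and the string message key are hypothetical names for illustration and are not part of the Pulsar client API. It also assumes that every delivery of the same message carries the same key (for example, the message ID rendered as a string). The in-memory set only deduplicates within one process. With three separate client processes, as in this issue, the set of processed keys would have to live in a store shared by all of them.

```java
import java.util.Collections;
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.Set;

/** Hypothetical helper for illustration; not part of the Pulsar client API. */
class IdempotentHandler {
    // Keys of messages that were already processed, oldest first.
    private final Set<String> seen;

    IdempotentHandler(final int capacity) {
        // Bounded set: once more than `capacity` keys are stored, the oldest is dropped.
        this.seen = Collections.newSetFromMap(
            new LinkedHashMap<String, Boolean>() {
                @Override
                protected boolean removeEldestEntry(Map.Entry<String, Boolean> eldest) {
                    return size() > capacity;
                }
            });
    }

    /**
     * Runs the action only if this key was not processed before.
     * Returns true if the action ran, false if the message was a duplicate.
     */
    synchronized boolean handleOnce(String messageKey, Runnable action) {
        if (seen.contains(messageKey)) {
            return false; // duplicate delivery: skip processing (the caller can still ack)
        }
        action.run();
        // Record the key only after success, so a failed attempt can be retried.
        seen.add(messageKey);
        return true;
    }
}
```

A consumer loop would call `handleOnce` with the key of each received message and acknowledge the message whether or not the action ran.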

hanmz avatar Jul 22 '24 07:07 hanmz

I solved this issue by modifying the consumer source code. Could you please ask the community if they would consider adopting this approach or if there are any more elegant solutions? The main modifications are:

  1. In org.apache.pulsar.client.impl.ConsumerImpl#connectionOpened, call clearIncomingMessages() on the parent consumer (org.apache.pulsar.client.impl.MultiTopicsConsumerImpl). This is necessary because Pulsar does not perform this cleanup, and the leftover messages cause the duplication.
  2. In org.apache.pulsar.client.impl.ConsumerImpl#connectionOpened, delay the call to org.apache.pulsar.client.impl.ConsumerImpl#increaseAvailablePermits by 3 seconds and execute it asynchronously. The 3-second delay gives the client time to acknowledge the messages it has already received.
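
The idea behind the two modifications can be sketched with a toy model in plain Java. This is not the actual patch and does not use Pulsar client internals. `ReconnectModel`, `onConnectionOpened`, and the `sendPermits` callback are hypothetical stand-ins. The model drops locally prefetched messages on reconnect and only asks for new messages after a delay.

```java
import java.util.ArrayDeque;
import java.util.Queue;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
import java.util.function.IntConsumer;

/** Toy model only; all names are hypothetical, not Pulsar client internals. */
class ReconnectModel {
    // Stand-in for the consumer's local receiver queue of prefetched messages.
    private final Queue<String> incoming = new ArrayDeque<>();
    private final int receiverQueueSize;
    // Daemon timer thread so the sketch never keeps the JVM alive.
    private final ScheduledExecutorService timer =
        Executors.newSingleThreadScheduledExecutor(r -> {
            Thread t = new Thread(r, "permit-delay");
            t.setDaemon(true);
            return t;
        });

    ReconnectModel(int receiverQueueSize) {
        this.receiverQueueSize = receiverQueueSize;
    }

    synchronized void enqueue(String message) { incoming.add(message); }

    synchronized int queued() { return incoming.size(); }

    /**
     * Called when the connection is (re)opened.
     * Step 1: drop prefetched messages that were not yet handed to the application.
     * Step 2: request new messages only after delayMillis, asynchronously,
     *         so an in-flight handler has time to acknowledge first.
     */
    ScheduledFuture<?> onConnectionOpened(long delayMillis, IntConsumer sendPermits) {
        synchronized (this) {
            incoming.clear();
        }
        return timer.schedule(
            () -> sendPermits.accept(receiverQueueSize), delayMillis, TimeUnit.MILLISECONDS);
    }

    void close() { timer.shutdownNow(); }
}
```

In this model the delay would be 3000 ms to match the 3-second handling time in the reproduction steps. A fixed delay is only a heuristic, because a handler that takes longer than the delay can still lead to a duplicate.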

mawenyu avatar Jul 25 '24 08:07 mawenyu

I solved this issue by modifying the consumer source code. Could you please ask the community if they would consider adopting this approach or if there are any more elegant solutions? The main modifications are:

  1. In org.apache.pulsar.client.impl.ConsumerImpl#connectionOpened, call clearIncomingMessages() on the parent consumer (org.apache.pulsar.client.impl.MultiTopicsConsumerImpl). This is necessary because Pulsar does not perform this cleanup, and the leftover messages cause the duplication.

@mawenyu Interesting. This seems to be related to #21767 which is a draft PR I have.

  2. In org.apache.pulsar.client.impl.ConsumerImpl#connectionOpened, delay the call to org.apache.pulsar.client.impl.ConsumerImpl#increaseAvailablePermits by 3 seconds and execute it asynchronously. The 3-second delay gives the client time to acknowledge the messages it has already received.

I'm just wondering whether the PIP-84 design (https://github.com/apache/pulsar/wiki/PIP-84-:-Pulsar-client:-Redeliver-command-add-epoch) or its implementation has a gap.

lhotari avatar Aug 09 '24 13:08 lhotari

@mawenyu Which Pulsar client version and Pulsar broker version are you using?

lhotari avatar Aug 09 '24 13:08 lhotari

@mawenyu Which Pulsar client version and Pulsar broker version are you using?

3.0.4

mawenyu avatar Oct 25 '24 07:10 mawenyu