[Bug] Bundle unload can cause shared consumers to receive duplicate messages
Search before asking
- [X] I searched in the issues and found nothing similar.
Read release policy
- [X] I understand that unsupported versions don't get bug fixes. I will attempt to reproduce the issue on a supported version of Pulsar client and Pulsar broker.
Version
3.0.4
Minimal reproduce step
- A two-node broker cluster
- Three client processes use the same Shared subscription to consume one topic
- Each consumer's receiver queue size is 5
- Each consumer takes 3 seconds to handle a message and then acknowledges it
- Unload the topic so that it moves from brokerA to brokerB
- The three client processes may receive messages that are already in another consumer's receiver queue, which eventually causes the same message to be handled by two consumers
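The reproduction steps above can be sketched as a toy model. It is not Pulsar code, and its assumptions are hypothetical: dispatch is strict round-robin, nothing has been acknowledged when the unload happens, and (as the report describes) the new broker treats every buffered message as unacked and re-dispatches it round-robin starting at consumer `startConsumer`. The method name `crossConsumerDuplicates` is made up for illustration.

```java
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import java.util.TreeSet;

// Toy model, NOT Pulsar code. Hypothetical assumptions: strict round-robin dispatch,
// nothing has been acknowledged when the unload happens, and the new broker re-dispatches
// every unacked message round-robin starting at consumer `startConsumer`.
class UnloadDuplicationModel {

    /** Returns the IDs of messages that end up held by two different consumers. */
    static Set<Integer> crossConsumerDuplicates(int consumers, int receiverQueueSize, int startConsumer) {
        // Broker A fills every receiver queue round-robin, so consumer c holds
        // messages c, c + consumers, c + 2 * consumers, ...
        List<Set<Integer>> held = new ArrayList<>();
        for (int c = 0; c < consumers; c++) {
            held.add(new HashSet<>());
        }
        int dispatched = 0;
        for (int round = 0; round < receiverQueueSize; round++) {
            for (int c = 0; c < consumers; c++) {
                held.get(c).add(dispatched++);
            }
        }

        // The topic moves to broker B. The clients keep their buffered copies, but broker B
        // treats every one of these messages as unacked and dispatches it again.
        Set<Integer> duplicates = new TreeSet<>();
        for (int m = 0; m < dispatched; m++) {
            int target = (startConsumer + m) % consumers; // consumer that receives m this time
            for (int c = 0; c < consumers; c++) {
                if (c != target && held.get(c).contains(m)) {
                    duplicates.add(m);
                }
            }
        }
        return duplicates;
    }
}
```

With 3 consumers and a receiver queue size of 5, `crossConsumerDuplicates(3, 5, 1)` reports all 15 buffered messages as duplicated, while `crossConsumerDuplicates(3, 5, 0)` reports none: in this model a duplicate appears exactly when the new broker hands a message to a different consumer than the one still buffering it.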
What did you expect to see?
Bundle unload should not cause message duplication.
What did you see instead?
Bundle unload causes unacked messages to be delivered to other consumers, which results in data duplication.
Anything else?
I think that when a consumer reconnects, it should first tell the broker which messages it already holds, and the broker should then redeliver those messages.
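One possible reading of this proposal is sketched below. It is purely hypothetical: Pulsar has no such reconnect handshake, and `HoldingAwareDispatcher`, `onReconnect` and `dispatch` are invented names. Each reconnecting consumer reports the message IDs still in its local queue; the dispatcher records them as owned by that consumer and only shares out messages nobody holds.

```java
import java.util.ArrayList;
import java.util.Collection;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.TreeSet;

// Hypothetical sketch of the proposal above; none of these names exist in Pulsar.
// Reported IDs are pinned to the reporting consumer and never dispatched to anyone else.
class HoldingAwareDispatcher {
    private final TreeSet<Long> available; // unacked and not held by any consumer
    private final Map<String, Set<Long>> owned = new HashMap<>();

    HoldingAwareDispatcher(Collection<Long> unackedIds) {
        this.available = new TreeSet<>(unackedIds);
    }

    /** Reconnect handshake: the consumer reports what is still in its local queue. */
    void onReconnect(String consumer, Collection<Long> heldIds) {
        Set<Long> mine = owned.computeIfAbsent(consumer, k -> new TreeSet<>());
        for (Long id : heldIds) {
            if (available.remove(id)) { // first claim wins; a later claim is ignored
                mine.add(id);
            }
        }
    }

    /** Dispatches up to `permits` messages that no consumer already holds. */
    List<Long> dispatch(String consumer, int permits) {
        Set<Long> mine = owned.computeIfAbsent(consumer, k -> new TreeSet<>());
        List<Long> out = new ArrayList<>();
        while (out.size() < permits && !available.isEmpty()) {
            Long id = available.pollFirst();
            mine.add(id);
            out.add(id);
        }
        return out;
    }

    Set<Long> ownedBy(String consumer) {
        return owned.getOrDefault(consumer, Set.of());
    }
}
```

The sketch only helps if every consumer's report reaches the broker before the broker dispatches to anyone else, and a consumer that never comes back would need some timeout to release the IDs it claimed; both are open questions this toy does not address.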
Are you willing to submit a PR?
- [ ] I'm willing to submit a PR!
Message duplication is difficult to avoid entirely. For now, the better solution is to make message handling idempotent on the client side.
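A minimal sketch of client-side idempotence, under stated assumptions: since the duplicate copy in this issue can land in a different client process, the "already processed" check has to live in a store every consumer shares (for example a database table with a unique key). `ProcessedStore`, `InMemoryStore` and `IdempotentConsumer` are hypothetical names, and the in-memory store only stands in for that shared store.

```java
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical sketch: a store shared by all consumers records which message keys
// (e.g. a message ID or a business key) have already been processed.
interface ProcessedStore {
    /** Atomically records `key`; returns false if it was already recorded. */
    boolean markIfAbsent(String key);
}

// Stand-in for a shared store such as a table with a unique constraint.
class InMemoryStore implements ProcessedStore {
    private final Set<String> keys = ConcurrentHashMap.newKeySet();

    public boolean markIfAbsent(String key) {
        return keys.add(key);
    }
}

class IdempotentConsumer {
    private final ProcessedStore store;
    private int handled = 0; // single-threaded toy counter

    IdempotentConsumer(ProcessedStore store) {
        this.store = store;
    }

    /** Processes the message only if no consumer has processed this key before. */
    boolean onMessage(String key) {
        if (!store.markIfAbsent(key)) {
            return false; // duplicate: skip the work, but still acknowledge it
        }
        handled++; // real business logic would run here
        return true;
    }

    int handled() {
        return handled;
    }
}
```

Marking the key before doing the work means a crash between the two steps loses that message; in a real system the mark and the side effect would ideally be committed in one transaction.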
I solved this issue by modifying the consumer source code. Could you please ask the community if they would consider adopting this approach or if there are any more elegant solutions? The main modifications are:
- In org.apache.pulsar.client.impl.ConsumerImpl#connectionOpened, call clearIncomingMessages() on the parent consumer (org.apache.pulsar.client.impl.MultiTopicsConsumerImpl). This is necessary because Pulsar does not perform this cleanup, and the messages left in that queue cause duplication.
- In org.apache.pulsar.client.impl.ConsumerImpl#connectionOpened, delay the invocation of org.apache.pulsar.client.impl.ConsumerImpl#increaseAvailablePermits by 3 seconds and run it asynchronously. This 3-second delay gives the client time to acknowledge the messages it has already received.
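The two client changes described above can be sketched against a toy consumer. This is a hypothetical illustration, not the real ConsumerImpl or MultiTopicsConsumerImpl: `ToyReconnectingConsumer`, `parentQueue` and `permitsSent` are invented names, and "sending permits" is reduced to incrementing a counter.

```java
import java.util.ArrayDeque;
import java.util.Deque;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical toy consumer illustrating the two proposed connectionOpened changes.
class ToyReconnectingConsumer {
    final Deque<String> parentQueue = new ArrayDeque<>(); // stands in for the parent's incoming queue
    private final AtomicInteger permitsSent = new AtomicInteger();
    private final int receiverQueueSize;
    private final ScheduledExecutorService timer = Executors.newSingleThreadScheduledExecutor(r -> {
        Thread t = new Thread(r, "permit-timer");
        t.setDaemon(true); // do not keep the JVM alive just for a pending permit grant
        return t;
    });

    ToyReconnectingConsumer(int receiverQueueSize) {
        this.receiverQueueSize = receiverQueueSize;
    }

    /** Called when the connection to the new broker is ready. */
    ScheduledFuture<?> connectionOpened(long permitDelayMillis) {
        // Change 1: drop messages buffered before the unload; the new broker redelivers them.
        synchronized (parentQueue) {
            parentQueue.clear();
        }
        // Change 2: send flow permits only after a delay, on another thread, so messages
        // that are still being processed can be acknowledged first.
        return timer.schedule(() -> { permitsSent.addAndGet(receiverQueueSize); },
                permitDelayMillis, TimeUnit.MILLISECONDS);
    }

    int permitsSent() {
        return permitsSent.get();
    }

    /** Waits for a scheduled permit grant; returns false on timeout or interruption. */
    static boolean await(ScheduledFuture<?> grant, long timeoutMillis) {
        try {
            grant.get(timeoutMillis, TimeUnit.MILLISECONDS);
            return true;
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return false;
        } catch (ExecutionException | TimeoutException e) {
            return false;
        }
    }

    void close() {
        timer.shutdownNow();
    }
}
```

A fixed delay is a heuristic: if handling a message takes longer than the delay, the in-flight message can still be dispatched to another consumer once permits are granted.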
@mawenyu Interesting. This seems to be related to #21767 which is a draft PR I have.
Just wondering whether the https://github.com/apache/pulsar/wiki/PIP-84-:-Pulsar-client:-Redeliver-command-add-epoch design or implementation has a gap.
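For context, the epoch idea behind PIP-84 can be illustrated with a toy filter, as this sketch reads the design; it is not the Pulsar client's code and `EpochFilter` is a made-up name. The consumer bumps its epoch when it asks for redelivery, each delivered message carries the epoch it was dispatched under, and messages from an older epoch are dropped.

```java
// Toy illustration of epoch-based filtering in the spirit of PIP-84 (not Pulsar code).
class EpochFilter {
    private long consumerEpoch = 0;

    /** Called when the consumer requests redelivery of its unacked messages. */
    long onRedeliverRequested() {
        return ++consumerEpoch;
    }

    /** Returns true if a message dispatched under `messageEpoch` should reach the application. */
    boolean accept(long messageEpoch) {
        return messageEpoch >= consumerEpoch;
    }
}
```

In this toy the epoch belongs to a single consumer, so it can discard stale copies within that consumer's own stream but has no view of a copy that another consumer on the same Shared subscription has already buffered.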
@mawenyu Which Pulsar client version and Pulsar broker version are you using?
3.0.4