[Bug] NullPointerException in ConsumerBase.callMessageListener because "this.unAckedMessageTracker" is null
Search before asking
- [X] I searched in the issues and found nothing similar.
Read release policy
- [X] I understand that unsupported versions don't get bug fixes. I will attempt to reproduce the issue on a supported version of Pulsar client and Pulsar broker.
Version
Pulsar client hash 5bbd617, slightly after 3.2.3 release
Minimal reproduce step
none
What did you expect to see?
No NPE
What did you see instead?
2024-08-15 12:34:56.123 ERROR host=localhost [pulsar-external-listener-11-1] logger=o.a.p.c.i.ConsumerBase [persistent://public/default/test][consumer] Message listener error in processing message: 21:2345:5
java.lang.NullPointerException: Cannot invoke "org.apache.pulsar.client.impl.UnAckedMessageTracker.add(org.apache.pulsar.client.api.MessageId, int)" because "this.unAckedMessageTracker" is null
at org.apache.pulsar.client.impl.ConsumerBase.callMessageListener(ConsumerBase.java:1167)
at org.apache.pulsar.client.impl.ConsumerBase.lambda$triggerListener$9(ConsumerBase.java:1131)
at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1136)
at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:635)
at org.apache.pulsar.shade.io.netty.util.concurrent.FastThreadLocalRunnable.run(FastThreadLocalRunnable.java:30)
at java.base/java.lang.Thread.run(Thread.java:840)
This happens here: https://github.com/apache/pulsar/blob/5bbd6175a3aaf3a4413af784af9d06d5c748a32d/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerBase.java#L1167
Anything else?
The field unAckedMessageTracker is not final, and it is set to null in org.apache.pulsar.client.impl.MultiTopicsConsumerImpl#cleanupMultiConsumer:
https://github.com/apache/pulsar/blob/0edb8a934704ede1cc134983a84016e611ac8cec/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MultiTopicsConsumerImpl.java#L651
cleanupMultiConsumer is called from 3 locations:
https://github.com/apache/pulsar/blob/0edb8a934704ede1cc134983a84016e611ac8cec/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MultiTopicsConsumerImpl.java#L587
https://github.com/apache/pulsar/blob/0edb8a934704ede1cc134983a84016e611ac8cec/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MultiTopicsConsumerImpl.java#L629
https://github.com/apache/pulsar/blob/0edb8a934704ede1cc134983a84016e611ac8cec/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MultiTopicsConsumerImpl.java#L995
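The suspected sequence can be illustrated with a minimal sketch. This is not Pulsar code: the class `NulledTrackerRaceSketch`, its `tracker` field, and the `reproduce` helper are hypothetical stand-ins, and the sketch assumes that a listener task is already queued on the listener executor when the cleanup path nulls the field.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

public class NulledTrackerRaceSketch {
    // Hypothetical stand-in for the non-final unAckedMessageTracker field.
    private volatile List<String> tracker = new ArrayList<>();

    // Stand-in for the listener path that dereferences the field.
    void callMessageListener(String msg) {
        tracker.add(msg);
    }

    // Stand-in for a cleanup path that sets the field to null.
    void cleanup() {
        tracker = null;
    }

    // Returns the exception thrown by the queued listener task, or null if none.
    static Throwable reproduce() {
        NulledTrackerRaceSketch consumer = new NulledTrackerRaceSketch();
        ExecutorService listenerExecutor = Executors.newSingleThreadExecutor();
        CountDownLatch gate = new CountDownLatch(1);
        try {
            // Occupy the single listener thread so the next task stays queued.
            listenerExecutor.submit(() -> { gate.await(); return null; });
            Future<?> queued =
                    listenerExecutor.submit(() -> consumer.callMessageListener("m1"));
            consumer.cleanup();  // the field is nulled while the task is still queued
            gate.countDown();    // now let the queued listener task run
            try {
                queued.get();
                return null;
            } catch (ExecutionException e) {
                return e.getCause();
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        } finally {
            listenerExecutor.shutdown();
        }
    }

    public static void main(String[] args) {
        // Prints the NullPointerException raised inside the queued task.
        System.out.println(reproduce());
    }
}
```

The latch only makes the ordering deterministic for the sketch; in the real client the ordering would depend on timing between the listener executor and whichever caller triggers the cleanup.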
Are you willing to submit a PR?
- [ ] I'm willing to submit a PR!
Hi @lhotari,
I'm interested in contributing to Pulsar and believe this issue is a good starting point. I've reviewed the contribution guidelines and have submitted a PR for it: https://github.com/apache/pulsar/pull/23195. Since this is my first contribution, I'd appreciate it if you could take a look and let me know if there's anything I may have missed. I want to make sure I'm on the right track.
Thank you!
Hi @parthpandya00, thanks for your contribution! I provided feedback here: https://github.com/apache/pulsar/pull/23195#issuecomment-2295369996 . Fixing the NPE itself is trivial, but suppressing the NPE could hide the root cause, which we'd like to understand.
This should be handled by having org.apache.pulsar.client.impl.ConsumerBase#callMessageListener check the state of the consumer and skip the message if the consumer is Closing or Closed. This change would also prevent the duplicate processing that currently occurs with message listeners while the consumer is closing: messages that are already buffered in the message listener executor get processed regardless of the consumer state, and because those messages can no longer be acknowledged, they end up being processed more than once.
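A minimal sketch of that kind of state guard, assuming a simplified stand-in for the consumer. The names `ListenerStateGuardSketch`, `SketchConsumer`, and the `State` enum are hypothetical and only mirror the Closing/Closed states mentioned above; this is not the actual ConsumerBase implementation.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Consumer;

public class ListenerStateGuardSketch {
    // Hypothetical states, mirroring the Closing/Closed states named above.
    enum State { READY, CLOSING, CLOSED }

    static class SketchConsumer {
        private final AtomicReference<State> state = new AtomicReference<>(State.READY);
        private final Consumer<String> listener;
        // Hypothetical stand-in for the unAckedMessageTracker field.
        private volatile List<String> unAcked = new ArrayList<>();

        SketchConsumer(Consumer<String> listener) {
            this.listener = listener;
        }

        // Returns true if the message was handed to the listener, false if skipped.
        boolean callMessageListener(String msg) {
            State current = state.get();
            if (current == State.CLOSING || current == State.CLOSED) {
                // The message could not be acknowledged anymore, so skip it.
                return false;
            }
            // Read the field once so a concurrent close cannot null it mid-call.
            List<String> tracker = unAcked;
            if (tracker == null) {
                return false; // close won the race after the state check
            }
            tracker.add(msg);
            listener.accept(msg);
            return true;
        }

        void close() {
            state.set(State.CLOSING);
            unAcked = null; // mirrors the cleanup that nulls the tracker
            state.set(State.CLOSED);
        }
    }

    public static void main(String[] args) {
        List<String> processed = new ArrayList<>();
        SketchConsumer consumer = new SketchConsumer(processed::add);
        boolean first = consumer.callMessageListener("m1");
        consumer.close();
        boolean second = consumer.callMessageListener("m2");
        System.out.println(processed + " " + first + " " + second); // [m1] true false
    }
}
```

The state check is what keeps already-buffered messages from reaching the listener after close. The single read of the field only covers the small window between the check and the use, and is not meant as a substitute for understanding why the tracker was null in the first place.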