[fix][broker] The offline consumer is not removed from the consumerList
Motivation
We found a problem: when the consumer was removed, it was not removed from the consumerList, but it was removed from the consumerSet and consumers:
The log to remove the consumer is as follows:
10:31:40.404 [BookKeeperClientWorker-OrderedExecutor-30-0] INFO org.apache.pulsar.broker.service.persistent.PersistentDispatcherMultipleConsumers - Removed consumer Consumer{subscription=PersistentSubscription{topic=persistent://teg_onion_onion_logcenter_data_gz/teg_onion_onion_logcenter_data_gz/teg_onion_onion_logcenter_data_gz-partition-51, name=t_teg_onion_b_teg_onion_onion_logcenter_data_gz_cg_consumer_001}, consumerId=51, consumerName=ff7f4103a3, address=/11.181.202.252:55902} with pending 0 acks
Documentation
Check the box below or label this PR directly (if you have committer privilege).
Need to update docs?
-
[ ]
doc-required(If you need help on updating docs, create a doc issue)
-
[x]
no-need-doc(Please explain why)
-
[ ]
doc(If this PR contains doc changes)
https://github.com/apache/pulsar/issues/13787 Just like this bug.
In fact, the problem here may be the problem of consumer equality, similar to the previous producer problem.
When there are duplicate consumers in the consumerList, it is possible to remove the actually successfully registered consumers when removing them.
In our scenario, we also found a problem: when the consumer reconnects, the broker no longer pushes messages to these consumers. Through the stats command, we see these The consumer's permit is 0, and it is a consumer that has been removed from the consumerSet and consumers. The wrong consumer could be removed .

So the fundamental problem here is actually the definition of consumer equality. I think we might need to do something like https://github.com/apache/pulsar/pull/12846 on the definition of equality for consumers.
I think we also need to implement a method similar to isSuccessorTo to determine whether the consumer can be overridden, what do you think? @lhotari @michaeljmarshall @eolivelli
like this:
public boolean isSuccessorTo(Consumer other){
return Objects.equals(cnx.clientAddress(), other.cnx.clientAddress()) && consumerId == other.consumerId
&& other.consumerEpoch < consumerEpoch;
}
@lordcheng10 - that is a great finding. I think it seems very reasonable to adjust the consumer logic, but I still haven't researched the consumer remove logic, so I am not authoritative on the subject. I wonder if we'd be able to add a test that shows the isSuccessorTo logic is necessary.
/pulsarbot run-failure-checks
/pulsarbot run-failure-checks
/pulsarbot run-failure-checks
The pr had no activity for 30 days, mark with Stale label.
The pr had no activity for 30 days, mark with Stale label.