[fix][broker] One topic can be closed multiple times concurrently
Motivation
With the transaction feature enabled, we sent and received messages while executing the admin API `unload namespace` 1000 times. Then a problem occurred: the consumer could not receive any messages, and there was no error log. After that we tried the admin API `get topic stats`, and the response showed that only producers were registered on the topic and no consumers were registered, while the consumer state was `Ready` on the client side. In other words, the consumer's state was inconsistent between the broker and the client.
Locating the problem
Then we found the problem: two `PersistentTopic` instances with the same name were registered on one broker node; the consumer was registered on one of them (call it topic-c) and the producer on the other (call it topic-p). At this point, when we send a message, the data flows like this:
- client: the producer sends a message
- broker: handles cmd-send
- broker: finds the topic by name; it is topic-p
- broker: finds all subscriptions registered on topic-p
- broker: finds one subscription, but it has no consumers registered
- broker: there is no need to send the message to any client
But the consumer was actually registered on the other topic, topic-c, so it could not receive any messages.
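To make the failure mode concrete, here is a minimal, self-contained sketch (hypothetical class and topic names, not the actual Pulsar types): when two live topic instances share a name but only one of them is reachable from the broker's topic map, dispatch only ever sees the reachable one.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical, heavily simplified model of the broker's topic registry
// (the real one is BrokerService.topics); names and classes are illustrative only.
public class TwoTopicsSketch {
    static class Topic {
        final List<String> consumers = new ArrayList<>();
    }

    public static void main(String[] args) {
        Map<String, Topic> topics = new ConcurrentHashMap<>();
        String name = "persistent://tenant/ns/t";

        Topic topicC = new Topic();          // created by the consumer's lookup
        topicC.consumers.add("consumer-1");
        topics.put(name, topicC);

        topics.remove(name);                 // a stale close removes topic-c from the map

        Topic topicP = new Topic();          // re-created by the producer's lookup
        topics.put(name, topicP);

        // Dispatch for an incoming message looks the topic up by name and only
        // ever sees topic-p, whose subscription has no consumers.
        System.out.println(topics.get(name).consumers); // [] -> message is never delivered
    }
}
```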
Reproduce
How can two topics end up registered on the same broker node? Run transaction buffer recovery, the admin API `unload namespace`, client consumer creation, and client producer creation at the same time. The process flows as shown below (the problem occurs at step 11):
| Time | transaction buffer recover | admin unload namespace | client create consumer | client create producer |
|---|---|---|---|---|
| 1 | TB recover | | | |
| 2 | TB recover failure | topic.unload | | |
| 3 | topic.close(false) | topic.close(true) | | |
| 4 | brokerService.topics.remove(topicName) | | | |
| 5 | remove topic finish | | lookup | |
| 6 | | | create topic-c | |
| 7 | | | consumer registered on topic-c | |
| 8 | | brokerService.topics.remove(topic) | | |
| 9 | | remove topic-c finish | | lookup |
| 10 | | | | create topic-p |
| 11 | | | | producer registered on topic-p |
- Each column represents an individual process, e.g. `client create consumer`, `client create producer`.
- Multiple processes are going on at the same time, and all of them affect `brokerService.topics`.
- The `Time` column is used only to indicate the order of the steps, not the actual time.
- The important steps are explained below.
step 3
Even if the persistent topic property `isClosingOrDeleting` has already changed to true, `close` can still be executed one more time, see line 1247:
https://github.com/apache/pulsar/blob/f230d15ffcd5f74cca13bd23b35ace784d6f8ce6/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java#L1240-L1249
Whether `close` can proceed depends on two predicates: the topic is not already closing, or the parameter `closeWithoutWaitingClientDisconnect` is true. This means the method `topic.close` can be executed re-entrantly when `closeWithoutWaitingClientDisconnect` is true, and in the implementation of the admin API `unload namespace` this parameter is exactly true:
https://github.com/apache/pulsar/blob/f230d15ffcd5f74cca13bd23b35ace784d6f8ce6/pulsar-broker/src/main/java/org/apache/pulsar/broker/namespace/NamespaceService.java#L723-L725
So when transaction buffer recovery fails while the admin API `unload namespace` is executed at the same time, and the recovery failure occurs before the unload, the topic will be removed from `brokerService.topics` twice.
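A reduced sketch of the guard described above (not the exact Pulsar source; only the relevant check is kept and the flag is simplified to a single boolean): close proceeds either when the topic is not yet closing or whenever `closeWithoutWaitingClientDisconnect` is true, so a forced close can run the full close path a second time.

```java
import java.util.concurrent.CompletableFuture;

// Reduced sketch of the check discussed above; the real PersistentTopic.close(boolean)
// does much more, but the re-entrancy comes from this kind of condition.
class CloseGuardSketch {
    private volatile boolean isClosingOrDeleting = false;

    CompletableFuture<Void> close(boolean closeWithoutWaitingClientDisconnect) {
        // Close is allowed if the topic is not already closing, OR if the caller asked
        // to close without waiting for clients. So a forced close (flag = true) re-enters
        // even after a previous close already fenced the topic.
        if (!isClosingOrDeleting || closeWithoutWaitingClientDisconnect) {
            isClosingOrDeleting = true;
            // ... disconnect clients, close the managed ledger, and finally remove the
            // topic from the broker's topic map -- which therefore runs once per close call.
            return CompletableFuture.completedFuture(null);
        }
        return CompletableFuture.failedFuture(
                new IllegalStateException("Topic is already closing or deleting"));
    }
}
```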
step-4 / step-8
Because the current implementation of `BrokerService.removeTopicFromCache` uses `map.remove(key)` rather than `map.remove(key, value)`, it can remove whatever value is stored under the key, even if it is not the instance the caller intended to remove.
https://github.com/apache/pulsar/blob/f230d15ffcd5f74cca13bd23b35ace784d6f8ce6/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java#L1956
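The difference is just the standard `ConcurrentMap` contract; a minimal demonstration with placeholder values (illustrative only, not broker code):

```java
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;

// Demonstrates why remove(key) is unsafe here: it evicts whatever instance is
// currently mapped, even if it is a newer topic created after the close started.
public class RemoveSketch {
    public static void main(String[] args) {
        ConcurrentMap<String, Object> topics = new ConcurrentHashMap<>();
        Object oldTopic = new Object();   // the instance being closed
        Object newTopic = new Object();   // re-created by a later lookup

        topics.put("t", oldTopic);
        topics.put("t", newTopic);        // a lookup replaced the entry in the meantime

        // Unconditional removal: evicts newTopic even though we meant to evict oldTopic.
        topics.remove("t");
        System.out.println(topics.containsKey("t")); // false -- newTopic was lost

        // Conditional removal: only removes the entry if it still maps to oldTopic.
        topics.put("t", newTopic);
        boolean removed = topics.remove("t", oldTopic);
        System.out.println(removed + " " + topics.containsKey("t")); // false true
    }
}
```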
To sum up, we should make these two changes:
- Make the method `topic.close` non-reentrant, and also prevent re-entrance between `topic.close` and `topic.delete` (a sketch of the non-reentrant idea follows after this list).
- Use `map.remove(key, value)` instead of `map.remove(key)` in the implementation of `BrokerService.removeTopicFromCache`. This change applies to both scenarios: `topic.close` and `topic.delete`.
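As referenced in the first item above, one common way to make a close operation non-reentrant is to remember the first close future and hand it to every later caller. This is only a sketch of the idea, not the PR's actual implementation, which may differ:

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.atomic.AtomicReference;

// Sketch of a non-reentrant close: the first caller creates the future and runs the
// close work; every subsequent caller gets the same future back instead of closing again.
class NonReentrantCloseSketch {
    private final AtomicReference<CompletableFuture<Void>> closeFuture = new AtomicReference<>();

    CompletableFuture<Void> close(boolean closeWithoutWaitingClientDisconnect) {
        CompletableFuture<Void> newFuture = new CompletableFuture<>();
        CompletableFuture<Void> existing = closeFuture.compareAndExchange(null, newFuture);
        if (existing != null) {
            return existing;              // a close is already in progress: reuse its future
        }
        doClose(closeWithoutWaitingClientDisconnect)
                .whenComplete((v, ex) -> {
                    if (ex != null) {
                        newFuture.completeExceptionally(ex);
                    } else {
                        newFuture.complete(v);
                    }
                });
        return newFuture;
    }

    private CompletableFuture<Void> doClose(boolean closeWithoutWaitingClientDisconnect) {
        // ... disconnect clients, close the managed ledger, remove the topic from the cache
        return CompletableFuture.completedFuture(null);
    }
}
```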
Modifications
- Use `map.remove(key, value)` instead of `map.remove(key)` in the implementation of `BrokerService.removeTopicFromCache` (fixed by PR #17526).
Documentation
- [ ] `doc-required`
- [x] `doc-not-needed`
- [ ] `doc`
- [ ] `doc-complete`
This PR should merge into these branches:
- branch-2.8
- branch-2.9
- branch-2.10
- branch-2.11
- master
/pulsarbot rerun-failure-checks
The PR had no activity for 30 days, marking it with the Stale label.
Since we will start the RC version of 3.0.0 on 2023-04-11, I will change the label/milestone of PRs that have not been merged:
- PRs of type `feature` are deferred to `3.1.0`
- PRs of type `fix` are deferred to `3.0.1`

So this PR is moved to the 3.0.1 milestone.
The PR had no activity for 30 days, marking it with the Stale label.
@poorbarcode please rebase (or merge changes from master to) this PR
@poorbarcode please rebase this PR. I guess this is still relevant and in the same category as #20540 .
@lhotari
> @poorbarcode please rebase this PR. I guess this is still relevant and in the same category as https://github.com/apache/pulsar/pull/20540.
Rebased
> Use `map.remove(key, value)` instead of `map.remove(key)` in the implementation of `BrokerService.removeTopicFromCache`. This change applies to both scenarios: `topic.close` and `topic.delete`.
Please update the PR description since this change was earlier split into another PR.
/pulsarbot rerun-failure-checks
The changes in the PR seem to cause 2 test failures. I happened to report the failures as flaky tests #21877 and #21876, but I already closed them. I uploaded the logs here: https://gist.github.com/lhotari/d0da6f477ffdd1c691040ffe26f71523 and https://gist.github.com/lhotari/abf15ee24529e21dd71286a33f36d4bb . The 2 tests seem to consistently fail with the PR changes.
@lhotari
Fixed the conflict with ExtensibleLoadManager. I think we need a separate PR to merge the following status flags of `PersistentTopic` to make the logic simpler 😂:
- `isFenced`
- `isClosingOrDeleting`
- `transferring`
Great work @poorbarcode
I think our topic's state change logic is complex enough to define the life cycle of topics and accordingly codify the states and transitions in a state-machine manner. This means we could replace the state flags with this state enum. Each state might define the corresponding future objects to dedup or to track the completion. Maybe this is future work.
For example: `{ Init, Active, Fenced, Deleting, Transferring, Disconnecting /* by transfer */, Closing, Closed }`

- `Init -> Active -> Fenced -> Active` (fenced and unfenced)
- `Init -> Active -> Closing -> Closed` (unload)
- `Init -> Active -> Deleting -> Closed` (delete)
- `Init -> Active -> Transferring -> Disconnecting -> Closed` (transfer)
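A minimal sketch of what such a `TopicState` enum with an explicit transition table could look like (hypothetical, not part of this PR; the transitions follow the example above):

```java
import java.util.EnumMap;
import java.util.EnumSet;
import java.util.Map;
import java.util.Set;

// Hypothetical TopicState enum with an explicit transition table. It only illustrates
// replacing the boolean flags (isFenced, isClosingOrDeleting, transferring) with one state machine.
enum TopicState {
    INIT, ACTIVE, FENCED, DELETING, TRANSFERRING, DISCONNECTING, CLOSING, CLOSED;

    private static final Map<TopicState, Set<TopicState>> ALLOWED = new EnumMap<>(TopicState.class);
    static {
        ALLOWED.put(INIT, EnumSet.of(ACTIVE));
        ALLOWED.put(ACTIVE, EnumSet.of(FENCED, CLOSING, DELETING, TRANSFERRING));
        ALLOWED.put(FENCED, EnumSet.of(ACTIVE));                 // unfence
        ALLOWED.put(CLOSING, EnumSet.of(CLOSED));                // unload
        ALLOWED.put(DELETING, EnumSet.of(CLOSED));               // delete
        ALLOWED.put(TRANSFERRING, EnumSet.of(DISCONNECTING));    // transfer
        ALLOWED.put(DISCONNECTING, EnumSet.of(CLOSED));
        ALLOWED.put(CLOSED, EnumSet.noneOf(TopicState.class));
    }

    boolean canTransitionTo(TopicState next) {
        return ALLOWED.get(this).contains(next);
    }
}
```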
@heesung-sn
> I think our topic's state change logic is complex enough to define the life cycle of topics and accordingly codify the states and transitions in a state-machine manner. This means we could replace the state flags with this state enum. Each state might define the corresponding future objects to dedup or to track the completion. Maybe this is future work.

Sure, thanks for the detailed suggestion ❤️
@poorbarcode Isn't this PR still relevant? Please rebase this PR.
Rebase master
@poorbarcode there are some test failures, do you have a chance to check?
However, I think this Topic state management needs a serious refactoring.
Agree with you
> However, I think this Topic state management needs a serious refactoring.

I suggest defining TopicState and revisiting topic state transitions in a state-machine manner.
@heesung-sn I agree that a state machine style would result in a more maintainable solution. We can handle that in a second step. There is urgency to address the long outstanding topic closing issues and this PR makes good progress in that area.
@poorbarcode looks like OneWayReplicatorTest.testUnFenceTopicToReuse fails
> @poorbarcode looks like OneWayReplicatorTest.testUnFenceTopicToReuse fails

Sorry, I found a behavior change (before: the broker tries to unfence the topic to reuse it when closing clients fails; after: this mechanism does not work), and it is difficult to fix gracefully. I will try to fix it tomorrow.
> > @poorbarcode looks like OneWayReplicatorTest.testUnFenceTopicToReuse fails
>
> Sorry, I found a behavior change (before: the broker tries to unfence the topic to reuse it when closing clients fails; after: this mechanism does not work), and it is difficult to fix gracefully. I will try to fix it tomorrow.

Fixed; the code is ugly now, sorry. Please review it again. Thanks. @lhotari @heesung-sn
Codecov Report
Attention: Patch coverage is 91.42857%, with 6 lines in your changes missing coverage. Please review.
Project coverage is 73.93%. Comparing base (`bbc6224`) to head (`057fe1d`). Report is 202 commits behind head on master.
Additional details and impacted files
    @@ Coverage Diff @@
    ## master #17524 +/- ##
    ============================================
    + Coverage 73.57% 73.93% +0.35%
    - Complexity 32624 32640 +16
    ============================================
    Files 1877 1885 +8
    Lines 139502 140679 +1177
    Branches 15299 15465 +166
    ============================================
    + Hits 102638 104010 +1372
    + Misses 28908 28622 -286
    - Partials 7956 8047 +91
| Flag | Coverage Δ | |
|---|---|---|
| inttests | 27.23% <67.14%> (+2.64%) | :arrow_up: |
| systests | 24.61% <52.85%> (+0.28%) | :arrow_up: |
| unittests | 73.21% <91.42%> (+0.37%) | :arrow_up: |

Flags with carried forward coverage won't be shown.
| Files | Coverage Δ | |
|---|---|---|
| ...java/org/apache/pulsar/common/util/FutureUtil.java | 77.04% <100.00%> (+2.50%) | :arrow_up: |
| ...sar/broker/service/persistent/PersistentTopic.java | 78.73% <89.65%> (+0.27%) | :arrow_up: |