[fix][broker][branch-3.0] fix prepareInitPoliciesCacheAsync in SystemTopicBasedTopicPoliciesService

Open TakaHiR07 opened this issue 1 month ago • 4 comments

Fixes https://github.com/apache/pulsar/issues/24977

Motivation

As shown in the issue, this PR fixes two problems:

  1. cleanCacheAndCloseReader() is executed twice, which causes a concurrency error and leaves too many orphan readers in SystemTopicBasedTopicPoliciesService.
  2. A double update of policyCacheInitMap causes a recursive update error.

Modifications

  1. There is no need to call cleanCacheAndCloseReader() when an exception is thrown, since the exception is caught by the outer code. In the earlier Pulsar 2.9.x releases, cleanCacheAndCloseReader was also executed only once.
  2. Avoid the double update of policyCacheInitMap by using putIfAbsent instead of computeIfAbsent. It is not appropriate to put so many operations inside compute().
  3. Add two tests that simulate an exception thrown in createReader, initPolicyCache, or readMorePolicy during prepareInitPoliciesCacheAsync. SystemTopicBasedTopicPoliciesService also appears to lack unit tests.
  4. Add a new method "newReader()" to ensure there is only one readerCreateCompletableFuture. This method is actually added for testing. The whole flow of prepareInitPoliciesCacheAsync() is: put future -> put reader -> throw exception -> remove reader -> remove future. So even without "newReader()", each namespace is guaranteed to have only one reader in readerCache.
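
To illustrate modifications 1 and 2, here is a minimal sketch of the putIfAbsent pattern with a single cleanup on failure. It is not the actual broker code: the class PolicyInitSketch, the method prepareInit, the init supplier, and the cleanups counter are hypothetical names introduced for illustration, and initMap only stands in for policyCacheInitMap. The point is that initialization and cleanup run outside any compute() callback, so removing the entry cannot collide with an update that is still in progress on the same key.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Supplier;

class PolicyInitSketch {
    // Hypothetical stand-in for policyCacheInitMap, keyed by namespace name.
    final ConcurrentHashMap<String, CompletableFuture<Void>> initMap = new ConcurrentHashMap<>();
    // Illustrative counter so the "cleanup runs exactly once" property is observable.
    final AtomicInteger cleanups = new AtomicInteger();

    CompletableFuture<Void> prepareInit(String namespace, Supplier<CompletableFuture<Void>> init) {
        CompletableFuture<Void> created = new CompletableFuture<>();
        CompletableFuture<Void> existing = initMap.putIfAbsent(namespace, created);
        if (existing != null) {
            // Another caller already owns the initialization; share its future.
            return existing;
        }
        // From here on we are NOT inside a computeIfAbsent() callback, so the
        // cleanup below may safely modify the map entry for this namespace.
        CompletableFuture<Void> work;
        try {
            work = init.get();
        } catch (RuntimeException e) {
            // Convert a synchronous failure into a failed future so that
            // there is a single failure path and a single cleanup.
            work = new CompletableFuture<>();
            work.completeExceptionally(e);
        }
        work.whenComplete((ignored, error) -> {
            if (error != null) {
                cleanups.incrementAndGet();
                // Remove only our own future, never one registered later.
                initMap.remove(namespace, created);
                created.completeExceptionally(error);
            } else {
                created.complete(null);
            }
        });
        return created;
    }
}
```

In this sketch a failed initialization removes its map entry once, from one place, which mirrors the intent of calling cleanCacheAndCloseReader() only once.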

There is one point that should be considered in this PR:

  1. When putIfAbsent is used and many getTopicPolicy() calls trigger prepareInitPoliciesCacheAsync, many empty CompletableFuture objects are created. We could add a double check in the code to avoid allocating objects that are immediately garbage collected (though the code would be ugly).

Besides, one case still exists: if closing the reader fails in cleanCacheAndCloseReader(), the closing reader may get a chance to reconnect and become an orphan reader. That is outside the scope of this PR.
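
The double check mentioned above could look roughly like the following sketch. The class DoubleCheckSketch, the method getOrRegister, and the allocations counter are hypothetical and exist only for illustration, and initMap again stands in for policyCacheInitMap. A plain get() handles the common case without allocating, and putIfAbsent resolves the race between concurrent first callers.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicInteger;

class DoubleCheckSketch {
    // Hypothetical stand-in for policyCacheInitMap.
    final ConcurrentHashMap<String, CompletableFuture<Void>> initMap = new ConcurrentHashMap<>();
    // Illustrative counter of how many futures were allocated.
    final AtomicInteger allocations = new AtomicInteger();

    CompletableFuture<Void> getOrRegister(String namespace) {
        // First check: a plain read, no allocation when the entry exists.
        CompletableFuture<Void> existing = initMap.get(namespace);
        if (existing != null) {
            return existing;
        }
        // Second check: putIfAbsent decides the winner among racing callers.
        CompletableFuture<Void> created = new CompletableFuture<>();
        allocations.incrementAndGet();
        existing = initMap.putIfAbsent(namespace, created);
        return existing != null ? existing : created;
    }
}
```

Only callers that race on the very first access can still allocate a future that is discarded; every later call returns the registered future without creating a new object.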

Verifying this change

  • [ ] Make sure that the change passes the CI checks.

Does this pull request potentially affect one of the following parts:

If the box was checked, please highlight the changes

  • [ ] Dependencies (add or upgrade a dependency)
  • [ ] The public API
  • [ ] The schema
  • [ ] The default values of configurations
  • [ ] The threading model
  • [ ] The binary protocol
  • [ ] The REST endpoints
  • [ ] The admin CLI options
  • [ ] The metrics
  • [ ] Anything that affects deployment

Documentation

  • [ ] doc
  • [ ] doc-required
  • [x] doc-not-needed
  • [ ] doc-complete

TakaHiR07 avatar Nov 13 '25 12:11 TakaHiR07

@TakaHiR07 Thanks for the great analysis and fix. Would it be possible to make a fix to master branch too? Does the problem appear there too? Usually we target master branch first and then backport to maintenance branches.

lhotari avatar Nov 13 '25 17:11 lhotari

@lhotari The problem of catching the exception and calling cleanPolicyInitMap twice also appears on master. I have pushed a PR for it: https://github.com/apache/pulsar/pull/24980. It differs slightly from the branch-3.0 fix because of the modifications made in pr-24658.

TakaHiR07 avatar Nov 14 '25 10:11 TakaHiR07

@TakaHiR07 Please check this test failure:

  Error:  Tests run: 155, Failures: 1, Errors: 0, Skipped: 53, Time elapsed: 853.185 s <<< FAILURE! - in org.apache.pulsar.broker.admin.TopicPoliciesTest
  Error:  org.apache.pulsar.broker.admin.TopicPoliciesTest.testTopicPoliciesAfterCompaction[Clean_Cache](4)  Time elapsed: 0.367 s  <<< FAILURE!
  java.lang.AssertionError: expected [true] but found [false]
  	at org.testng.Assert.fail(Assert.java:110)
  	at org.testng.Assert.failNotEquals(Assert.java:1577)
  	at org.testng.Assert.assertTrue(Assert.java:56)
  	at org.testng.Assert.assertTrue(Assert.java:66)
  	at org.apache.pulsar.broker.admin.TopicPoliciesTest.testTopicPoliciesAfterCompaction(TopicPoliciesTest.java:3432)
  	at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
  	at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:77)
  	at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
  	at java.base/java.lang.reflect.Method.invoke(Method.java:569)
  	at org.testng.internal.invokers.MethodInvocationHelper.invokeMethod(MethodInvocationHelper.java:139)
  	at org.testng.internal.invokers.InvokeMethodRunnable.runOne(InvokeMethodRunnable.java:47)
  	at org.testng.internal.invokers.InvokeMethodRunnable.call(InvokeMethodRunnable.java:76)
  	at org.testng.internal.invokers.InvokeMethodRunnable.call(InvokeMethodRunnable.java:11)
  	at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)
  	at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1136)
  	at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:635)
  	at java.base/java.lang.Thread.run(Thread.java:840)

lhotari avatar Nov 14 '25 10:11 lhotari

@lhotari Fixed. In testTopicPoliciesAfterCompaction#clearTopicPoliciesCache, readerCaches should also be cleared, because elements are always put into and removed from readerCaches and policyCacheInitMap together.

TakaHiR07 avatar Nov 14 '25 14:11 TakaHiR07