pulsar
[fix][broker] fix replicated subscriptions for transactional messages
Motivation
In non-transactional production, we update `LastDataMessagePublishedTimestamp` when the message is persisted successfully. But in transactional production, we do not update `LastDataMessagePublishedTimestamp`, which breaks the `ReplicatedSubscription` feature.
Modifications
Update `LastDataMessagePublishedTimestamp` when the transaction is committed or aborted, both of which can move the max read position forward.
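The modification above can be pictured with a minimal sketch. It is not the actual `PersistentTopic` or `TopicTransactionBuffer` code: the class `PublishTimestampSketch` and its method names are hypothetical, positions are simplified to plain `long` values, and the clock is passed in explicitly so the behaviour is easy to follow.

```java
// Hypothetical, simplified model; the real broker classes are more involved.
class PublishTimestampSketch {
    // Highest position a consumer is allowed to read (simplified to a long).
    private long maxReadPosition = -1L;
    // Timestamp that the replicated-subscription logic would consult.
    private long lastDataMessagePublishedTimestamp = 0L;

    /**
     * Assumed to be called after a non-transactional message is persisted,
     * and after a transaction is committed or aborted.
     * The timestamp is refreshed only if the max read position advanced.
     */
    synchronized void onMaxReadPositionMoved(long newMaxReadPosition, long nowMillis) {
        if (newMaxReadPosition > maxReadPosition) {
            maxReadPosition = newMaxReadPosition;
            lastDataMessagePublishedTimestamp = nowMillis;
        }
    }

    synchronized long getMaxReadPosition() {
        return maxReadPosition;
    }

    synchronized long getLastDataMessagePublishedTimestamp() {
        return lastDataMessagePublishedTimestamp;
    }
}
```

In this model, a transactional message that is persisted while its transaction is still open leaves the max read position unchanged, so the timestamp stays as it was; the commit or abort that finally advances the position is what refreshes it.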
Verifying this change
- [x] Make sure that the change passes the CI checks.
(Please pick either of the following options)
This change is a trivial rework / code cleanup without any test coverage.
Does this pull request potentially affect one of the following parts:
If the box was checked, please highlight the changes
- [ ] Dependencies (add or upgrade a dependency)
- [ ] The public API
- [ ] The schema
- [ ] The default values of configurations
- [ ] The threading model
- [ ] The binary protocol
- [ ] The REST endpoints
- [ ] The admin CLI options
- [ ] The metrics
- [ ] Anything that affects deployment
Documentation
- [ ] doc
- [ ] doc-required
- [x] doc-not-needed
- [ ] doc-complete
Matching PR in forked repository
PR in forked repository: https://github.com/thetumbled/pulsar/pull/45
PTAL, thanks. @BewareMyPower @poorbarcode @codelipenghui @liangyepianzhou
Could you add a test for it?
Added, PTAL, thanks.
@thetumbled You need to add a test related to transaction, right?
testUpdateLastDataMessagePublishedTimestampForTransactionalPublish()
Hi everyone, I have added the end-to-end test `TransactionalReplicateSubscriptionTest` to reproduce the bug where replicated subscriptions do not work for transactional messages. You can add it on its own to the master branch to reproduce the bug.
Looking for your reply, thanks. @codelipenghui @lhotari @liangyepianzhou @dao-jun @BewareMyPower @Technoboy-
I just don't understand: it seems `ReplicatedSubscriptionsController` only snapshots `ledger.lastPosition`, so why do we update `lastDataMessagePublishedTimestamp` after `maxReadPosition` has moved?
Please check the typo "maker", should be "marker"
fixed, thanks.
It looks like if `transactionCoordinatorEnabled=false` and only normal messages are published to the topic, `lastDataMessagePublishedTimestamp` will not be updated.
Thanks for the review, I have updated the code, PTAL.
We may need to discuss at which point we should update `lastDataMessagePublishedTimestamp`. There are two options:
- update `lastDataMessagePublishedTimestamp` whenever a transactional/non-transactional message is persisted.
- update `lastDataMessagePublishedTimestamp` when the max read position moves forward.
The difference between the two options is that the former updates more frequently than the latter. With the greater frequency, we get a more precise position for `ReplicatedSubscription`.
But because the consumer can't consume messages after the max read position, and the number of snapshots is limited (default 10), there is a high risk that when a consumer acks a position belonging to a committed transaction, the corresponding snapshots have already been evicted (snapshots are taken very frequently; with the default configuration, a transaction lasting more than 10s will hit this case), so the `ReplicatedSubscription` feature can't work at all.
Looking for your reply. @lhotari @codelipenghui @liangyepianzhou @dao-jun @Technoboy-
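The eviction risk described above can be illustrated with a small sketch. It is a simplified model, not the broker's snapshot cache: the class `SnapshotCacheSketch`, its methods, and the use of plain `long` positions and `String` snapshot ids are all hypothetical. It assumes the cache keeps at most a fixed number of snapshots, drops the oldest when full, and resolves an ack to the newest snapshot at or before the acked position.

```java
import java.util.Map;
import java.util.TreeMap;

// Hypothetical, simplified model of a bounded snapshot cache.
class SnapshotCacheSketch {
    // Snapshots keyed by the local position at which they were taken.
    private final TreeMap<Long, String> snapshots = new TreeMap<>();
    private final int maxSnapshots;

    SnapshotCacheSketch(int maxSnapshots) {
        this.maxSnapshots = maxSnapshots;
    }

    /** Stores a snapshot and evicts the oldest ones beyond the capacity. */
    void addSnapshot(long position, String snapshotId) {
        snapshots.put(position, snapshotId);
        while (snapshots.size() > maxSnapshots) {
            snapshots.pollFirstEntry();
        }
    }

    /** Returns the newest snapshot at or before the acked position, or null if none is retained. */
    String snapshotForAck(long ackedPosition) {
        Map.Entry<Long, String> entry = snapshots.floorEntry(ackedPosition);
        return entry == null ? null : entry.getValue();
    }
}
```

With a capacity of 10, suppose 15 snapshots are taken at positions 10 through 24 while a transaction stays open. Only the snapshots at positions 15 through 24 remain, so an ack at position 12 (a message of that transaction, readable only after commit) finds no usable snapshot, which is the failure mode described in the comment above.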
+1 with option 1
- update `lastDataMessagePublishedTimestamp` when the max read position moves forward.
This seems to make sense, assuming that "when the max position moves forward" means there are more messages available to be read.
It might make sense to rename `lastDataMessagePublishedTimestamp` to cover both the non-transactional and transactional cases. The field name should also include the purpose: this is for replicated subscriptions. It's also possible to cover the meaning in a code comment for the field.
good idea.
PTAL, thanks. @lhotari @codelipenghui @liangyepianzhou @dao-jun @Technoboy-
closing and reopening to pick up recent changes from master branch
/pulsarbot rerun-failure-checks
Closing and reopening to get recent changes from master branch to the PR build.
Codecov Report
Attention: Patch coverage is 79.41176% with 7 lines in your changes are missing coverage. Please review.
Project coverage is 73.13%. Comparing base (bbc6224) to head (2f416f0). Report is 254 commits behind head on master.
Additional details and impacted files
@@ Coverage Diff @@
## master #22452 +/- ##
============================================
- Coverage 73.57% 73.13% -0.45%
- Complexity 32624 32791 +167
============================================
Files 1877 1888 +11
Lines 139502 141132 +1630
Branches 15299 15490 +191
============================================
+ Hits 102638 103216 +578
- Misses 28908 29940 +1032
- Partials 7956 7976 +20
Flag | Coverage Δ | |
---|---|---|
inttests | 27.32% <58.82%> (+2.73%) | :arrow_up: |
systests | 24.77% <20.58%> (+0.45%) | :arrow_up: |
unittests | 72.13% <79.41%> (-0.72%) | :arrow_down: |
Flags with carried forward coverage won't be shown.
Files | Coverage Δ | |
---|---|---|
...sar/broker/service/persistent/PersistentTopic.java | 79.10% <100.00%> (+0.64%) | :arrow_up: |
.../persistent/ReplicatedSubscriptionsController.java | 70.37% <100.00%> (-2.23%) | :arrow_down: |
...ransaction/buffer/impl/TopicTransactionBuffer.java | 88.05% <100.00%> (+0.30%) | :arrow_up: |
...nsaction/buffer/impl/TransactionBufferDisable.java | 53.57% <40.00%> (-2.96%) | :arrow_down: |
...ransaction/buffer/impl/InMemTransactionBuffer.java | 56.47% <20.00%> (-1.11%) | :arrow_down: |
/pulsarbot rerun-failure-checks
/pulsarbot rerun-failure-checks
The test keeps failing, could you please resolve it?
I haven't run into this error locally.
Closing and reopening to get recent changes from master branch to the PR build.
/pulsarbot rerun-failure-checks
@poorbarcode any more change requests?
/pulsarbot rerun-failure-checks
/pulsarbot rerun-failure-checks
Error: org.apache.pulsar.broker.service.ReplicatorSubscriptionWithTransactionTest.testReplicatedSubscriptionWhenReplicatorProducerIsClosed Time elapsed: 12.567 s <<< FAILURE!
org.awaitility.core.ConditionTimeoutException: Assertion condition defined as a org.apache.pulsar.broker.service.ReplicatorSubscriptionTest expected object to not be null within 10 seconds.
Pulsar CI / CI - Unit - Brokers - Broker Group 1 (pull_request) test is not stable, please check the reason
I have fixed the test code, please help to trigger the CI again, thanks.
/pulsarbot rerun-failure-checks
/pulsarbot rerun-failure-checks