[fix][broker] fix replicated subscriptions for transactional messages

Open thetumbled opened this issue 10 months ago • 18 comments

Motivation

For non-transactional publishing, we update LastDataMessagePublishedTimestamp when the message is persisted successfully. For transactional publishing, however, we do not update LastDataMessagePublishedTimestamp, which breaks the ReplicatedSubscription feature for transactional messages.

Modifications

Update the LastDataMessagePublishedTimestamp when the transaction is committed or aborted, both of which can move forward the max read position.
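
The intended behavior can be pictured with a minimal sketch. This is not the broker code: the class `PublishTimestampSketch` and its method names are hypothetical, positions are plain longs, and only a single transaction is modeled. It assumes the timestamp is refreshed only when a commit or abort actually moves the max read position forward.

```java
import java.util.function.LongSupplier;

// Hypothetical, simplified model of a topic; not Pulsar's PersistentTopic.
class PublishTimestampSketch {
    private final LongSupplier clock;
    private long lastDataMessagePublishedTimestamp;
    private long maxReadPosition;

    PublishTimestampSketch(LongSupplier clock) {
        this.clock = clock;
    }

    // A non-transactional message is readable as soon as it is persisted.
    void onNonTransactionalPersisted(long position) {
        maxReadPosition = Math.max(maxReadPosition, position);
        lastDataMessagePublishedTimestamp = clock.getAsLong();
    }

    // A transactional message stays invisible until its transaction ends,
    // so persisting it changes nothing here.
    void onTransactionalPersisted(long position) {
    }

    // Commit or abort: refresh the timestamp only if the max read position advanced.
    void onTransactionEnded(long newMaxReadPosition) {
        if (newMaxReadPosition > maxReadPosition) {
            maxReadPosition = newMaxReadPosition;
            lastDataMessagePublishedTimestamp = clock.getAsLong();
        }
    }

    long lastDataMessagePublishedTimestamp() {
        return lastDataMessagePublishedTimestamp;
    }

    public static void main(String[] args) {
        long[] now = {1000L}; // fake clock so the run is deterministic
        PublishTimestampSketch topic = new PublishTimestampSketch(() -> now[0]);

        topic.onNonTransactionalPersisted(1);
        System.out.println(topic.lastDataMessagePublishedTimestamp()); // 1000

        now[0] = 2000L;
        topic.onTransactionalPersisted(2);
        System.out.println(topic.lastDataMessagePublishedTimestamp()); // still 1000

        now[0] = 3000L;
        topic.onTransactionEnded(2);
        System.out.println(topic.lastDataMessagePublishedTimestamp()); // 3000
    }
}
```

In this model, a component that compares the timestamp against the time of its last snapshot would see new data after the commit, even though nothing was published non-transactionally.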

Verifying this change

  • [x] Make sure that the change passes the CI checks.

(Please pick either of the following options)

This change is a trivial rework / code cleanup without any test coverage.

Does this pull request potentially affect one of the following parts:

If the box was checked, please highlight the changes

  • [ ] Dependencies (add or upgrade a dependency)
  • [ ] The public API
  • [ ] The schema
  • [ ] The default values of configurations
  • [ ] The threading model
  • [ ] The binary protocol
  • [ ] The REST endpoints
  • [ ] The admin CLI options
  • [ ] The metrics
  • [ ] Anything that affects deployment

Documentation

  • [ ] doc
  • [ ] doc-required
  • [x] doc-not-needed
  • [ ] doc-complete

Matching PR in forked repository

PR in forked repository: https://github.com/thetumbled/pulsar/pull/45

thetumbled avatar Apr 07 '24 06:04 thetumbled

PTAL, thanks. @BewareMyPower @poorbarcode @codelipenghui @liangyepianzhou

thetumbled avatar Apr 07 '24 06:04 thetumbled

Could you add a test for it?

Added, PTAL, thanks.

thetumbled avatar Apr 08 '24 07:04 thetumbled

@thetumbled You need to add a test related to transaction, right?

liangyepianzhou avatar Apr 08 '24 07:04 liangyepianzhou

testUpdateLastDataMessagePublishedTimestampForTransactionalPublish()

thetumbled avatar Apr 08 '24 07:04 thetumbled

Hi everyone, I have added the end-to-end test TransactionalReplicateSubscriptionTest to reproduce the bug where replicated subscriptions do not work for transactional messages. You can add it on its own to the master branch to reproduce the bug. Looking forward to your reply, thanks. @codelipenghui @lhotari @liangyepianzhou @dao-jun @BewareMyPower @Technoboy-

thetumbled avatar Apr 10 '24 03:04 thetumbled

I just don't understand, it seems ReplicatorSubscriptionController only snapshots ledger.lastPosition, why do we update lastDataMessagePublishedTimestamp after maxReadPosition moved?

dao-jun avatar Apr 10 '24 03:04 dao-jun

Please check the typo "maker", should be "marker"

fixed, thanks.

thetumbled avatar Apr 10 '24 03:04 thetumbled

It looks like if transactionCoordinatorEnabled=false and only normal messages are published to the topic, lastDataMessagePublishedTimestamp will not be updated.

Thanks for the review, I have updated the code, PTAL.

thetumbled avatar Apr 10 '24 07:04 thetumbled

I just don't understand, it seems ReplicatorSubscriptionController only snapshots ledger.lastPosition, why do we update lastDataMessagePublishedTimestamp after maxReadPosition moved?

We may need to discuss at which point in time we should update lastDataMessagePublishedTimestamp. There are two options:

  • update lastDataMessagePublishedTimestamp whenever a transactional/non-transactional message is persisted.
  • update lastDataMessagePublishedTimestamp when the max read position moves forward.

The difference between these two options is that the former updates more frequently than the latter. With greater frequency, we get a more precise position for ReplicatorSubscription. But because a consumer can't consume messages beyond the max read position, and the number of cached snapshots is limited (default 10), there is a high risk that when a consumer acks a position belonging to a committed transaction, the corresponding snapshots have already been evicted (snapshots are taken very frequently; with the default configuration, a transaction lasting more than 10s will hit this case), so the ReplicatorSubscription feature can't work at all. Looking forward to your reply. @lhotari @codelipenghui @liangyepianzhou @dao-jun @Technoboy-
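
The eviction concern can be illustrated with a small simulation. This is only a sketch under stated assumptions: `BoundedSnapshotCache` is a hypothetical stand-in for the per-subscription snapshot cache, positions are plain longs, one snapshot is taken per second while the transaction is open, and the cache drops its oldest entry once it exceeds its limit.

```java
import java.util.Map;
import java.util.TreeMap;

class SnapshotEvictionSketch {
    // Hypothetical bounded cache: keeps only the newest maxSize snapshots, keyed by position.
    static final class BoundedSnapshotCache {
        private final TreeMap<Long, String> snapshots = new TreeMap<>();
        private final int maxSize;

        BoundedSnapshotCache(int maxSize) {
            this.maxSize = maxSize;
        }

        void add(long position, String snapshotId) {
            snapshots.put(position, snapshotId);
            while (snapshots.size() > maxSize) {
                snapshots.pollFirstEntry(); // evict the oldest snapshot
            }
        }

        // Newest snapshot taken at or before the acked position, or null if none is cached.
        String floorSnapshot(long ackedPosition) {
            Map.Entry<Long, String> entry = snapshots.floorEntry(ackedPosition);
            return entry == null ? null : entry.getValue();
        }
    }

    // One snapshot per second while the transaction is open; the consumer can only
    // ack after the commit, once the max read position has moved.
    static String simulate(int txnSeconds, int maxSnapshots, long ackedPosition) {
        BoundedSnapshotCache cache = new BoundedSnapshotCache(maxSnapshots);
        for (int second = 1; second <= txnSeconds; second++) {
            long lastPosition = second * 100L; // assume 100 entries persisted per second
            cache.add(lastPosition, "snapshot-" + second);
        }
        return cache.floorSnapshot(ackedPosition);
    }

    public static void main(String[] args) {
        // 15 s transaction, 10 cached snapshots: early snapshots are already gone.
        System.out.println(simulate(15, 10, 150)); // null
        // 8 s transaction: the snapshot covering position 150 is still cached.
        System.out.println(simulate(8, 10, 150)); // snapshot-1
    }
}
```

Under these assumptions, updating the timestamp on every persisted message keeps producing snapshots the consumer cannot use yet, which is the argument for tying the update to the max read position instead.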

thetumbled avatar Apr 10 '24 08:04 thetumbled

+1 with option 1

dao-jun avatar Apr 10 '24 08:04 dao-jun

  • update lastDataMessagePublishedTimestamp when the max position move forward.

This seems to make sense, assuming that "when the max position moves forward" means there are more messages available to be read. It might make sense to rename lastDataMessagePublishedTimestamp to cover both the non-transactional and transactional cases. The field name should also include the purpose: this is for replicated subscriptions. It's also possible to cover the meaning in a code comment for the field.

lhotari avatar Apr 10 '24 09:04 lhotari

good idea.

thetumbled avatar Apr 11 '24 02:04 thetumbled

PTAL, thanks. @lhotari @codelipenghui @liangyepianzhou @dao-jun @Technoboy-

thetumbled avatar Apr 12 '24 01:04 thetumbled

closing and reopening to pick up recent changes from master branch

lhotari avatar Apr 30 '24 10:04 lhotari

/pulsarbot rerun-failure-checks

thetumbled avatar Apr 30 '24 15:04 thetumbled

Closing and reopening to get recent changes from master branch to the PR build.

lhotari avatar May 03 '24 18:05 lhotari

Codecov Report

Attention: Patch coverage is 79.41176% with 7 lines in your changes are missing coverage. Please review.

Project coverage is 73.13%. Comparing base (bbc6224) to head (2f416f0). Report is 254 commits behind head on master.

Additional details and impacted files

@@             Coverage Diff              @@
##             master   #22452      +/-   ##
============================================
- Coverage     73.57%   73.13%   -0.45%     
- Complexity    32624    32791     +167     
============================================
  Files          1877     1888      +11     
  Lines        139502   141132    +1630     
  Branches      15299    15490     +191     
============================================
+ Hits         102638   103216     +578     
- Misses        28908    29940    +1032     
- Partials       7956     7976      +20     
Flag Coverage Δ
inttests 27.32% <58.82%> (+2.73%) :arrow_up:
systests 24.77% <20.58%> (+0.45%) :arrow_up:
unittests 72.13% <79.41%> (-0.72%) :arrow_down:

Flags with carried forward coverage won't be shown.

Files Coverage Δ
...sar/broker/service/persistent/PersistentTopic.java 79.10% <100.00%> (+0.64%) :arrow_up:
.../persistent/ReplicatedSubscriptionsController.java 70.37% <100.00%> (-2.23%) :arrow_down:
...ransaction/buffer/impl/TopicTransactionBuffer.java 88.05% <100.00%> (+0.30%) :arrow_up:
...nsaction/buffer/impl/TransactionBufferDisable.java 53.57% <40.00%> (-2.96%) :arrow_down:
...ransaction/buffer/impl/InMemTransactionBuffer.java 56.47% <20.00%> (-1.11%) :arrow_down:

... and 336 files with indirect coverage changes

codecov-commenter avatar May 03 '24 19:05 codecov-commenter

/pulsarbot rerun-failure-checks

thetumbled avatar May 04 '24 04:05 thetumbled

/pulsarbot rerun-failure-checks

thetumbled avatar May 05 '24 02:05 thetumbled

[image] The test keeps failing, could you please resolve it?

dao-jun avatar May 05 '24 04:05 dao-jun

I don't see this error. [image]

thetumbled avatar May 05 '24 04:05 thetumbled

Closing and reopening to get recent changes from master branch to the PR build.

dao-jun avatar May 05 '24 16:05 dao-jun

/pulsarbot rerun-failure-checks

thetumbled avatar May 06 '24 06:05 thetumbled

@poorbarcode any more change requests?

dao-jun avatar May 08 '24 05:05 dao-jun

/pulsarbot rerun-failure-checks

thetumbled avatar May 09 '24 03:05 thetumbled

/pulsarbot rerun-failure-checks

thetumbled avatar May 09 '24 06:05 thetumbled

Error: org.apache.pulsar.broker.service.ReplicatorSubscriptionWithTransactionTest.testReplicatedSubscriptionWhenReplicatorProducerIsClosed Time elapsed: 12.567 s <<< FAILURE!
org.awaitility.core.ConditionTimeoutException: Assertion condition defined as a org.apache.pulsar.broker.service.ReplicatorSubscriptionTest expected object to not be null within 10 seconds.

Pulsar CI / CI - Unit - Brokers - Broker Group 1 (pull_request) test is not stable, please check the reason

congbobo184 avatar May 09 '24 11:05 congbobo184

I have fixed the test code, please help to trigger the CI again, thanks.

thetumbled avatar May 09 '24 12:05 thetumbled

/pulsarbot rerun-failure-checks

thetumbled avatar May 10 '24 01:05 thetumbled

/pulsarbot rerun-failure-checks

thetumbled avatar May 10 '24 02:05 thetumbled