[fix] Fix Reader can be stuck from transaction aborted messages.

Open dao-jun opened this issue 9 months ago • 9 comments

Motivation

Fix the Reader getting stuck on aborted transactional messages. Related to https://github.com/apache/pulsar/pull/22572

Since a Reader's SubscriptionType is Exclusive, we do not need to handle DelayedDelivery messages.

Modifications

Verifying this change

  • [ ] Make sure that the change passes the CI checks.

(Please pick either of the following options)

This change is a trivial rework / code cleanup without any test coverage.

(or)

This change is already covered by existing tests, such as (please describe tests).

(or)

This change added tests and can be verified as follows:

(example:)

  • Added integration tests for end-to-end deployment with large payloads (10MB)
  • Extended integration test for recovery after broker failure

Does this pull request potentially affect one of the following parts:

If the box was checked, please highlight the changes

  • [ ] Dependencies (add or upgrade a dependency)
  • [ ] The public API
  • [ ] The schema
  • [ ] The default values of configurations
  • [ ] The threading model
  • [ ] The binary protocol
  • [ ] The REST endpoints
  • [ ] The admin CLI options
  • [ ] The metrics
  • [ ] Anything that affects deployment

Documentation

  • [ ] doc
  • [ ] doc-required
  • [x] doc-not-needed
  • [ ] doc-complete

Matching PR in forked repository

PR in forked repository:

dao-jun avatar Apr 28 '24 14:04 dao-jun

hi, @tjiuming Thanks for your PR.

Maybe we can change this test to cover all the transaction-related cases of getting the last message ID.

https://github.com/apache/pulsar/blob/a761b97b733142b1ade525e1d1c06785e98face1/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/buffer/TopicTransactionBufferTest.java#L264

Change to:

@Test
    public void testGetLastMessageIdsWithOngoingTransactions() throws Exception {
        // 1. Prepare environment
        String topic = "persistent://" + NAMESPACE1 + "/testGetLastMessageIdsWithOngoingTransactions";
        String subName = "my-subscription";
        @Cleanup
        Producer<byte[]> producer = pulsarClient.newProducer()
                .topic(topic)
                .create();
        @Cleanup
        Consumer<byte[]> consumer = pulsarClient.newConsumer()
                .topic(topic)
                .subscriptionName(subName)
                .subscribe();

        // 2. Test that the max read position can be acquired correctly.
        // 2.1 Case1: send 3 original messages. |1:0|1:1|1:2|
        MessageIdImpl expectedLastMessageID = null;
        for (int i = 0; i < 3; i++) {
            expectedLastMessageID = (MessageIdImpl) producer.newMessage().send();
        }
        assertMessageId(consumer, expectedLastMessageID);
        // 2.2 Case2: send 2 ongoing transactional messages and 1 original message.
        // |1:0|1:1|1:2|txn1:start->1:3|1:4|txn2:start->1:5|
        Transaction txn1 = pulsarClient.newTransaction()
                .withTransactionTimeout(5, TimeUnit.HOURS)
                .build()
                .get();
        Transaction txn2 = pulsarClient.newTransaction()
                .withTransactionTimeout(5, TimeUnit.HOURS)
                .build()
                .get();

        // |1:0|1:1|1:2|txn1:1:3|
        producer.newMessage(txn1).send();
        
        // |1:0|1:1|1:2|txn1:1:3|1:4|
        MessageIdImpl expectedLastMessageID1 = (MessageIdImpl) producer.newMessage().send();

        // |1:0|1:1|1:2|txn1:1:3|1:4|txn2:1:5|
        producer.newMessage(txn2).send();
        
        // 2.2.1 Last message ID does not change while txn1 and txn2 are still ongoing.
        assertMessageId(consumer, expectedLastMessageID);

        // 2.2.2 Last message ID is updated to 1:4 once txn1 is committed.
        // |1:0|1:1|1:2|txn1:1:3|1:4|txn2:1:5|txn1:commit->1:6|
        txn1.commit().get(5, TimeUnit.SECONDS);
        assertMessageId(consumer, expectedLastMessageID1);

        // 2.2.3 Last message ID is still 1:4 after txn2 is aborted.
        // |1:0|1:1|1:2|txn1:1:3|1:4|txn2:1:5|txn1:commit->1:6|txn2:abort->1:7|
        txn2.abort().get(5, TimeUnit.SECONDS);
        assertMessageId(consumer, expectedLastMessageID1);
    }

shibd avatar Apr 29 '24 01:04 shibd

@shibd Hi baodi, I've addressed your comment, thanks!

dao-jun avatar Apr 29 '24 01:04 dao-jun

It seems we don't need the maxReadPosition.compareTo((PositionImpl) getLastPosition()) != 0 condition, since maxReadPosition may also not be a valid position for a transactional topic even if maxReadPosition < lastPosition.

I don't understand, why?

dao-jun avatar Apr 29 '24 04:04 dao-jun

It seems we don't need the maxReadPosition.compareTo((PositionImpl) getLastPosition()) != 0 condition, since maxReadPosition may also not be a valid position for a transactional topic even if maxReadPosition < lastPosition.

I don't understand, why?

@dao-jun maxReadPosition is just the position immediately before the ongoing transaction. It only guarantees that this position does not belong to a pending transaction; it is not necessarily a normal message.

See: https://github.com/apache/pulsar/blob/264722f1da9ab806c9a79196c091bfe4d03b3090/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/buffer/impl/TopicTransactionBuffer.java#L447-L459
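To make the point above concrete, here is a minimal, self-contained sketch. It is a toy model and not Pulsar's TopicTransactionBuffer: the `Kind` enum and the helpers `maxReadPosition` and `isDeliverable` are hypothetical names invented for illustration. The sketch assumes a topic can be modelled as a list of entry kinds, places the max read position just before the first entry of an ongoing transaction, and shows that the entry at that position can itself be an aborted transactional message.

```java
import java.util.List;

public class MaxReadPositionSketch {
    // Hypothetical entry kinds for a toy topic; these are not Pulsar types.
    enum Kind { NORMAL, TXN_ONGOING, TXN_COMMITTED, TXN_ABORTED }

    // Index of the entry just before the first ongoing transactional entry,
    // or the last index if no transaction is ongoing. -1 means nothing is readable.
    static int maxReadPosition(List<Kind> log) {
        for (int i = 0; i < log.size(); i++) {
            if (log.get(i) == Kind.TXN_ONGOING) {
                return i - 1;
            }
        }
        return log.size() - 1;
    }

    // True if a reader would actually be handed the entry at this index.
    static boolean isDeliverable(List<Kind> log, int index) {
        if (index < 0 || index >= log.size()) {
            return false;
        }
        Kind kind = log.get(index);
        return kind == Kind.NORMAL || kind == Kind.TXN_COMMITTED;
    }

    public static void main(String[] args) {
        // |normal|aborted|ongoing|normal|
        List<Kind> log = List.of(Kind.NORMAL, Kind.TXN_ABORTED, Kind.TXN_ONGOING, Kind.NORMAL);
        int max = maxReadPosition(log);
        System.out.println("maxReadPosition index = " + max);            // prints 1
        System.out.println("deliverable = " + isDeliverable(log, max));  // prints false
    }
}
```

In this toy log the max read position is below the last position, yet no message will ever be delivered at it, which is why comparing maxReadPosition with the last position alone does not tell whether the position is a valid one.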

coderzc avatar Apr 29 '24 04:04 coderzc

@coderzc Oh, I understand now: if the entry at maxReadPosition is also an aborted message, the reader can get stuck too. That makes sense; I'll improve the test to cover this case.

dao-jun avatar Apr 29 '24 04:04 dao-jun

@coderzc PTAL

dao-jun avatar Apr 29 '24 04:04 dao-jun

@codelipenghui @shibd @coderzc It looks like Shared/Key_Shared consumers could also get stuck on delayed delivery messages, WDYT? Do we need to handle it?

dao-jun avatar Apr 29 '24 06:04 dao-jun

@codelipenghui @shibd @coderzc It looks like Shared/Key_Shared consumers could also get stuck on delayed delivery messages, WDYT? Do we need to handle it?

I prefer fixing it case by case and adding enough unit tests to cover each case.

shibd avatar Apr 29 '24 11:04 shibd

Codecov Report

Attention: Patch coverage is 90.00000% with 1 line in your changes missing coverage. Please review.

Project coverage is 72.71%. Comparing base (bbc6224) to head (59d227f). Report is 218 commits behind head on master.

Additional details and impacted files

@@             Coverage Diff              @@
##             master   #22610      +/-   ##
============================================
- Coverage     73.57%   72.71%   -0.86%     
+ Complexity    32624    32532      -92     
============================================
  Files          1877     1887      +10     
  Lines        139502   141004    +1502     
  Branches      15299    15477     +178     
============================================
- Hits         102638   102534     -104     
- Misses        28908    30603    +1695     
+ Partials       7956     7867      -89     
Flag        Coverage Δ
inttests    27.40% <40.00%> (+2.82%) :arrow_up:
systests    24.84% <40.00%> (+0.51%) :arrow_up:
unittests   71.46% <90.00%> (-1.39%) :arrow_down:

Flags with carried forward coverage won't be shown.

Files                                                    Coverage Δ
...ookkeeper/mledger/util/ManagedLedgerImplUtils.java    68.18% <100.00%> (ø)
...sar/broker/service/persistent/PersistentTopic.java    79.06% <88.88%> (+0.60%) :arrow_up:

... and 342 files with indirect coverage changes

codecov-commenter avatar May 03 '24 00:05 codecov-commenter

Regarding #22572 (comment), to further reduce code duplication, start the internalAsyncReverseFindPositionOneByOne method with this check:

    if (!ledger.isValidPosition(previousPosition)) {
        future.complete(previousPosition);
        return;
    }

and then this logic can be removed from asyncGetLastValidPosition and the readEntryComplete callback.

addressed
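The suggestion above can be sketched roughly as follows. This is a simplified, synchronous toy and not the ManagedLedgerImplUtils code: the class, the `isValidPosition` helper, and the use of integer indexes and strings in place of positions and entries are all assumptions made for illustration. The point it shows is that placing the validity check as the first statement of the one-by-one reverse search lets both the initial caller and each backward step rely on it instead of repeating it.

```java
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.function.Predicate;

public class ReverseFindSketch {
    // Toy stand-in for ledger.isValidPosition: an index is valid if it lies inside the log.
    static boolean isValidPosition(List<String> log, int position) {
        return position >= 0 && position < log.size();
    }

    // Walks backwards one entry at a time until the predicate matches.
    // The guard clause comes first, so neither the initial caller nor the
    // per-entry step needs its own validity check.
    static void reverseFindOneByOne(List<String> log, Predicate<String> predicate,
                                    int previousPosition, CompletableFuture<Integer> future) {
        if (!isValidPosition(log, previousPosition)) {
            future.complete(previousPosition);
            return;
        }
        if (predicate.test(log.get(previousPosition))) {
            future.complete(previousPosition);
            return;
        }
        reverseFindOneByOne(log, predicate, previousPosition - 1, future);
    }

    // Entry point: start from the last index and search backwards.
    static CompletableFuture<Integer> lastValidPosition(List<String> log, Predicate<String> predicate) {
        CompletableFuture<Integer> future = new CompletableFuture<>();
        reverseFindOneByOne(log, predicate, log.size() - 1, future);
        return future;
    }

    public static void main(String[] args) {
        Predicate<String> isNormal = "normal"::equals;
        List<String> log = List.of("normal", "normal", "aborted", "aborted");
        System.out.println(lastValidPosition(log, isNormal).join());                // prints 1
        System.out.println(lastValidPosition(List.of("aborted"), isNormal).join()); // prints -1
    }
}
```

When the search runs off the start of the log, the future completes with the invalid index itself, mirroring the `future.complete(previousPosition)` in the quoted snippet.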

dao-jun avatar May 05 '24 04:05 dao-jun

@lhotari PTAL

dao-jun avatar May 06 '24 10:05 dao-jun