pulsar
[fix] Fix Reader can be stuck from transaction aborted messages.
Motivation
Fix the Reader getting stuck on transaction aborted messages. Related to https://github.com/apache/pulsar/pull/22572
Since the Reader's SubscriptionType is Exclusive, we do not need to handle DelayedDelivery messages.
Modifications
Verifying this change
- [ ] Make sure that the change passes the CI checks.
Does this pull request potentially affect one of the following parts:
If the box was checked, please highlight the changes
- [ ] Dependencies (add or upgrade a dependency)
- [ ] The public API
- [ ] The schema
- [ ] The default values of configurations
- [ ] The threading model
- [ ] The binary protocol
- [ ] The REST endpoints
- [ ] The admin CLI options
- [ ] The metrics
- [ ] Anything that affects deployment
Documentation
- [ ] doc
- [ ] doc-required
- [x] doc-not-needed
- [ ] doc-complete
Matching PR in forked repository
PR in forked repository:
hi, @tjiuming Thanks for your PR.
Maybe we can change this test to cover all the transaction-related cases of getting the last message ID.
https://github.com/apache/pulsar/blob/a761b97b733142b1ade525e1d1c06785e98face1/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/buffer/TopicTransactionBufferTest.java#L264
Change to:
@Test
public void testGetLastMessageIdsWithOngoingTransactions() throws Exception {
    // 1. Prepare environment
    String topic = "persistent://" + NAMESPACE1 + "/testGetLastMessageIdsWithOngoingTransactions";
    String subName = "my-subscription";
    @Cleanup
    Producer<byte[]> producer = pulsarClient.newProducer()
            .topic(topic)
            .create();
    Consumer<byte[]> consumer = pulsarClient.newConsumer()
            .topic(topic)
            .subscriptionName(subName)
            .subscribe();
    // 2. Test that the last max read position can be acquired correctly.
    // 2.1 Case1: send 3 original messages. |1:0|1:1|1:2|
    MessageIdImpl expectedLastMessageID = null;
    for (int i = 0; i < 3; i++) {
        expectedLastMessageID = (MessageIdImpl) producer.newMessage().send();
    }
    assertMessageId(consumer, expectedLastMessageID);
    // 2.2 Case2: send 2 ongoing transactional messages and 1 original message.
    // |1:0|1:1|1:2|txn1:start->1:3|1:4|txn2:start->1:5.
    Transaction txn1 = pulsarClient.newTransaction()
            .withTransactionTimeout(5, TimeUnit.HOURS)
            .build()
            .get();
    Transaction txn2 = pulsarClient.newTransaction()
            .withTransactionTimeout(5, TimeUnit.HOURS)
            .build()
            .get();
    // |1:0|1:1|1:2|txn1:1:3|
    producer.newMessage(txn1).send();
    // |1:0|1:1|1:2|txn1:1:3|1:4|
    MessageIdImpl expectedLastMessageID1 = (MessageIdImpl) producer.newMessage().send();
    // |1:0|1:1|1:2|txn1:1:3|1:4|txn2:1:5|
    producer.newMessage(txn2).send();
    // 2.2.1 Last message ID will not change while txn1 and txn2 have not ended.
    assertMessageId(consumer, expectedLastMessageID);
    // 2.2.2 Last message ID will update to 1:4 when txn1 is committed.
    // |1:0|1:1|1:2|txn1:1:3|1:4|txn2:1:5|txn1:commit->1:6|
    txn1.commit().get(5, TimeUnit.SECONDS);
    assertMessageId(consumer, expectedLastMessageID1);
    // 2.2.3 Last message ID will still be 1:4 when txn2 is aborted.
    // |1:0|1:1|1:2|txn1:1:3|1:4|txn2:1:5|txn1:commit->1:6|txn2:abort->1:7|
    txn2.abort().get(5, TimeUnit.SECONDS);
    assertMessageId(consumer, expectedLastMessageID1);
}
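The timeline in the test comments can be replayed with a small in-memory model. This is only a sketch and not Pulsar code: `LastMessageIdModel`, `send`, `end`, `maxReadPosition`, and `lastMessageId` are hypothetical names, and the model assumes a single ledger, that a commit or abort marker occupies one entry, and that the last message ID is found by walking backwards from the max read position.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/** Toy model (an assumption, not the broker implementation) of one ledger with transactions. */
final class LastMessageIdModel {
    public enum TxnState { ONGOING, COMMITTED, ABORTED }

    /** One log entry: txn is null for ordinary messages; marker is true for commit/abort markers. */
    private static final class Entry {
        final String txn;
        final boolean marker;
        Entry(String txn, boolean marker) { this.txn = txn; this.marker = marker; }
    }

    private final List<Entry> log = new ArrayList<>();
    private final Map<String, TxnState> txns = new HashMap<>();

    /** Appends a message (txn == null means non-transactional) and returns its entry id. */
    public long send(String txn) {
        if (txn != null) {
            txns.putIfAbsent(txn, TxnState.ONGOING);
        }
        log.add(new Entry(txn, false));
        return log.size() - 1;
    }

    /** Ends a transaction; the commit/abort marker is written as its own entry. */
    public void end(String txn, TxnState state) {
        txns.put(txn, state);
        log.add(new Entry(txn, true));
    }

    /** Entry id just before the first message of any ongoing txn, else the last entry id. */
    public long maxReadPosition() {
        for (int i = 0; i < log.size(); i++) {
            Entry e = log.get(i);
            if (!e.marker && e.txn != null && txns.get(e.txn) == TxnState.ONGOING) {
                return i - 1;
            }
        }
        return log.size() - 1;
    }

    /** Walks backwards from maxReadPosition, skipping markers and aborted messages. */
    public long lastMessageId() {
        for (long i = maxReadPosition(); i >= 0; i--) {
            Entry e = log.get((int) i);
            boolean aborted = e.txn != null && txns.get(e.txn) == TxnState.ABORTED;
            if (!e.marker && !aborted) {
                return i;
            }
        }
        return -1; // no readable message yet
    }
}
```

Replaying the test steps against this model gives entry 2 after the three plain sends, still 2 while both transactions are ongoing, 4 after txn1 commits, and 4 again after txn2 aborts, which matches the assertions in the suggested test.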
@shibd Hi baodi, I've addressed your comment, thanks!
It seems we don't need the maxReadPosition.compareTo((PositionImpl) getLastPosition()) != 0 judgment condition, since maxReadPosition also may not be a valid position for transaction topic event if maxReadPosition < lastPosition.
I don't understand, why?
@dao-jun maxReadPosition is just the position in front of the ongoing transaction. It only ensures that this is not a pending transaction position, but it is not necessarily a normal message.
See: https://github.com/apache/pulsar/blob/264722f1da9ab806c9a79196c091bfe4d03b3090/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/buffer/impl/TopicTransactionBuffer.java#L447-L459
@coderzc Oh, I see what you mean: if the maxReadPosition is also an aborted message, the reader can get stuck too. That makes sense, I'll improve the test to cover this case.
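The point discussed above, that maxReadPosition being smaller than the last position does not make it a normal message, can be illustrated with a tiny sketch. Everything here is hypothetical (`ReverseScanSketch`, `Kind`, `lastNormalAtOrBefore`); it models a ledger as an array of entry kinds and is not how the broker stores entries.

```java
/** Toy illustration (an assumption, not broker code) of scanning back from maxReadPosition. */
final class ReverseScanSketch {
    public enum Kind { NORMAL, ABORTED_TXN_MESSAGE, TXN_MARKER, ONGOING_TXN_MESSAGE }

    /** Returns the index of the nearest NORMAL entry at or before start, or -1 if none exists. */
    public static int lastNormalAtOrBefore(Kind[] ledger, int start) {
        for (int i = Math.min(start, ledger.length - 1); i >= 0; i--) {
            if (ledger[i] == Kind.NORMAL) {
                return i;
            }
        }
        return -1;
    }

    public static void main(String[] args) {
        // |0: normal|1: aborted txn message|2: abort marker|3: message of an ongoing txn|
        Kind[] ledger = {
            Kind.NORMAL, Kind.ABORTED_TXN_MESSAGE, Kind.TXN_MARKER, Kind.ONGOING_TXN_MESSAGE
        };
        int maxReadPosition = 2; // just in front of the ongoing txn, but it is a marker
        int lastPosition = 3;
        // maxReadPosition != lastPosition, yet entry 2 is not a message a reader can receive.
        System.out.println(lastNormalAtOrBefore(ledger, maxReadPosition)); // prints 0
    }
}
```

In this layout, comparing maxReadPosition with the last position alone would return entry 2, so the backward scan is needed regardless of that comparison.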
@coderzc PTAL
@codelipenghui @shibd @coderzc It looks like Shared/key_Shared Consumer also could be stuck from delayed delivery messages, WDYT? Do we need to handle it?
I prefer to fix it case by case and add enough unit tests to cover every case.
Codecov Report
Attention: Patch coverage is 90.00000% with 1 line in your changes missing coverage. Please review.
Project coverage is 72.71%. Comparing base (bbc6224) to head (59d227f). Report is 218 commits behind head on master.
Additional details and impacted files
@@ Coverage Diff @@
## master #22610 +/- ##
============================================
- Coverage 73.57% 72.71% -0.86%
+ Complexity 32624 32532 -92
============================================
Files 1877 1887 +10
Lines 139502 141004 +1502
Branches 15299 15477 +178
============================================
- Hits 102638 102534 -104
- Misses 28908 30603 +1695
+ Partials 7956 7867 -89
Flag | Coverage Δ | |
---|---|---|
inttests | 27.40% <40.00%> (+2.82%) | :arrow_up: |
systests | 24.84% <40.00%> (+0.51%) | :arrow_up: |
unittests | 71.46% <90.00%> (-1.39%) | :arrow_down: |
Flags with carried forward coverage won't be shown. Click here to find out more.
Files | Coverage Δ | |
---|---|---|
...ookkeeper/mledger/util/ManagedLedgerImplUtils.java | 68.18% <100.00%> (ø) | |
...sar/broker/service/persistent/PersistentTopic.java | 79.06% <88.88%> (+0.60%) | :arrow_up: |
Regarding #22572 (comment), to further reduce code duplication, start the internalAsyncReverseFindPositionOneByOne method with this part: if (!ledger.isValidPosition(previousPosition)) { future.complete(previousPosition); return; }
Then this logic can be removed from asyncGetLastValidPosition and the readEntryComplete callback.
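The shape of this suggestion can be sketched as follows. This is a simplified stand-in and not the code under review: `ReverseFindSketch`, `findLastMatching`, and `reverseFindOneByOne` are hypothetical names, positions are plain integers, and the recursion is synchronous, whereas the real method reads entries asynchronously. The only point it shows is that placing the validity guard at the top of the recursive helper means neither the entry point nor the per-entry step has to repeat it.

```java
import java.util.concurrent.CompletableFuture;
import java.util.function.IntPredicate;

/** Toy stand-in (an assumption) for a reverse one-by-one position search. */
final class ReverseFindSketch {
    private final int size; // valid positions are 0..size-1 in this model

    public ReverseFindSketch(int size) {
        this.size = size;
    }

    public boolean isValidPosition(int position) {
        return position >= 0 && position < size;
    }

    /** Entry point: no validity check here, the helper performs it. */
    public CompletableFuture<Integer> findLastMatching(IntPredicate accept, int start) {
        CompletableFuture<Integer> future = new CompletableFuture<>();
        reverseFindOneByOne(accept, start, future);
        return future;
    }

    private void reverseFindOneByOne(IntPredicate accept, int position,
                                     CompletableFuture<Integer> future) {
        // Guard placed first, so it covers the initial call and every recursive step.
        if (!isValidPosition(position)) {
            future.complete(position);
            return;
        }
        if (accept.test(position)) {
            future.complete(position);
            return;
        }
        // No second validity check needed before stepping back.
        reverseFindOneByOne(accept, position - 1, future);
    }
}
```

With five positions and a predicate that accepts odd positions, a search starting at 4 completes with 3; with a predicate that accepts nothing, it completes with -1, the first invalid position reached.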
addressed
@lhotari PTAL