[fix][broker] Fix cannot cleanup expired ledger by trim ledgers
Fixes #15473
Motivation
PR #10087 fixed some cases where expired data could not be cleaned up, but in other cases the bug reappears.
Case 1: the slowestReaderPosition is reset by another cursor operation before we can trim the expired ledger.
The topic's producer produced some data and then stopped, so no more messages are added. When maximumRolloverTimeMs is reached, the current ledger is detected as full, the old ledger is closed and a new one is created; once the ledger creation completes, maybeUpdateCursorBeforeTrimmingConsumedLedger() updates the cursor's slowestReaderPosition to (newLedgerId, -1) and triggers trimConsumedLedgersInBackground() immediately.
https://github.com/apache/pulsar/blob/a1fb200ff707e9855efb563a27a894664a59c58b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java#L2284
https://github.com/apache/pulsar/blob/a1fb200ff707e9855efb563a27a894664a59c58b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java#L2295-L2296
However, in most setups the message retentionTimeMs is longer than maximumRolloverTimeMs, so the ledger cannot be cleaned by the trimConsumedLedgersInBackground run at that point.
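To make the timing concrete, the following small, self-contained sketch (illustrative only, not the actual ManagedLedgerImpl code; the class, method, and values are made up) shows the two conditions a closed ledger has to satisfy before it can be removed, and why the trim triggered right after the rollover removes nothing when retentionTimeMs is longer than maximumRolloverTimeMs:

```java
// Illustrative sketch only -- not the real ManagedLedgerImpl logic.
public final class RetentionCheck {

    /** A closed ledger is removable only if it is fully consumed AND past retention. */
    static boolean isRemovable(long ledgerId, long closeTimeMs, long nowMs,
                               long retentionTimeMs, long slowestReaderLedgerId) {
        boolean fullyConsumed = ledgerId < slowestReaderLedgerId; // slowest reader has moved past it
        boolean pastRetention = (nowMs - closeTimeMs) > retentionTimeMs;
        return fullyConsumed && pastRetention;
    }

    public static void main(String[] args) {
        long closeTime = 0;                 // old ledger (id 1) closed at t=0 by the rollover
        long rollover = 10 * 60 * 1000L;    // maximumRolloverTimeMs = 10 minutes
        long retention = 60 * 60 * 1000L;   // retentionTimeMs = 1 hour

        // Trim triggered immediately after the rollover: retention has not elapsed yet.
        System.out.println(isRemovable(1, closeTime, rollover, retention, 2));      // false

        // Much later, retention has elapsed, but the slowest reader was meanwhile reset
        // back onto ledger 1, so the ledger still cannot be removed.
        System.out.println(isRemovable(1, closeTime, retention + 1, retention, 1)); // false
    }
}
```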
Then, if a Flink task consumes the topic (using a reader to consume and marking the position with another durable cursor), the client keeps resetting the durable cursor to the position (olderLedgerId, entryNum + 1): internalResetCursor invokes internalAsyncMarkDelete, which invokes ledger.updateCursor, resetting slowestReaderPosition to (olderLedgerId, entryNum). When the next retentionCheckIntervalInSeconds elapses and trimConsumedLedgersInBackground is invoked, slowestReaderPosition has already been reset to (olderLedgerId, entryNum), so the old ledger cannot be cleaned in any subsequent retentionCheckIntervalInSeconds cycle.
https://github.com/apache/pulsar/blob/ac6bd3c71e24b90826438cd44395ca21a849067f/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java#L2474
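The ordering problem can be seen in this minimal sketch (illustrative only; the Position record and the demo class are not Pulsar types): the slowest reader is effectively the minimum position across all cursors, so resetting the durable cursor back onto the old ledger pins that ledger against trimming no matter how far the reader cursor has advanced:

```java
import java.util.List;

// Illustrative sketch: slowestReaderPosition is the minimum of all cursor positions,
// so a cursor reset onto the old ledger keeps that ledger from being trimmed.
class SlowestReaderDemo {
    record Position(long ledgerId, long entryId) implements Comparable<Position> {
        @Override public int compareTo(Position o) {
            int c = Long.compare(ledgerId, o.ledgerId);
            return c != 0 ? c : Long.compare(entryId, o.entryId);
        }
    }

    public static void main(String[] args) {
        // After the rollover, the reader cursor was parked on the new empty ledger (2, -1).
        Position readerCursor = new Position(2, -1);

        // The Flink job's durable cursor is then reset back onto the old ledger
        // (internalResetCursor -> internalAsyncMarkDelete -> ledger.updateCursor).
        Position durableCursor = new Position(1, 41);

        Position slowest = List.of(readerCursor, durableCursor).stream()
                .min(Position::compareTo).orElseThrow();

        // The retention check can only trim ledgers strictly before the slowest reader's
        // ledger, so ledger 1 is never removed while the durable cursor points at it.
        System.out.println("slowestReaderPosition = (" + slowest.ledgerId() + ", " + slowest.entryId() + ")");
    }
}
```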
Note also that the old ledger is not yet expired when trimConsumedLedgersInBackground is triggered right after the new ledger is created. Another cursor operation that can cause this is the markDeleteLimiter: it leaves the consumer's last mark-delete operation dirty, so the mark-delete has to be persisted later by flushCursorsTask, which also invokes ledger.updateCursor(ManagedCursorImpl.this, mdEntry.newPosition), setting slowestReaderPosition back to (olderLedgerId, entryNum). After that, we again cannot clean the old ledger via trimConsumedLedgersInBackground.
https://github.com/apache/pulsar/blob/ac6bd3c71e24b90826438cd44395ca21a849067f/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedCursorImpl.java#L3132-L3138
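The following simplified, single-threaded sketch (the class is hypothetical, not ManagedCursorImpl) illustrates that deferred-flush path: when the limiter rejects an immediate persist, the cursor stays dirty and the periodic flush later pushes the pending mark-delete position through updateCursor, which can again move slowestReaderPosition back onto the old ledger:

```java
// Hypothetical sketch of the rate-limited mark-delete path (not the real ManagedCursorImpl).
class DirtyCursorFlushDemo {
    private boolean dirty;
    private long pendingLedgerId;
    private long pendingEntryId;

    /** Called on acknowledgement; the rate limiter may defer persisting the position. */
    void markDelete(long ledgerId, long entryId, boolean limiterAllowsImmediateFlush) {
        pendingLedgerId = ledgerId;
        pendingEntryId = entryId;
        if (limiterAllowsImmediateFlush) {
            updateCursor(ledgerId, entryId);
        } else {
            dirty = true; // persisted later by the periodic flush task
        }
    }

    /** Plays the role of flushCursorsTask in this sketch. */
    void flushDirtyCursor() {
        if (dirty) {
            dirty = false;
            updateCursor(pendingLedgerId, pendingEntryId);
        }
    }

    void updateCursor(long ledgerId, long entryId) {
        // In the real code this ends up in ledger.updateCursor(...), which recomputes the
        // slowest reader -- potentially resetting it onto the old ledger.
        System.out.printf("slowestReaderPosition candidate -> (%d, %d)%n", ledgerId, entryId);
    }

    public static void main(String[] args) {
        DirtyCursorFlushDemo cursor = new DirtyCursorFlushDemo();
        cursor.markDelete(1, 41, false); // limiter defers the persist; cursor is now dirty
        cursor.flushDirtyCursor();       // later flush pushes (1, 41) back to the managed ledger
    }
}
```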
Case 2: there is no condition that sets slowestReaderPosition to the correct position.
The most common scenario here is that the producer stops at some point and there is a durable cursor but no active consumer, so the cursor position is only moved forward by the MessageExpiryMonitor. The cursor's mark-delete position may be a few ledgers behind, and the last ledger creation that invoked maybeUpdateCursorBeforeTrimmingConsumedLedger may not have set slowestReaderPosition to the newest ledger (newLedgerId, -1). After that, the MessageExpiryMonitor can only reset slowestReaderPosition to (olderLedgerId, entryNum), so again the old ledger cannot be cleaned by trimConsumedLedgersInBackground.
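A small illustrative sketch of the TTL-driven expiry described here (the types and values are made up, not the MessageExpiryMonitor code): expiry can only advance the cursor to the last entry whose age exceeds the TTL, i.e. to (olderLedgerId, entryNum), never to the new empty ledger (newLedgerId, -1):

```java
import java.util.List;

// Illustrative sketch: TTL-based expiry stops at the last expired entry of an old ledger.
class ExpiryMonitorDemo {
    record Entry(long ledgerId, long entryId, long publishTimeMs) {}

    /** Returns the last expired entry, or null if nothing has expired yet. */
    static Entry findLastExpired(List<Entry> entries, long nowMs, long ttlMs) {
        Entry last = null;
        for (Entry e : entries) {
            if (nowMs - e.publishTimeMs() > ttlMs) {
                last = e;
            }
        }
        return last;
    }

    public static void main(String[] args) {
        long ttlMs = 60_000;
        List<Entry> oldLedger = List.of(
                new Entry(1, 0, 0), new Entry(1, 1, 1_000), new Entry(1, 2, 2_000));

        // All of ledger 1 is expired, but expiry stops at (1, 2): it knows nothing about
        // the new empty ledger 2, so slowestReaderPosition never reaches (2, -1).
        Entry target = findLastExpired(oldLedger, 120_000, ttlMs);
        System.out.println("expiry moves the cursor to (" + target.ledgerId() + ", " + target.entryId() + ")");
    }
}
```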
Modifications
- When checkConsumedLedgers is invoked, use maybeUpdateCursorBeforeTrimmingConsumedLedger() to check whether each ledger has been consumed completely and, if so, reset the cursor to the next ledger, so that the ConsumedLedgersMonitor can trim the old ledger data (see the sketch after this list).
- PR #14672 changed the rollCurrentLedgerIfFull logic to create ledgers lazily, but we need a new ledger so that slowestReaderPosition can be reset to the new empty ledger, so we need to add an invocation of createLedgerAfterClosed() in rollCurrentLedgerIfFull. -- PR #14672 has been reverted.
- Add some unit tests.
Documentation
Check the box below or label this PR directly. Need to update docs?
- [X] doc-not-needed (Please explain why)
Good catch. This patch solves the problem to some extent; I think the solution works when the newly created persistent subscription's cursor position is always greater than the message-TTL and the message-TTL policy is enabled.
I have two questions:
1. "Then, if a Flink task consumes the topic (using a reader to consume and marking the position with another durable cursor), the client keeps resetting the durable cursor to the position."
Why not use a non-durable Reader?
2. ledgerClosed(lh); createLedgerAfterClosed();
This should ensure that a new ledger is always created when the old ledger is closed. If this is necessary, a comment should be added to remind others later; better yet, add a unit test to ensure it.
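A minimal sketch of the kind of test being asked for (it assumes a managed-ledger test fixture that provides a factory backed by a mocked BookKeeper; the test name, config values, and use of Awaitility are illustrative):

```java
import static org.testng.Assert.assertTrue;

import org.apache.bookkeeper.mledger.ManagedLedgerConfig;
import org.apache.bookkeeper.mledger.ManagedLedgerFactory;
import org.apache.bookkeeper.mledger.impl.ManagedLedgerImpl;
import org.awaitility.Awaitility;
import org.testng.annotations.Test;

public class RolloverCreatesNewLedgerTest {

    // In practice this would come from the test fixture (e.g. a factory backed by a
    // mocked BookKeeper); it is only declared here so the sketch is self-contained.
    private ManagedLedgerFactory factory;

    @Test
    public void newLedgerIsCreatedWhenFullLedgerIsClosed() throws Exception {
        ManagedLedgerConfig config = new ManagedLedgerConfig();
        config.setMaxEntriesPerLedger(2); // force a rollover after two entries

        ManagedLedgerImpl ledger =
                (ManagedLedgerImpl) factory.open("roll-creates-new-ledger", config);
        ledger.addEntry("entry-0".getBytes());
        ledger.addEntry("entry-1".getBytes()); // fills the current ledger, triggering the roll

        // The close/create happens asynchronously; once it completes there should be a
        // fresh ledger to park cursors on, so the full one can later be trimmed.
        Awaitility.await().untilAsserted(
                () -> assertTrue(ledger.getLedgersInfo().size() >= 2));
    }
}
```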
@poorbarcode Yes, but only enabling the message-TTL policy is not enough. As in case 2, if we have a subscription cursor but no active consumer, the MessageExpiryMonitor can only reset slowestReaderPosition to (olderLedgerId, entryNum); it cannot move the cursor to the newest empty ledger. Instead, we move slowestReaderPosition through the ConsumedLedgersMonitor by checking whether the ledger has been consumed completely: we reset slowestReaderPosition and then trigger trimConsumedLedgersInBackground in one task. This is more effective, and the probability of the slowestReaderPosition value being overwritten is lower; even if slowestReaderPosition is overwritten, we can reset it to the newest position on the next MessageExpiryMonitor run.
Why not use a non-durable Reader? Because we have many Flink jobs using StreamNative's pulsar-connector, which uses a reader to consume data and marks the consumer position with another durable cursor so that the position can be used when the Flink job restarts. So there are two cursors.
@mattisonchao To clean up expired ledger data, and because a loaded topic needs at least one ledger, this PR changes the behavior of rollCurrentLedgerIfFull by re-adding createLedgerAfterClosed() in rollCurrentLedgerIfFull. I need your review. I'll change the unit test later.
Hi @Nicklee007 Could you write a test to reproduce this case?
@Technoboy- Added a unit test for case 2, PTAL.
@mattisonchao Hi, I have reverted the unit test changes related to PR #14672, PTAL.
The pr had no activity for 30 days, mark with Stale label.
/pulsarbot run-failure-checks
@codelipenghui @Jason918 @eolivelli Could you help review this PR? Thx.
Hi, @Nicklee007
Because we create the ledger lazily during the message-writing process, the trim also cannot trim the current ledger. In this case, if we don't have any write operation, will we also hit this problem?
@mattisonchao It looks like this problem can also occur during the message-writing process, when the last message added makes the ledger full, triggering the ledger close, and no more messages are produced to the topic afterwards. But that situation is more extreme and has a small probability of occurring; maybe we can fix the more common scenario first.
I think we still need to revert #14672 first (#16806), because this PR just checks whether any cursor has been updated in the periodic BrokerService#checkConsumedLedgers call, based on #16806, which restores the original behavior (the same as branch-2.10 and earlier) that a new ledger will be created after closing the current ledger.
Ok, I'll wait for #14672 to be reverted and then update this PR.
Hi, @Nicklee007 PR #14672 has already been reverted, please continue your work. Thanks a lot!
@mattisonchao Updated this PR, PTAL.
@Nicklee007 Please provide a correct documentation label for your PR. Instructions see Pulsar Documentation Label Guide.
The pr had no activity for 30 days, mark with Stale label.