[fix][broker] Fix cannot cleanup expired ledger by trim ledgers

Open · Nicklee007 opened this issue 2 years ago · 17 comments

Fixes #15473

Motivation

PR #10087 fixed some cases in which expired data could not be cleaned up, but the bug reappears in other cases.

Case 1: slowestReaderPosition is reset by another cursor operation before the expired ledger can be trimmed.

The topic's producer produced some data and then stopped, so no more messages are added. When maximumRolloverTimeMs is reached, the broker checks whether the current ledger is full, closes the old ledger and creates a new one. After the new ledger is created, maybeUpdateCursorBeforeTrimmingConsumedLedger() updates the cursor's slowestReaderPosition to (newLedgerId, -1) and immediately triggers trimConsumedLedgersInBackground(): https://github.com/apache/pulsar/blob/a1fb200ff707e9855efb563a27a894664a59c58b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java#L2284 https://github.com/apache/pulsar/blob/a1fb200ff707e9855efb563a27a894664a59c58b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java#L2295-L2296
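The cursor-advance step described above can be sketched as a pure function. This is a simplified model, not Pulsar's code: `LedgerAdvanceSketch`, `Position` and `advanceIfLedgerConsumed` are hypothetical names, ledgers are reduced to a sorted map from ledger ID to entry count, and the rule (a mark-delete position sitting on the last entry of a ledger moves to `(nextLedgerId, -1)` when a newer ledger exists) is an assumption based on the description of maybeUpdateCursorBeforeTrimmingConsumedLedger().

```java
import java.util.Map;
import java.util.NavigableMap;
import java.util.TreeMap;

// Hypothetical, simplified model; NOT the Pulsar ManagedLedgerImpl API.
class LedgerAdvanceSketch {

    // A (ledgerId, entryId) pair; entryId -1 means "before the first entry".
    record Position(long ledgerId, long entryId) { }

    // If the cursor has consumed every entry of its ledger and a newer ledger
    // exists, move it to (nextLedgerId, -1); otherwise return it unchanged.
    static Position advanceIfLedgerConsumed(NavigableMap<Long, Long> entriesPerLedger,
                                            Position markDelete) {
        Long entries = entriesPerLedger.get(markDelete.ledgerId());
        Map.Entry<Long, Long> next = entriesPerLedger.higherEntry(markDelete.ledgerId());
        if (entries == null || next == null) {
            return markDelete; // unknown ledger, or already on the newest ledger
        }
        boolean fullyConsumed = markDelete.entryId() != -1
                && markDelete.entryId() + 1 >= entries;
        return fullyConsumed ? new Position(next.getKey(), -1) : markDelete;
    }

    public static void main(String[] args) {
        NavigableMap<Long, Long> ledgers = new TreeMap<>();
        ledgers.put(1L, 10L); // closed ledger holding entries 0..9
        ledgers.put(2L, 0L);  // new, empty ledger created by the rollover
        System.out.println(advanceIfLedgerConsumed(ledgers, new Position(1, 9)));
        System.out.println(advanceIfLedgerConsumed(ledgers, new Position(1, 5)));
    }
}
```

Only a cursor that has acknowledged the last entry of its ledger is moved; a partially consumed ledger keeps its position, so nothing unread becomes eligible for trimming.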

However, retentionTimeMs is usually longer than maximumRolloverTimeMs, so this run of trimConsumedLedgersInBackground cannot clean up the old ledger yet. Then, if a Flink task consumes the topic (using a reader to consume and another durable cursor to mark its position), the client keeps resetting the durable cursor to (olderLedgerId, entryNum + 1): internalResetCursor invokes internalAsyncMarkDelete, which invokes ledger.updateCursor and resets slowestReaderPosition to (olderLedgerId, entryNum). When the next retentionCheckIntervalInSeconds interval invokes trimConsumedLedgersInBackground, slowestReaderPosition has already been reset to (olderLedgerId, entryNum), so the old ledger is never cleaned up in any later retentionCheckIntervalInSeconds run. https://github.com/apache/pulsar/blob/ac6bd3c71e24b90826438cd44395ca21a849067f/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java#L2474
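The failure in case 1 comes down to ordering: a trim only removes ledgers that lie entirely before slowestReaderPosition and are older than the retention time. A minimal sketch of that rule follows. It assumes a hypothetical `TrimRaceSketch` class, closed ledgers reduced to a map from ledger ID to close time, and time-based retention only (size-based retention is ignored); it illustrates the scenario above and is not Pulsar's trimming code.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.NavigableMap;
import java.util.TreeMap;

// Hypothetical, simplified model of the case-1 race; NOT Pulsar's code.
class TrimRaceSketch {

    record Position(long ledgerId, long entryId) { }

    // A closed ledger is trimmable only if it lies wholly before the slowest
    // reader's ledger and has outlived the retention time.
    // closeTimeMs maps ledgerId -> close time in milliseconds (closed ledgers only).
    static List<Long> trimmableLedgers(NavigableMap<Long, Long> closeTimeMs,
                                       Position slowestReader,
                                       long nowMs, long retentionMs) {
        List<Long> result = new ArrayList<>();
        for (var e : closeTimeMs.headMap(slowestReader.ledgerId(), false).entrySet()) {
            if (nowMs - e.getValue() > retentionMs) {
                result.add(e.getKey());
            }
        }
        return result;
    }

    public static void main(String[] args) {
        long retentionMs = 60_000;
        NavigableMap<Long, Long> ledgers = new TreeMap<>();
        ledgers.put(1L, 0L); // old ledger, closed by the rollover at t=0

        // t=0: slowest reader already advanced to (2, -1), but ledger 1 is
        // still within retention, so nothing is trimmed.
        System.out.println(trimmableLedgers(ledgers, new Position(2, -1), 0, retentionMs));

        // A cursor reset then moves the slowest reader back to (1, 9).
        Position slowest = new Position(1, 9);

        // t=120s: retention has expired, but ledger 1 is no longer before the
        // slowest reader, so it is still not trimmed.
        System.out.println(trimmableLedgers(ledgers, slowest, 120_000, retentionMs));
    }
}
```

Both calls yield an empty list: the first because retention has not expired, the second because the reset pulled the slowest reader back into ledger 1.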

Another cursor operation can do the same thing if the old ledger has not yet expired when the newly created ledger triggers trimConsumedLedgersInBackground: because of the markDeleteLimiter, the consumer's last mark-delete operation may be left dirty and must be persisted later by flushCursorsTask, which also invokes ledger.updateCursor(ManagedCursorImpl.this, mdEntry.newPosition) with (olderLedgerId, entryNum). After that, trimConsumedLedgersInBackground again cannot clean up the old ledger. https://github.com/apache/pulsar/blob/ac6bd3c71e24b90826438cd44395ca21a849067f/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedCursorImpl.java#L3132-L3138
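The markDeleteLimiter path is a last-writer-wins problem: a throttled mark-delete is kept as a dirty in-memory value and written later, after the slowest reader has already been advanced to the new ledger. The sketch below models only that ordering. `DeferredFlushSketch` and its fields and methods are hypothetical stand-ins for the cursor container, the limiter and flushCursorsTask; they are not Pulsar APIs.

```java
import java.util.Optional;

// Hypothetical model of a throttled mark-delete that is flushed later.
// Names are illustrative only; this is NOT Pulsar's ManagedCursorImpl API.
class DeferredFlushSketch {

    record Position(long ledgerId, long entryId) { }

    // What the ledger uses as the slowest reader when deciding what to trim.
    Position slowestReader = new Position(1, 5);
    // A mark-delete accepted in memory but not yet persisted ("dirty").
    Optional<Position> dirtyMarkDelete = Optional.empty();

    // The consumer acks up to p, but the rate limiter defers the write.
    void throttledMarkDelete(Position p) {
        dirtyMarkDelete = Optional.of(p);
    }

    // Rollover step: the slowest reader is moved to the new, empty ledger.
    void advanceToNewLedger(long newLedgerId) {
        slowestReader = new Position(newLedgerId, -1);
    }

    // Periodic flush: persists the dirty value and reports it to the ledger,
    // overwriting whatever was there before (last writer wins).
    void flushCursors() {
        dirtyMarkDelete.ifPresent(p -> slowestReader = p);
        dirtyMarkDelete = Optional.empty();
    }

    public static void main(String[] args) {
        DeferredFlushSketch s = new DeferredFlushSketch();
        s.throttledMarkDelete(new Position(1, 9)); // last entry of ledger 1
        s.advanceToNewLedger(2);                   // slowest reader: (2, -1)
        s.flushCursors();                          // slowest reader: (1, 9) again
        System.out.println(s.slowestReader);
    }
}
```

Without a pending dirty mark-delete the flush is a no-op and the advance to `(2, -1)` survives; with one, the deferred write silently undoes it.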

Case 2: no condition ever sets slowestReaderPosition to the correct position.

The most common form of this case is that the producer stops at some point and there is a durable cursor but no active consumer, so the cursor position is moved forward only by MessageExpiryMonitor, and the cursor's mark-delete position may be several ledgers behind. The last ledger creation, which invoked maybeUpdateCursorBeforeTrimmingConsumedLedger, may not have set slowestReaderPosition to the newest ledger (newLedgerId, -1), and afterwards MessageExpiryMonitor can only reset slowestReaderPosition to (olderLedgerId, entryNum). From then on, trimConsumedLedgersInBackground also cannot clean up the old ledger.

Modifications

  1. When checkConsumedLedgers is invoked, use maybeUpdateCursorBeforeTrimmingConsumedLedger() to check whether the ledger has been consumed completely and, if so, move the cursor to the next ledger, so that the ConsumedLedgersMonitor can trim the old ledger data.
  2. PR #14672 changed the rollCurrentLedgerIfFull logic to create ledgers lazily, but a new ledger must be created so that slowestReaderPosition can be reset to the new, empty ledger. So createLedgerAfterClosed() needs to be invoked in rollCurrentLedgerIfFull. (PR #14672 has since been reverted.)
  3. Add some unit tests.
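Modification 1 can be sketched as a single periodic step that first re-advances a fully consumed cursor and then trims, so a slowestReaderPosition that was pushed back since the last run is corrected right before the trim decision. This is a hypothetical model, not the code in this PR: `PeriodicTrimSketch`, `LedgerInfo` and the method names are invented, ledgers are a map from ledger ID to (entry count, close time), and retention is time-based only.

```java
import java.util.Map;
import java.util.NavigableMap;
import java.util.TreeMap;

// Hypothetical model of "advance, then trim" in one periodic task; NOT Pulsar's code.
class PeriodicTrimSketch {

    record Position(long ledgerId, long entryId) { }
    record LedgerInfo(long entries, long closeTimeMs) { }

    final NavigableMap<Long, LedgerInfo> ledgers = new TreeMap<>();
    Position slowestReader;

    // Move a slowest reader parked on the last entry of a ledger to (nextLedgerId, -1).
    void maybeAdvanceSlowestReader() {
        LedgerInfo current = ledgers.get(slowestReader.ledgerId());
        Map.Entry<Long, LedgerInfo> next = ledgers.higherEntry(slowestReader.ledgerId());
        if (current != null && next != null && slowestReader.entryId() != -1
                && slowestReader.entryId() + 1 >= current.entries()) {
            slowestReader = new Position(next.getKey(), -1);
        }
    }

    // Delete ledgers wholly before the slowest reader whose retention has expired.
    void trim(long nowMs, long retentionMs) {
        ledgers.headMap(slowestReader.ledgerId(), false).entrySet()
                .removeIf(e -> nowMs - e.getValue().closeTimeMs() > retentionMs);
    }

    // The periodic check: advance first, then trim, in the same task.
    void checkConsumedLedgers(long nowMs, long retentionMs) {
        maybeAdvanceSlowestReader();
        trim(nowMs, retentionMs);
    }

    public static void main(String[] args) {
        PeriodicTrimSketch ml = new PeriodicTrimSketch();
        ml.ledgers.put(1L, new LedgerInfo(10, 0));             // closed at t=0
        ml.ledgers.put(2L, new LedgerInfo(0, Long.MAX_VALUE)); // current, still open
        ml.slowestReader = new Position(1, 9); // pushed back by a cursor reset
        ml.checkConsumedLedgers(120_000, 60_000);
        System.out.println(ml.ledgers.keySet() + " " + ml.slowestReader);
    }
}
```

Because the advance runs on every periodic check rather than only right after a ledger is created, a reset that lands between two checks is undone at the next one, which is the behavior the PR is aiming for.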

Documentation

Check the box below or label this PR directly. Need to update docs?

  • [X] doc-not-needed (Please explain why)

Nicklee007 · May 06 '22

Good catch. This patch solves the problem to some extent. I think the solution assumes that the newly created persistent subscription's 'cursor position' is always greater than 'message-TTL' and that the 'message-TTL' policy is enabled.

I have two questions:

  1. Quoting the description: "And then if we have a flink task consumer the topic( use reader to consumer and mark position by another durable cursor), the client will always reset the durable cursor to the position."

     Why not use a non-durable Reader?

  2. Regarding:

     ledgerClosed(lh);
     createLedgerAfterClosed();

     This ensures that a new ledger is always created when the old ledger is closed. If this is necessary, a comment should be added to remind others later, and ideally a unit test to guarantee it.

poorbarcode · May 07 '22

@poorbarcode Yes, but enabling only the 'message-TTL' policy is not enough. As in case 2, if we have a subscription cursor but no active consumer, MessageExpiryMonitor can only reset slowestReaderPosition to (olderLedgerId, entryNum); it cannot move the cursor to the newest, empty ledger. We move slowestReaderPosition through the ConsumedLedgersMonitor, which checks whether the ledger has been consumed completely, resets slowestReaderPosition, and then triggers trimConsumedLedgersInBackground in the same task. This is more effective and makes it less likely that slowestReaderPosition gets overwritten in between; even if it is overwritten, we can reset it to the newest position at the next MessageExpiryMonitor run.

Why not use a non-durable Reader? Because we have many Flink jobs using StreamNative's pulsar-connector, which consumes data with a reader and marks the consumer position with another durable cursor that is used when the Flink job restarts. So there are two cursors.

Nicklee007 · May 07 '22

@mattisonchao To clean up expired ledger data, and because a loaded topic needs at least one ledger, this PR changes the behavior of rollCurrentLedgerIfFull by re-adding createLedgerAfterClosed() to it. I need your review. I'll change the unit test later.

Nicklee007 · May 07 '22

Hi @Nicklee007, could you write a test to reproduce this case?

Technoboy- · May 10 '22

@Technoboy- I added a unit test for case 2, PTAL.

Nicklee007 · May 16 '22

@mattisonchao Hi, I have reverted the unit test modification from PR #14672, PTAL.

Nicklee007 · May 17 '22

The pr had no activity for 30 days, mark with Stale label.

github-actions[bot] · Jun 17 '22

/pulsarbot run-failure-checks

Nicklee007 · Jul 20 '22

@codelipenghui @Jason918 @eolivelli Could you help review this PR? Thanks.

Nicklee007 · Jul 25 '22

Hi @Nicklee007,

Because we create the ledger lazily when writing messages, trimming cannot trim the current ledger either. In this case, if there are no write operations, won't this problem still occur?

mattisonchao · Jul 26 '22

@mattisonchao It looks like this problem can also occur in the message-writing path: the last message added fills the ledger, which triggers closing it, and then no more messages are produced to the topic. But this situation is more extreme and unlikely to occur. Maybe we can fix the more common scenario first.

Nicklee007 · Jul 26 '22

I think we still need to revert #14672 first (#16806), because this PR just checks whether any cursor has been updated in the periodic BrokerService#checkConsumedLedgers call, and it relies on #16806, which restores the original behavior (the same as branch-2.10 and earlier) of creating a new ledger after the current ledger is closed.

BewareMyPower · Jul 29 '22

OK, once #14672 is reverted, I'll change this PR.

Nicklee007 · Jul 29 '22

Hi @Nicklee007, PR #14672 has already been reverted. Please continue the work. Thanks a lot!

mattisonchao · Aug 04 '22

@mattisonchao Changed this PR, PTAL.

Nicklee007 · Aug 05 '22

@Nicklee007 Please provide a correct documentation label for your PR. Instructions see Pulsar Documentation Label Guide.

github-actions[bot] · Aug 05 '22

The pr had no activity for 30 days, mark with Stale label.

github-actions[bot] · Sep 24 '22