Fix data loss when multiple ledger directories are configured

Open hangc0276 opened this issue 2 years ago • 16 comments

Motivation

We found a case where the bookie may lose data even though fsync is turned on for the journal. Conditions:

  • One journal disk, with fsync turned on for the journal
  • Two ledger disks configured, ledger1 and ledger2

Assume we write 100MB of data into one bookie: 70MB goes into ledger1's write cache and 30MB goes into ledger2's write cache. Ledger1's write cache becomes full and triggers a flush. While flushing the write cache, it triggers a checkpoint that marks the journal's lastMark position (the 100MB offset) and writes that lastMark position into both ledger1's and ledger2's lastMark files.

At this point the bookie shuts down without flushing the write cache, for example because it is killed with kill -9, so ledger2's write cache (30MB) is never flushed to the ledger disk. But ledger2's lastMark position, which was persisted into its lastMark file, has already been updated to the 100MB offset.

When the bookie starts up, the journal replay position will be min(ledger1's lastMark, ledger2's lastMark), which is the 100MB offset. Ledger2's 30MB of data won't be replayed, and that data will be lost.
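The scenario above can be reproduced with a small toy model. This is only a sketch under simplifying assumptions: LastMarkLossDemo, LedgerDir, and the integer offsets (1 to 100, standing in for megabytes of journal) are hypothetical names for illustration and are not BookKeeper classes.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Toy model only: LedgerDir and the integer "offsets" are hypothetical stand-ins,
// not BookKeeper classes.
final class LastMarkLossDemo {

    static final class LedgerDir {
        final List<Integer> writeCache = new ArrayList<>(); // in memory, gone after kill -9
        final List<Integer> onDisk = new ArrayList<>();     // flushed to the ledger disk
        int lastMark = 0;                                   // journal offset in the lastMark file

        void flush() {
            onDisk.addAll(writeCache);
            writeCache.clear();
        }
    }

    /** Returns the journal offsets that no ledger directory holds after the restart. */
    static List<Integer> lostAfterCrash() {
        LedgerDir ledger1 = new LedgerDir();
        LedgerDir ledger2 = new LedgerDir();
        List<LedgerDir> dirs = Arrays.asList(ledger1, ledger2);

        // Journal offsets 1..100 stand for 100MB: 70 go to ledger1, 30 to ledger2.
        int journalEnd = 100;
        for (int offset = 1; offset <= journalEnd; offset++) {
            LedgerDir target = offset <= 70 ? ledger1 : ledger2;
            target.writeCache.add(offset);
        }

        // ledger1's write cache fills up and flushes; the checkpoint then writes the
        // current journal position into EVERY directory's lastMark file.
        ledger1.flush();
        for (LedgerDir dir : dirs) {
            dir.lastMark = journalEnd;
        }

        // kill -9: the unflushed write caches are gone.
        for (LedgerDir dir : dirs) {
            dir.writeCache.clear();
        }

        // Restart: only journal entries after min(lastMark) are replayed, so anything
        // at or before that offset must already be on a ledger disk.
        int replayFrom = Math.min(ledger1.lastMark, ledger2.lastMark);
        List<Integer> lost = new ArrayList<>();
        for (int offset = 1; offset <= replayFrom; offset++) {
            boolean stored = ledger1.onDisk.contains(offset) || ledger2.onDisk.contains(offset);
            if (!stored) {
                lost.add(offset);
            }
        }
        return lost;
    }

    public static void main(String[] args) {
        System.out.println("lost entries: " + lostAfterCrash().size()); // prints "lost entries: 30"
    }
}
```

In this model the 30 entries that only ever lived in ledger2's write cache are skipped by replay, which matches the 30MB loss described above.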

Discussion thread: https://lists.apache.org/thread/zz5vvv2yd80vqy22fv8wg5s2lqtkrzh9

Changes

  1. Add support for checkpoints scoped to a specific ledgerDirManager. When a checkpoint is triggered by a specific ledgerDirManager, we only write lastMark into that manager's ledgerDirs.
  2. When the bookie starts up, read the minimal lastMark instead of the maximal lastMark as the current last mark.
  3. I will add a test soon.
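A rough sketch of how changes 1 and 2 could fit together is shown below. It is not the PR's code: PerDirLastMarkSketch, Mark, and the in-memory map standing in for the per-directory lastMark files are all hypothetical, and the ordering of marks by (journal file id, offset) is an assumption made for the illustration.

```java
import java.util.Collections;
import java.util.Comparator;
import java.util.LinkedHashMap;
import java.util.Map;

// Hypothetical sketch: Mark and PerDirLastMarkSketch are illustration-only names.
final class PerDirLastMarkSketch {

    /** A journal position: (journal file id, offset inside that file). */
    static final class Mark {
        final long logFileId;
        final long offset;

        Mark(long logFileId, long offset) {
            this.logFileId = logFileId;
            this.offset = offset;
        }
    }

    // Assumed ordering: earlier journal file first, then smaller offset.
    private static final Comparator<Mark> JOURNAL_ORDER =
            Comparator.comparingLong((Mark m) -> m.logFileId).thenComparingLong(m -> m.offset);

    // Stands in for one lastMark file per ledger directory.
    private final Map<String, Mark> lastMarkFiles = new LinkedHashMap<>();

    PerDirLastMarkSketch(String... ledgerDirs) {
        for (String dir : ledgerDirs) {
            lastMarkFiles.put(dir, new Mark(0, 0));
        }
    }

    /** Change 1: a checkpoint triggered by one directory only updates that directory's mark. */
    void checkpointComplete(String flushedDir, Mark mark) {
        if (!lastMarkFiles.containsKey(flushedDir)) {
            throw new IllegalArgumentException("unknown ledger dir: " + flushedDir);
        }
        lastMarkFiles.put(flushedDir, mark);
    }

    /** Change 2: on startup, replay the journal from the smallest persisted mark. */
    Mark replayStart() {
        return Collections.min(lastMarkFiles.values(), JOURNAL_ORDER);
    }

    public static void main(String[] args) {
        PerDirLastMarkSketch bookie = new PerDirLastMarkSketch("ledger1", "ledger2");
        bookie.checkpointComplete("ledger1", new Mark(3, 100)); // only ledger1 flushed
        Mark start = bookie.replayStart();
        System.out.println("replay from journal " + start.logFileId + " @ " + start.offset);
    }
}
```

Because ledger2's mark is untouched when only ledger1 flushes, the minimum stays at ledger2's older position and the journal entries destined for ledger2 are still replayed.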

hangc0276 avatar Jun 12 '22 15:06 hangc0276

@hangc0276 I found that when SyncThread flushes, checkpointComplete is invoked twice.

SyncThread flush:

    private void flush() {
        Checkpoint checkpoint = checkpointSource.newCheckpoint();
        try {
            // This call ends up in the ledger storage flush() shown further down.
            ledgerStorage.flush();
        } catch (NoWritableLedgerDirException e) {
            log.error("No writeable ledger directories", e);
            dirsListener.allDisksFull(true);
            return;
        } catch (IOException e) {
            log.error("Exception flushing ledgers", e);
            return;
        }

        if (disableCheckpoint) {
            return;
        }

        log.info("Flush ledger storage at checkpoint {}.", checkpoint);
        try {
            // Second checkpointComplete invocation.
            checkpointSource.checkpointComplete(null, checkpoint, false);
        } catch (IOException e) {
            log.error("Exception marking checkpoint as complete", e);
            dirsListener.allDisksFull(true);
        }
    }

Ledger storage flush:

    @Override
    public void flush() throws IOException {
        Checkpoint cp = checkpointSource.newCheckpoint();
        checkpoint(cp);
        // First checkpointComplete invocation.
        checkpointSource.checkpointComplete(ledgerDir, cp, true);
    }
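
The call sequence can be traced with a small stand-in that only records invocations. This is a sketch, not BookKeeper code: RecordingCheckpointSource and SingleDirStorage are hypothetical, a checkpoint is reduced to a long, and only the order of calls from the two snippets above is mirrored.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical stand-ins that mirror only the call order of the two flush() methods.
final class DuplicateCheckpointCompleteDemo {

    /** Records each checkpointComplete call instead of writing any lastMark file. */
    static final class RecordingCheckpointSource {
        final List<String> calls = new ArrayList<>();

        long newCheckpoint() {
            return 42L; // arbitrary placeholder journal position
        }

        void checkpointComplete(String ledgerDir, long checkpoint, boolean compact) {
            calls.add("dir=" + ledgerDir + ", compact=" + compact);
        }
    }

    /** Mirrors the storage-level flush(): checkpoint, then checkpointComplete for its own dir. */
    static final class SingleDirStorage {
        private final String ledgerDir;
        private final RecordingCheckpointSource source;

        SingleDirStorage(String ledgerDir, RecordingCheckpointSource source) {
            this.ledgerDir = ledgerDir;
            this.source = source;
        }

        void flush() {
            long cp = source.newCheckpoint();
            source.checkpointComplete(ledgerDir, cp, true);
        }
    }

    /** Mirrors the SyncThread-level flush(): storage flush, then another checkpointComplete. */
    static List<String> syncThreadFlush(RecordingCheckpointSource source, SingleDirStorage storage) {
        long checkpoint = source.newCheckpoint();
        storage.flush();
        source.checkpointComplete(null, checkpoint, false);
        return source.calls;
    }

    public static void main(String[] args) {
        RecordingCheckpointSource source = new RecordingCheckpointSource();
        List<String> calls = syncThreadFlush(source, new SingleDirStorage("ledger1", source));
        System.out.println(calls.size() + " checkpointComplete calls: " + calls);
    }
}
```

With one storage directory, a single SyncThread-style flush records two checkpointComplete calls, one per layer.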

gaozhangmin avatar Jun 13 '22 06:06 gaozhangmin

2. When the bookie starts up, read the minimal lastMark instead of the maximal lastMark as the current last mark.

@hangc0276 How is this logic implemented? Do you need to modify the following logic? [image]

lordcheng10 avatar Jun 14 '22 03:06 lordcheng10

@hangc0276 I found that when SyncThread flushes, checkpointComplete is invoked twice.

I also think the checkpoint is duplicated here.

lordcheng10 avatar Jun 14 '22 03:06 lordcheng10

2. When the bookie starts up, read the minimal lastMark instead of the maximal lastMark as the current last mark. @hangc0276 How is this logic implemented? Do you need to modify the following logic? [image]

+1, changes should also be made in Journal.readLog.

gaozhangmin avatar Jun 15 '22 02:06 gaozhangmin

It's better to add a proper test case to avoid future regressions.

@nicoloboschi I have added a test to cover this change. Please take a look, thanks.

hangc0276 avatar Jun 18 '22 11:06 hangc0276

@merlimat @eolivelli @nicoloboschi @gaozhangmin @lordcheng10 I have updated the code and added a test to cover this change. Please take a look, thanks.

hangc0276 avatar Jun 18 '22 11:06 hangc0276

rerun failure checks

hangc0276 avatar Jun 20 '22 01:06 hangc0276

rerun failure checks

hangc0276 avatar Jun 20 '22 02:06 hangc0276

rerun failure checks

hangc0276 avatar Jun 20 '22 05:06 hangc0276

@hangc0276 I found that when SyncThread flushes, checkpointComplete is invoked twice.

I also think the checkpoint is duplicated here.

@hangc0276 This problem is not going to be resolved here, right?

gaozhangmin avatar Jun 21 '22 08:06 gaozhangmin

@hangc0276 I found that when SyncThread flushes, checkpointComplete is invoked twice.

I also think the checkpoint is duplicated here.

@hangc0276 This problem is not going to be resolved here, right?

@gaozhangmin Yes, we can use another PR to solve it.

hangc0276 avatar Jun 22 '22 07:06 hangc0276

rerun failure checks

hangc0276 avatar Jun 22 '22 07:06 hangc0276

rerun failure checks

hangc0276 avatar Jun 22 '22 15:06 hangc0276

@hangc0276 I found that when SyncThread flushes, checkpointComplete is invoked twice.

I also think the checkpoint is duplicated here.

@hangc0276 This problem is not going to be resolved here, right?

@gaozhangmin Yes, we can use another PR to solve it.

I submitted PR #3353 to solve this issue. @hangc0276 PTAL

gaozhangmin avatar Jun 23 '22 09:06 gaozhangmin

This PR changes the default logMark behaviour, which leads to some unit test failures. I'm working on it.

hangc0276 avatar Jul 26 '22 03:07 hangc0276

Fix old workflow; please see #3455 for details.

StevenLuMT avatar Aug 24 '22 08:08 StevenLuMT

There are two places that can trigger a checkpoint:

  1. the scheduled task in SyncThread.doCheckpoint
  2. a ledgerDir's write cache becoming full and then flushing

The second one is the root cause of the data loss.

If we remove the checkpointSource.checkpointComplete call from flush, we will no longer delete journal files at that point.

The scheduled task SyncThread.doCheckpoint will invoke checkpointSource.checkpointComplete and it's safe here to delete journal files since we have already flushed all write-caches for all ledger directories.
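
A toy model of this proposal is sketched below, assuming that a cache-full flush persists entries but never advances the mark, and that only the scheduled checkpoint does so after flushing every directory. SingleCheckpointPathSketch and its methods are hypothetical names, and journal positions are reduced to a simple counter.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical toy model: one journal counter, N ledger dirs, one shared lastMark.
final class SingleCheckpointPathSketch {
    private final List<List<Long>> writeCaches = new ArrayList<>();
    private final List<List<Long>> onDisk = new ArrayList<>();
    private long journalPos = 0; // position of the latest journal entry
    private long lastMark = 0;   // persisted mark; replay starts after it

    SingleCheckpointPathSketch(int ledgerDirs) {
        for (int i = 0; i < ledgerDirs; i++) {
            writeCaches.add(new ArrayList<>());
            onDisk.add(new ArrayList<>());
        }
    }

    /** Appends one entry to the journal and to the given directory's write cache. */
    void addEntry(int dir) {
        journalPos++;
        writeCaches.get(dir).add(journalPos);
    }

    /** Write cache full: persist this directory's entries but leave lastMark alone. */
    void flushOnCacheFull(int dir) {
        flushDir(dir);
    }

    /** Scheduled checkpoint: flush every directory first, then advance lastMark. */
    void doCheckpoint() {
        long checkpoint = journalPos;
        for (int dir = 0; dir < writeCaches.size(); dir++) {
            flushDir(dir);
        }
        lastMark = checkpoint;
    }

    private void flushDir(int dir) {
        onDisk.get(dir).addAll(writeCaches.get(dir));
        writeCaches.get(dir).clear();
    }

    long lastMark() {
        return lastMark;
    }

    /** Counts entries at or before lastMark that are on no ledger disk (replay skips them). */
    long unrecoverableAfterCrash() {
        long lost = 0;
        for (long offset = 1; offset <= lastMark; offset++) {
            boolean stored = false;
            for (List<Long> disk : onDisk) {
                if (disk.contains(offset)) {
                    stored = true;
                }
            }
            if (!stored) {
                lost++;
            }
        }
        return lost;
    }
}
```

In this model a crash right after flushOnCacheFull loses nothing, because lastMark has not moved and the whole journal is still replayed. The trade-off is that the mark, and therefore journal cleanup, only advances on the scheduled checkpoint.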

WDYT @hangc0276

aloyszhang avatar May 09 '23 09:05 aloyszhang

@aloyszhang Thanks for your suggestion. Yes, making SyncThread.doCheckpoint the only place that completes a checkpoint is the simplest way to solve this problem. I updated the PR description; please take a look, thanks.

hangc0276 avatar May 15 '23 08:05 hangc0276

@eolivelli @merlimat @dlg99 @zymap I updated the code and would appreciate your review of this PR, thanks.

hangc0276 avatar Jun 25 '23 06:06 hangc0276