Fix data loss when multiple ledger directories are configured
Motivation
We found one place where the bookie may lose data even though fsync is turned on for the journal. Conditions:
- One journal disk, with fsync turned on for the journal
- Two ledger disks configured: ledger1 and ledger2
Assume we write 100MB of data into one bookie: 70MB is written into ledger1's write cache and 30MB into ledger2's write cache. Ledger1's write cache becomes full and triggers a flush. Flushing the write cache triggers a checkpoint that marks the journal's lastMark position (the 100MB offset) and writes that position into both ledger1's and ledger2's lastMark files.
At this point the bookie shuts down without flushing the write caches, e.g. killed by a kill -9
command, so ledger2's write cache (30MB) is never flushed to the ledger disk. But ledger2's lastMark position, persisted in its lastMark file, has already been updated to the 100MB offset.
When the bookie starts up, the journal replay position will be min(ledger1's lastMark, ledger2's lastMark),
which is the 100MB offset. Ledger2's 30MB of data won't be replayed, and that data is lost.
Discussion thread: https://lists.apache.org/thread/zz5vvv2yd80vqy22fv8wg5s2lqtkrzh9
Changes
- Add checkpoint support for a specific ledgerDirManager. When a checkpoint is triggered by a specific ledgerDirManager, only write lastMark into that specific ledgerDir (see the sketch after this list).
- When the bookie starts up, read the minimal lastMark instead of the maximal lastMark as the current last mark.
- I will add a test soon.
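To make the intended behaviour concrete, here is a minimal, self-contained sketch of both changes. This is not the PR's actual code: the real lastMark file also records a journal log id and goes through the Journal/LastLogMark machinery, so the file layout, class, and method names below are illustrative assumptions only.

import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.File;
import java.io.FileInputStream;
import java.io.FileOutputStream;
import java.io.IOException;
import java.util.List;

// Illustrative sketch only, not the actual BookKeeper classes.
public class LastMarkSketch {

    // Change 1: a checkpoint triggered by one ledger directory's write-cache
    // flush persists lastMark only into that directory's lastMark file,
    // instead of into every configured ledger directory.
    public static void persistLastMark(File flushedLedgerDir, long markOffset)
            throws IOException {
        File lastMarkFile = new File(flushedLedgerDir, "lastMark");
        try (DataOutputStream out =
                 new DataOutputStream(new FileOutputStream(lastMarkFile))) {
            out.writeLong(markOffset); // simplified: the real file also stores a log id
        }
    }

    // Change 2: on startup, take the minimal lastMark across all ledger
    // directories as the journal replay position, so entries that were still
    // sitting in an unflushed write cache are replayed instead of skipped.
    public static long readReplayPosition(List<File> ledgerDirs) throws IOException {
        long min = Long.MAX_VALUE;
        for (File dir : ledgerDirs) {
            File lastMarkFile = new File(dir, "lastMark");
            if (!lastMarkFile.exists()) {
                return 0L; // no mark persisted yet: replay from the beginning
            }
            try (DataInputStream in =
                     new DataInputStream(new FileInputStream(lastMarkFile))) {
                min = Math.min(min, in.readLong());
            }
        }
        return min == Long.MAX_VALUE ? 0L : min;
    }
}

In the motivating scenario above, ledger1's flush would no longer advance ledger2's lastMark, so the minimum stays behind the 30MB of unflushed data and journal replay recovers it.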
@hangc0276 I found that when syncThread does a flush, there is a duplicate checkpointComplete invocation.
SyncThread flush:
private void flush() {
    Checkpoint checkpoint = checkpointSource.newCheckpoint();
    try {
        // this storage flush already runs a checkpoint internally
        // (see the flush() override below)
        ledgerStorage.flush();
    } catch (NoWritableLedgerDirException e) {
        log.error("No writeable ledger directories", e);
        dirsListener.allDisksFull(true);
        return;
    } catch (IOException e) {
        log.error("Exception flushing ledgers", e);
        return;
    }
    if (disableCheckpoint) {
        return;
    }
    log.info("Flush ledger storage at checkpoint {}.", checkpoint);
    try {
        // second checkpointComplete for the same flush
        checkpointSource.checkpointComplete(null, checkpoint, false);
    } catch (IOException e) {
        log.error("Exception marking checkpoint as complete", e);
        dirsListener.allDisksFull(true);
    }
}
@Override
public void flush() throws IOException {
    Checkpoint cp = checkpointSource.newCheckpoint();
    checkpoint(cp);
    // checkpointComplete is invoked again here, so one SyncThread
    // flush completes the checkpoint twice
    checkpointSource.checkpointComplete(ledgerDir, cp, true);
}
2. When the bookie starts up, read the minimal lastMark instead of the maximal lastMark as the current last mark. @hangc0276 How is this logic implemented? Do you need to modify the existing lastMark-reading logic?
I also think the checkpoint is duplicated here
+1, changes should also be made at Journal.readLog
It's better to add a proper test case to avoid future regressions.
@nicoloboschi I have added the test to cover this change, please help take a look, thanks.
@merlimat @eolivelli @nicoloboschi @gaozhangmin @lordcheng10 I have updated the code and added the test to cover this change, please help take a look, thanks.
rerun failure checks
rerun failure checks
rerun failure checks
@hangc0276 The duplicate checkpointComplete problem is not going to be resolved here, right?
@gaozhangmin Yes, we can use another PR to solve it.
rerun failure checks
rerun failure checks
I submitted PR #3353 to solve this issue. @hangc0276 PTAL
This PR changed the default logMark behaviour, which led to some unit test failures; I'm working on it.
Fixed the old workflow; please see #3455 for details.
There are two places that can trigger a checkpoint:
- the scheduled task in SyncThread.doCheckpoint
- a ledgerDir's write cache becoming full, which triggers a flush

The second one is the root cause of the data loss. If we remove the checkpointSource.checkpointComplete logic from flush, then at that point we will not delete journal files. The scheduled task SyncThread.doCheckpoint will invoke checkpointSource.checkpointComplete, and it is safe to delete journal files there since we have already flushed the write caches of all ledger directories. A sketch of this follows.
WDYT @hangc0276
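A minimal sketch of this suggestion, written against the flush() snippet quoted earlier (illustrative only, not the PR's code; whether dropping the call is safe depends on when the journal deletes old files):

// Sketch of the suggestion: the storage flush no longer completes the
// checkpoint, so a write-cache-full flush cannot advance lastMark or
// allow journal files to be deleted.
@Override
public void flush() throws IOException {
    Checkpoint cp = checkpointSource.newCheckpoint();
    checkpoint(cp);
    // checkpointSource.checkpointComplete(ledgerDir, cp, true) removed:
    // only the scheduled SyncThread.doCheckpoint completes checkpoints,
    // and it does so after every ledger directory's write cache has
    // been flushed.
}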
@aloyszhang Thanks for your suggestion. Yes, making SyncThread.doCheckpoint the only checkpoint endpoint is the simplest way to solve this problem. I updated the PR description, please help take a look, thanks.
@eolivelli @merlimat @dlg99 @zymap I updated the code and need your eyes on this PR, thanks.