[RFC] Avoid data loss in vanilla segment replication
Background
In our production environment, we discovered the following issues with primary promotion in segment replication and set out to address them. This RFC mainly describes our solution and also aims to gather suggestions from the community.
- During primary promotion of vanilla segment replication, data loss may occur.
- Primary promotion may take up to 15 minutes or even longer. During this period, write throughput significantly decreases.
The purpose of this RFC is to discuss a solution to the data loss in vanilla segment replication. The other issue will be discussed in https://github.com/opensearch-project/OpenSearch/issues/20131.
Reproduction
Case of data loss
I introduced SegmentReplicationIT#testPrimaryStopped_ReplicaPromoted_reproduction_data_loss, which can reproduce the data loss. I submitted the code to a branch.
The execution process is described as follows.
- Start two nodes.
- Create an index with 1 primary shard and 1 replica shard, enable segment replication, and disable automatic refresh.
- Write `doc1`. The `nextSeqNo` of the primary shard is updated to 1, and the `processedCheckpoint` and `maxSeqNo` are updated to 0. The `nextSeqNo` of the replica shard is updated to 1.
- Write `doc2`. The `nextSeqNo` of the primary shard is updated to 2.
- Before the primary shard executes `InternalEngine#indexIntoLucene` on `doc2`, add a lock to block it.
- Perform a flush operation. The primary shard builds the segment and persists the index files to disk, with the `local_checkpoint` in userData being 0 and the `max_seq_no` being 1.
- Wait for the segment replication to finish. Both the primary and the replica contain `doc1`. The replica shard updates the `processedCheckpoint` to 1.
- Release the lock from Step 5 to allow the write operation on `doc2` to complete. Both the primary shard and the replica shard contain the translog of `doc2`.
- Shut down the node where the primary shard is located.
- The replica is promoted to the primary shard. First, it executes the `NRTReplicationEngine` close to persist the index files to disk, with the `local_checkpoint` in userData being 1 and the `max_seq_no` being 1. Then it switches to `InternalEngine`, starts translog recovery from `processedCheckpoint + 1`, and skips the translog operation corresponding to `doc2`.
- After the replica is promoted to the primary shard, `doc2` is lost.
Analysis
The cause of data loss
During primary promotion, the replica first closes the engine, records the LocalCheckpointTracker#processedCheckpoint in userData, and persists the index files. Then it switches to InternalEngine and starts recovering the translog from LocalCheckpointTracker#processedCheckpoint + 1.
In vanilla segment replication, during the finalize phase the replica advances the LocalCheckpointTracker#processedCheckpoint to infos.userData.get(MAX_SEQ_NO), a value recorded by the primary shard during the flush operation.
This also means that any doc with a sequence number between the LocalCheckpointTracker#processedCheckpoint and the LocalCheckpointTracker#nextSeqNo may be lost after the replica is promoted.
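To make the gap concrete, the following is a minimal, self-contained model of the sequence above in plain Java (placeholder collections rather than OpenSearch classes; only the checkpoint arithmetic mirrors the description):

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/**
 * Simplified model of the failure sequence (not OpenSearch code).
 * "copiedSegments" stands for the seqnos actually present in the Lucene files
 * copied to the replica; "replicaTranslog" for the operations the replica holds locally.
 */
public class PromotionDataLossModel {
    public static void main(String[] args) {
        // After the flush in the reproduction steps: doc1 (seqno 0) is in the segments,
        // doc2 (seqno 1) already has a seqno but is blocked before indexIntoLucene,
        // so the commit userData carries local_checkpoint=0 and max_seq_no=1.
        List<Long> copiedSegments = List.of(0L);
        Map<String, String> commitUserData = new HashMap<>();
        commitUserData.put("local_checkpoint", "0");
        commitUserData.put("max_seq_no", "1");

        // The replica has received the translog for both docs.
        List<Long> replicaTranslog = List.of(0L, 1L);

        // Finalize phase of segment replication: the replica fast-forwards its
        // processedCheckpoint to MAX_SEQ_NO from the copied commit, i.e. to 1,
        // even though seqno 1 is not present in the copied segments.
        long processedCheckpoint = Long.parseLong(commitUserData.get("max_seq_no"));

        // Primary promotion: translog recovery starts from processedCheckpoint + 1 = 2,
        // so the operation with seqno 1 (doc2) is never replayed.
        List<Long> docsOnPromotedPrimary = new ArrayList<>(copiedSegments);
        for (long seqNo : replicaTranslog) {
            if (seqNo > processedCheckpoint) {
                docsOnPromotedPrimary.add(seqNo);
            }
        }
        // Prints [0]: doc2 (seqno 1) is lost.
        System.out.println("seqnos on the promoted primary: " + docsOnPromotedPrimary);
    }
}
```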
Solution
Avoid data loss
During segment replication, when InternalEngine#getSegmentInfosSnapshot is invoked, record InternalEngine.LastRefreshedCheckpointListener#refreshedCheckpoint in segmentInfos.userData, and use it to update the replica shard's LocalCheckpointTracker#processedCheckpoint.
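The following is a rough sketch of that bookkeeping using plain Java types. The userData key name last_refreshed_checkpoint and the helper method names are placeholders for illustration, not the actual change in the branch:

```java
import java.util.HashMap;
import java.util.Map;

/**
 * Rough sketch of the proposed bookkeeping (placeholder names, plain Java types).
 */
public class RefreshedCheckpointSketch {

    // Hypothetical userData key used here only for illustration.
    static final String LAST_REFRESHED_CHECKPOINT = "last_refreshed_checkpoint";

    /**
     * Primary side: when the SegmentInfos snapshot is taken for replication,
     * stamp the last refreshed checkpoint into the snapshot's userData.
     */
    static Map<String, String> stampSnapshotUserData(Map<String, String> userData, long refreshedCheckpoint) {
        Map<String, String> copy = new HashMap<>(userData);
        copy.put(LAST_REFRESHED_CHECKPOINT, Long.toString(refreshedCheckpoint));
        return copy;
    }

    /**
     * Replica side: when applying the copied segments, advance processedCheckpoint
     * only up to the stamped value, never past what was actually refreshed.
     */
    static long safeProcessedCheckpoint(Map<String, String> snapshotUserData, long currentProcessedCheckpoint) {
        String stamped = snapshotUserData.get(LAST_REFRESHED_CHECKPOINT);
        if (stamped == null) {
            return currentProcessedCheckpoint; // nothing to fast-forward to
        }
        return Math.max(currentProcessedCheckpoint, Long.parseLong(stamped));
    }
}
```

With this in place, in the reproduction scenario the replica's processedCheckpoint would stay at 0 after replication, so translog recovery on promotion would start at seqno 1 and replay doc2.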
Evaluation
No data loss
In the branch, using the new logic in NRTReplicationEngine#updateSegments allows the test SegmentReplicationIT#testPrimaryStopped_ReplicaPromoted_reproduction_data_loss to pass.
@Bukhtawar @sachinpkale @gbbafna @mch2 @ashking94 @shourya035 @linuxpi - Do take a look and give your feedback:)
Thanks @guojialiang92 for reporting the issue (data loss for possibly the first doc post last refresh?). I am assuming this should be fairly reproducible with the remote store option as well, given we maintain similar logic for checkpoint tracking. Also, should we open a second issue for speeding up tlog recovery with concurrent threads?
Thank you for your reply. @Bukhtawar
> data loss for possibly the first doc post last refresh?
Yes.
> I am assuming this should be fairly reproducible with the remote store option as well given we maintain similar logic for checkpoint tracking.
Remote store does not have the problem of data loss because it rewrites the maxSeqNo in userData when uploading segments.
The Remote Store solution is similar to the one I proposed, but there is a slight difference. Since Remote Store executes RemoteStoreRefreshListener before LastRefreshedCheckpointListener, it has to use LastRefreshedCheckpointListener#pendingCheckpoint. In vanilla segment replication, the checkpoint is published after the refresh, so LastRefreshedCheckpointListener#refreshedCheckpoint needs to be used.
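For illustration, here is a simplified model of this listener bookkeeping (not the actual OpenSearch class), assuming the processed checkpoint is captured in beforeRefresh and only published as refreshedCheckpoint once afterRefresh runs:

```java
import java.util.concurrent.atomic.AtomicLong;
import java.util.function.LongSupplier;

/**
 * Simplified model of a "last refreshed checkpoint" refresh listener
 * (not the actual OpenSearch class). pendingCheckpoint is captured before the
 * refresh starts; refreshedCheckpoint is only published once the refresh is done.
 */
class LastRefreshedCheckpointModel {
    private final LongSupplier processedCheckpointSupplier;
    private final AtomicLong refreshedCheckpoint = new AtomicLong(-1);
    private volatile long pendingCheckpoint = -1;

    LastRefreshedCheckpointModel(LongSupplier processedCheckpointSupplier) {
        this.processedCheckpointSupplier = processedCheckpointSupplier;
    }

    void beforeRefresh() {
        // Everything up to this checkpoint will be searchable once the refresh completes.
        pendingCheckpoint = processedCheckpointSupplier.getAsLong();
    }

    void afterRefresh() {
        refreshedCheckpoint.set(pendingCheckpoint);
    }

    // A listener that runs before this one's afterRefresh (the remote store upload
    // path described above) has to read pendingCheckpoint.
    long checkpointForUploadDuringRefresh() {
        return pendingCheckpoint;
    }

    // Code that runs after the refresh has fully completed (checkpoint publication
    // in vanilla segment replication) can read refreshedCheckpoint.
    long checkpointAfterRefresh() {
        return refreshedCheckpoint.get();
    }
}
```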
> Also should we open a second issue for speeding up tlog recovery with concurrent threads?
OK, I created a separate RFC to discuss accelerating translog recovery. This RFC mainly focuses on the data-loss issue in vanilla segment replication.
Thanks @guojialiang92 for identifying the gap.
Just wanted to clarify 1 thing though so that we are on the same page:
> Write doc1. The LocalCheckpointTracker#nextSeqNo of the primary shard is updated to 0, and the LocalCheckpointTracker#processedCheckpoint is updated to 0. The LocalCheckpointTracker#nextSeqNo of the replica shard is updated to 0
nextSeqNo depicts the next available sequence number so when we write doc1 it would be assigned seqno 0 and nextSeqNo will be updated to 1. The processedCheckpoint and maxSeqNo would remain 0. Similarly after doc2 comes, it would be assigned seqno 1 and the nextSeqNo will increment to 2.
But the issue you outlined is still valid as the replica is starting from an incorrect local_checkpoint after failover. Could we put the last refreshed checkpoint in max_seq_no in userData in the segment replication setup, instead of adding a new `last_refreshed_checkpoint` key in `userData`? This would keep the userData structure consistent across segment replication and remote store.
Thanks @linuxpi
> nextSeqNo depicts the next available sequence number so when we write doc1 it would be assigned seqno 0 and nextSeqNo will be updated to 1. The processedCheckpoint and maxSeqNo would remain 0. Similarly after doc2 comes, it would be assigned seqno 1 and the nextSeqNo will increment to 2.
Your understanding is correct :)
Using maxSeqNo is more rigorous, and I will also correct the description in the issue.
> Could we put the last refreshed checkpoint in max_seq_no in userData in the segment replication setup, instead of adding a new `last_refreshed_checkpoint` key in `userData`? This would keep the userData structure consistent across segment replication and remote store.
I think it's OK. The code in the branch is just for quickly reproducing the issue.
Meanwhile, I would like to submit a PR to fix this issue. Could you please assign it to me?
Looking forward to the PR @guojialiang92 :)
Hi, @linuxpi.
> Looking forward to the PR @guojialiang92 :)
I have already submitted the PR and ensured that the tests pass. I would like to invite you to help review the code. Thank you. :)