OpenSearch
[Segment Replication] Optimise network bandwidth on primaries
Is your feature request related to a problem? Please describe. As part of the segment replication design proposal, there was a concern around increased network utilisation:
This will improve indexing throughput and lower resource utilization on replicas at the expense of increased network usage. This is because the primary would generate segments on refresh/merges and copy the generated segments to all replica copies, thereby using significant network bandwidth.
Describe the solution you'd like To fully utilize each machine’s network bandwidth, the control flow and the data flow can be decoupled: the segments are first pushed linearly along a chain of nodes hosting the primary and replica shards.
Primary -> Replica-1 -> Replica-2 -> Replica-N
The transfer latency can be minimized by pipelining the data transfer, i.e., once a copy receives some data, it starts forwarding it immediately. Thus, each machine’s full outbound bandwidth can be used to transfer the data as fast as possible. Once the data transfer completes, we can run the control flow (publish checkpoint metadata), with the primary publishing the checkpoint to all replicas concurrently:
Primary -> Replica 1 || Replica 2 || Replica 3
This can be done without changing the replication strategy, by maintaining the chaining order on the leader alongside the in-sync allocation ids.
GFS uses a similar strategy to avoid a network bottleneck on a single node: https://static.googleusercontent.com/media/research.google.com/en//archive/gfs-sosp2003.pdf
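To make the bandwidth argument concrete, here is a rough back-of-the-envelope model. It assumes a single shared outbound link on the primary and uses the ideal pipelined transfer time from the GFS paper, B/T + R*L (B bytes, R replicas, throughput T, per-hop latency L). The function names and the numbers are illustrative only and are not measurements from OpenSearch.

```python
def fan_out_seconds(size_bytes, replicas, bandwidth_bytes_per_s):
    """Primary pushes a full copy to every replica over its one outbound link."""
    return replicas * size_bytes / bandwidth_bytes_per_s


def chained_seconds(size_bytes, replicas, bandwidth_bytes_per_s, hop_latency_s):
    """Pipelined chain: ideal elapsed time B/T + R*L (formula from the GFS paper)."""
    return size_bytes / bandwidth_bytes_per_s + replicas * hop_latency_s


# Illustrative numbers (assumptions): 1 GB of new segments, 3 replicas,
# 1 Gbps links (125 MB/s), 1 ms latency per hop.
size = 1_000_000_000
bandwidth = 125_000_000

fan_out = fan_out_seconds(size, 3, bandwidth)          # 24.0 s
chained = chained_seconds(size, 3, bandwidth, 0.001)   # about 8.003 s
print(f"fan-out: {fan_out:.3f}s, chained: {chained:.3f}s")
```

The model ignores congestion, disk speed and chunking overhead, so it only shows the shape of the trade-off: fan-out time grows with the replica count, while the chained transfer stays close to the time of a single copy.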
This is a good idea and can improve network performance depending on the cluster topology.
Think of this example:
N1: P1, R2, R3
N2: P2, R1, R3
N3: P3, R1, R2
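With this topology, the primary sending to all replicas would use the same bandwidth per node as replicas acting as repeaters. However, this approach will greatly improve network performance if the cluster topology is not optimal, for example when primaries are concentrated on a few nodes.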
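A small sketch to check this claim by counting outbound segment copies per node. The chain orders below are an assumption (each chain rotates through the nodes); the helper names are made up for illustration.

```python
from collections import Counter


def fan_out_sends(chains):
    """Outbound copies per node when each primary sends to all of its replicas."""
    sends = Counter()
    for chain in chains.values():
        sends[chain[0]] += len(chain) - 1   # chain[0] hosts the primary
    return sends


def chained_sends(chains):
    """Outbound copies per node when every copy forwards to the next in the chain."""
    sends = Counter()
    for chain in chains.values():
        for sender in chain[:-1]:           # the tail forwards to nobody
            sends[sender] += 1
    return sends


# Balanced topology from the example: one primary per node (primary listed first).
balanced = {
    "shard1": ["N1", "N2", "N3"],
    "shard2": ["N2", "N3", "N1"],
    "shard3": ["N3", "N1", "N2"],
}
# Hypothetical skewed topology: all three primaries on N1.
skewed = {
    "shard1": ["N1", "N2", "N3"],
    "shard2": ["N1", "N2", "N3"],
    "shard3": ["N1", "N3", "N2"],
}

print(fan_out_sends(balanced), chained_sends(balanced))
print(fan_out_sends(skewed), chained_sends(skewed))
```

In the balanced case both strategies give 2 outbound copies per node. In the skewed case fan-out puts all 6 copies on N1, while chaining spreads them as 3, 2 and 1 across N1, N2 and N3. The total number of copies is the same either way; only the distribution changes, and it depends on how the chain is ordered.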
The only trade-off I see is that changes might take longer to become available on all replicas. We might have to implement this so that a replica doesn't wait for the entire diff and instead forwards each individual chunk to the next replica as soon as it receives it.
Also, we might need to depend on a push-based model to achieve this.
> The only trade-off I see is that changes might take longer to become available on all replicas. We might have to implement this so that a replica doesn't wait for the entire diff and instead forwards each individual chunk to the next replica as soon as it receives it.
That's already captured in the description above: "The transfer latency can be minimized by pipelining the data transfer, i.e., once a copy receives some data, it starts forwarding it immediately."
> Also, we might need to depend on a push-based model to achieve this.
The overarching idea here is that the transfer mechanism can be agnostic as long as we separate the data flow from the control flow. This can also be achieved with a pull-based transfer by simply inverting the chain; the latency then depends largely on how quickly one copy can pull data from its peer in the chain.
Primary <- Replica-1 <- Replica-2 <- Replica-N
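As a toy illustration of the inverted chain, the sketch below models each copy as a Python generator that pulls chunks from its upstream peer, stores them, and hands each chunk on immediately. Everything here (function names, the in-memory `stores` dict, the chunk size) is an assumption for illustration and not OpenSearch code.

```python
def primary_source(segment_bytes, chunk_size):
    """The primary serves the segment data in fixed-size chunks."""
    for offset in range(0, len(segment_bytes), chunk_size):
        yield segment_bytes[offset:offset + chunk_size]


def replica(name, upstream, stores, log):
    """Pull chunks from upstream, persist each one, and pass it on right away."""
    buf = stores.setdefault(name, bytearray())
    for chunk in upstream:
        buf.extend(chunk)
        log.append((name, len(buf)))   # bytes held by this copy so far
        yield chunk                    # available downstream before the next pull


def replicate(segment_bytes, replica_names, chunk_size=4):
    """Build Primary <- R1 <- ... <- RN and let the tail drive the pulls."""
    stores, log = {}, []
    stream = primary_source(segment_bytes, chunk_size)
    for name in replica_names:
        stream = replica(name, stream, stores, log)
    for _ in stream:                   # the last replica pulls until exhausted
        pass
    return {n: bytes(b) for n, b in stores.items()}, log


stores, log = replicate(b"abcdefgh", ["Replica-1", "Replica-2"])
print(log)
```

With two chunks the log interleaves the replicas (Replica-1 at 4 bytes, Replica-2 at 4 bytes, Replica-1 at 8 bytes, Replica-2 at 8 bytes), which shows that Replica-2 does not wait for Replica-1 to receive the full diff.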
Brainstorming an implementation of this within our current segrep architecture. This is just high level; we would need a POC to see how we could refactor and make this fit.
- The control flow could be tracked in ReplicationTracker and updated via cluster state events, then each shard in a group knows the next link in the chain and can react to node/shard drops.
- Update the existing checkpoint publisher on the primary to publish the update only to its immediate peer. Alternatively, add a polling mechanism to fetch the latest checkpoint from the sender and ditch the refresh listener entirely.
- SegmentReplicationTargetService/Target would need an update to initiate sending the checkpoint metadata & segments to the next shard in the chain once they are received. Alternatively, use a polling mechanism to fetch the latest checkpoint, as in the previous item.
- PrimaryShardReplicationSource can be renamed as it has no hard restriction on the source operating as the Primary.
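To illustrate the first item, here is a minimal sketch of how a chain order could be derived from the in-sync allocation ids and how a copy could find its next link after a node/shard drop. The function names and the sorted ordering are assumptions made for this example; none of this is an existing ReplicationTracker API.

```python
def build_chain(primary_id, in_sync_ids):
    """Primary first, then the other in-sync copies in a stable (sorted) order."""
    return [primary_id] + sorted(a for a in in_sync_ids if a != primary_id)


def next_link(chain, allocation_id):
    """Return the downstream peer, or None for the tail or an unknown copy."""
    if allocation_id not in chain:
        return None
    i = chain.index(allocation_id)
    return chain[i + 1] if i + 1 < len(chain) else None


def on_copy_removed(chain, allocation_id):
    """React to a node/shard drop by splicing the copy out of the chain."""
    return [a for a in chain if a != allocation_id]


chain = build_chain("p", {"p", "r1", "r2", "r3"})   # ["p", "r1", "r2", "r3"]
print(next_link(chain, "r1"))                        # r2
chain = on_copy_removed(chain, "r2")                 # e.g. via a cluster state event
print(next_link(chain, "r1"))                        # r3
```

Because every copy derives the chain from the same inputs, each shard in the group agrees on its next link without extra coordination. A real implementation would probably order the chain by topology or load rather than by sorting ids.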