Support atomicity in multi-key commands with replication
In stable-state replication, a multi-key command that took place on the master should be executed on the replica only once the whole transaction has finished on the master.
Since the current implementation of multi-key commands on the replica can create a deadlock in case the master executed the commands out of order, I would like to suggest a new solution. It will not ensure atomicity in replication (as that is not a must-have for this milestone). The new solution will execute the commands only after all shards have received the transaction, and it will also ensure the correctness of global commands, e.g. flushdb.
Suggested solution (all the changes below are in the replica):
- We will hold a map shared between all shards. Key: txid; value: a BlockingCounter initialized to the number of unique shards, plus a shard execution counter.
- A queue per shard that holds, for each transaction, its metadata and the shard's commands (data from the journal). This queue will be a member of the replica class that is created for each flow.
- We will have two fibers for each shard flow in the replica: one for reading from the socket and one for execution. The reader fiber reads the data using the journal reader, inserts it into the shard queue, and updates the shared map: if the transaction is not yet in the map it adds it, and it decrements the transaction's blocking counter. The execution fiber always takes the first item in the shard queue: if the transaction is on one shard it executes it immediately; if it spans multiple shards, it waits on the transaction's blocking counter in the shared map and, once the wait returns, executes the shard's commands. It then updates the execution counter in the shared map, and the last fiber to execute removes the entry from the map. In this solution each shard execution fiber executes only its own shard's commands, dispatched one by one, regardless of whether they were created by a multi-command transaction on the master or by a multi-shard command (see the sketch right after this list).
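A minimal C++-style sketch of this per-shard flow, assuming hypothetical names (TxData, TxSyncEntry, OnJournalEntry) and plain standard-library primitives where the real implementation would use fiber-aware ones; it only illustrates the proposal above and is not actual Dragonfly code:

// Illustration only: hypothetical types, std primitives instead of fiber-aware ones.
#include <condition_variable>
#include <cstdint>
#include <deque>
#include <memory>
#include <mutex>
#include <string>
#include <unordered_map>
#include <vector>

// Assumed helper: releases waiters once it was decremented shard-count times.
class BlockingCounter {
 public:
  explicit BlockingCounter(unsigned count) : count_(count) {}
  void Dec() {
    std::lock_guard lk(mu_);
    if (--count_ == 0) cv_.notify_all();
  }
  void Wait() {
    std::unique_lock lk(mu_);
    cv_.wait(lk, [this] { return count_ == 0; });
  }
 private:
  std::mutex mu_;
  std::condition_variable cv_;
  unsigned count_;
};

struct TxData {                       // parsed by the reader fiber from the journal
  uint64_t txid;
  unsigned shard_cnt;                 // number of unique shards in the transaction
  std::vector<std::string> commands;  // this shard's commands only
};

struct TxSyncEntry {                  // value type of the shared map
  BlockingCounter received;           // initialized to the number of unique shards
  unsigned executed = 0;              // shard fibers that already ran their slice
  explicit TxSyncEntry(unsigned shard_cnt) : received(shard_cnt) {}
};

std::mutex map_mu;                    // shared between all shard flows
std::unordered_map<uint64_t, std::unique_ptr<TxSyncEntry>> tx_map;

void ExecuteShardCommands(const TxData& tx);  // assumed: applies tx.commands locally

// Reader fiber: register the transaction and signal that this shard received it.
void OnJournalEntry(TxData tx, std::deque<TxData>& shard_queue) {
  if (tx.shard_cnt > 1) {
    std::lock_guard lk(map_mu);
    auto it = tx_map.try_emplace(
        tx.txid, std::make_unique<TxSyncEntry>(tx.shard_cnt)).first;
    it->second->received.Dec();
  }
  shard_queue.push_back(std::move(tx));  // queue synchronization omitted in sketch
}

// Execution fiber: always executes the head of its own queue, in journal order.
void ExecuteHead(std::deque<TxData>& shard_queue) {
  TxData tx = std::move(shard_queue.front());
  shard_queue.pop_front();
  if (tx.shard_cnt == 1) {
    ExecuteShardCommands(tx);          // single-shard tx: run immediately
    return;
  }
  TxSyncEntry* entry;
  {
    std::lock_guard lk(map_mu);
    entry = tx_map[tx.txid].get();
  }
  entry->received.Wait();              // block until every shard received the tx
  ExecuteShardCommands(tx);            // run only this shard's slice
  std::lock_guard lk(map_mu);
  if (++entry->executed == tx.shard_cnt) tx_map.erase(tx.txid);  // last one cleans up
}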
Special flow for global command: because a global command must be executed in all shards at the same time, we will use a barrier for it, and only one execution fiber will execute it. The reader fiber inserts into the map the txid with a Barrier (instead of a BlockingCounter); every execution fiber waits on the barrier, and the command is executed by the fiber in the thread that inserted the transaction into the map. To mark this, we will use a flag in the shard queue's transaction data.
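Along the same lines, a rough sketch of the global-command path, with std::barrier standing in for a fiber-aware barrier and hypothetical names (GlobalTxEntry, OnGlobalTx); map-entry cleanup is deliberately simplified:

// Illustration only: std::barrier stands in for a fiber-aware barrier.
#include <barrier>
#include <cstdint>
#include <memory>
#include <mutex>
#include <unordered_map>

struct GlobalTxEntry {
  std::barrier<> barrier;                  // one arrival per shard flow
  explicit GlobalTxEntry(unsigned shard_cnt) : barrier(shard_cnt) {}
};

std::mutex gmap_mu;
std::unordered_map<uint64_t, std::unique_ptr<GlobalTxEntry>> global_map;

void ExecuteGlobalCommand(uint64_t txid);  // assumed: runs e.g. FLUSHDB once

// Called by each shard's execution fiber when a global command reaches the head
// of its queue. is_executor is the flag stored in the shard queue data by the
// reader fiber of the flow that inserted the txid into the map.
void OnGlobalTx(uint64_t txid, unsigned shard_cnt, bool is_executor) {
  GlobalTxEntry* entry;
  {
    std::lock_guard lk(gmap_mu);
    // In the proposal the reader fiber inserts the entry; find-or-create here
    // just keeps the sketch short.
    auto it = global_map.try_emplace(
        txid, std::make_unique<GlobalTxEntry>(shard_cnt)).first;
    entry = it->second.get();
  }
  entry->barrier.arrive_and_wait();        // every shard fiber lines up here
  if (is_executor) ExecuteGlobalCommand(txid);
  entry->barrier.arrive_and_wait();        // nobody moves on before it executed
  // Cleanup of the map entry is omitted: a real implementation must make sure
  // no fiber still touches the entry before erasing it.
}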
@romange @dranikpg please review
Thanks! I have comments about (3).
If you have a shared map [txid] with some metadata, why do you prefer dispatching (executing) pieces of the transaction from all the involved flows? Why not put them into a shared place and have the last flow execute them?
Also, I think that this approach may cause latency spikes for multi-shard commands like mset even if they run on distinct keys. I think it is possible to improve this approach if we track locked keys in each flow, but since such an improvement builds upon your suggested solution, I think we can wait with that until the basic solution is ready.
Why not put them into a shared place and have the last flow execute them?
The issue I'm thinking about is how to provide backpressure on the other flows, because the last one to receive the transaction will be the one executing it, and it will only lag behind even more.
Special flow for global command:
Basically, that's how it works now. But we can't hit the same problem here, because a global command always locks the full db and out-of-order execution is not possible, right?
@romange let's take an example.
shard 1 journal:
txid 1 - set x 1
txid 2 - set y 1
shard 2 journal:
txid 2 - set a 1
txid 1 - set b 1
txid 3 - set b 2
In this journal example, consider what happens with the "last flow executes" approach when the thread of shard 2 runs first:
- insert txid 2 into the map, no exec
- insert txid 1 into the map, no exec
- txid 3 is on a single shard, exec
Now the thread of shard 1 runs:
- update the map with txid 1, exec
So we end up with b = 1, which is wrong. Therefore, from what I understand, we must execute the commands in order within each shard's journal, and that is why I suggested having a fiber per shard executing the top of the queue.
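To make the ordering problem concrete, here is a tiny self-contained trace of the scenario above, with a plain std::map standing in for shard 2's database (illustration only):

#include <cassert>
#include <map>
#include <string>

int main() {
  std::map<std::string, int> shard2;  // keys a and b live on shard 2
  // With "the last flow executes" semantics, shard 2's fiber runs txid 3 first
  // (single-shard), while txid 1 and txid 2 run only later, when shard 1's
  // thread becomes the last flow to receive them:
  shard2["b"] = 2;  // txid 3: set b 2  (executed early by shard 2's fiber)
  shard2["b"] = 1;  // txid 1: set b 1  (executed later by shard 1's thread)
  shard2["a"] = 1;  // txid 2: set a 1  (executed later by shard 1's thread)
  // Shard 2's journal order was txid 2, txid 1, txid 3, so b should end up as 2.
  assert(shard2["b"] == 1);  // but we ended up with b == 1: wrong result
  return 0;
}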
@dranikpg true, this is the same flow as now, and for global commands there is no OOO execution.
You are right. The enhancement I thought of would fix this issue, so maybe it's worth writing it here: for every transaction slice we would keep the set of keys it locks in the owning flow. So for the example above, shard 2 would insert:
lock_set.insert(a);
lock_set.insert(b);
...
and your execution fiber is allowed to progress as long as the next transaction does not overlap with already locked keys. So you do not need to block on a, i.e. you can proceed with b/tx_1, but you would need to block on b/tx_3. As I said, this is not the first priority; let's implement the basic flow and we will come back to this later.
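A rough sketch of how such per-flow lock tracking could gate the execution fiber; the names (TxSlice, CanExecuteNext, and so on) are hypothetical and not actual Dragonfly code:

#include <cstdint>
#include <deque>
#include <string>
#include <unordered_set>
#include <vector>

struct TxSlice {                      // one transaction's commands for this shard
  uint64_t txid;
  bool multi_shard;
  std::vector<std::string> keys;      // keys this slice touches on this shard
};

// Per-flow set of keys locked by multi-shard transactions that are still
// waiting for their other shards.
std::unordered_set<std::string> lock_set;

bool Overlaps(const TxSlice& tx) {
  for (const auto& k : tx.keys)
    if (lock_set.count(k)) return true;
  return false;
}

// The execution fiber may keep draining the queue past a blocked multi-shard
// transaction as long as the next slice touches only unlocked keys.
bool CanExecuteNext(const std::deque<TxSlice>& queue) {
  return !queue.empty() && !Overlaps(queue.front());
}

void OnMultiShardBlocked(const TxSlice& tx) {
  // tx is waiting for its other shards; lock its keys so that later slices
  // touching them (like tx_3 touching b in the example) are held back.
  lock_set.insert(tx.keys.begin(), tx.keys.end());
}

void OnMultiShardExecuted(const TxSlice& tx) {
  for (const auto& k : tx.keys) lock_set.erase(k);
}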
Another comment for (3): it awfully resembles what master does when it schedules a multi-shard tx.
- the coordinator fiber reads the command and dispatches it to run
- during the dispatch it schedules it on the execution fiber inside the tx queue. The tx cannot execute yet until it's "armed" in the shard. See the IsArmedInShard function.
- Once the transaction is scheduled, it's been "activated" to run inside Transaction::ExecuteAsync.
Why is it better? If we adapt the already existing flow to this use case, we won't need to duplicate the logic. In addition, we reduce part of the latency by scheduling the transaction slice on the replica before we can execute it: we cannot run it yet, but at least it has been added to the tx queue.
The complexity rises when a connection disconnects and you need to clean up transactions that are half-scheduled; otherwise the tx queue will be deadlocked.
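A conceptual sketch of this schedule-first, arm-later pattern; the names below are illustrative stand-ins, not Dragonfly's actual Transaction API (which lives around IsArmedInShard and Transaction::ExecuteAsync):

#include <atomic>
#include <cstdint>
#include <deque>

// Illustrative stand-in for a transaction slice sitting in a shard's tx queue.
struct PendingTx {
  uint64_t txid;
  std::atomic<bool> armed{false};  // becomes true once the tx may run
};

// Scheduling: the slice is appended to the tx queue as soon as the replica
// reads it from the journal, even though it cannot run yet. This hides part of
// the latency: the queue position is already reserved.
void Schedule(std::deque<PendingTx*>& tx_queue, PendingTx* tx) {
  tx_queue.push_back(tx);
}

// Arming: once all shards of the transaction have received their slices, the
// coordinator-like logic flips the flag and the shard fiber may execute it.
void Arm(PendingTx* tx) { tx->armed.store(true, std::memory_order_release); }

// The shard's execution loop only runs the head of the queue when it is armed,
// which preserves per-shard ordering just like the queue-based proposal above.
bool TryRunHead(std::deque<PendingTx*>& tx_queue) {
  if (tx_queue.empty()) return false;
  PendingTx* head = tx_queue.front();
  if (!head->armed.load(std::memory_order_acquire)) return false;
  // Execute(head);  // execution itself omitted in this sketch
  tx_queue.pop_front();
  return true;
}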
I think that this issue should not be implemented first, but is something we should work on later, when all other replication features are in place. Meanwhile, it should be designed in the doc with a higher level of detail than here.
@romange The changes are in the main branch, with a flag to enable/disable the sync between shards, as you suggested.
How should we compare the performance? I can use the memtier benchmark for multi-shard commands, but what are we looking for on the replica? Is there a specific metric, already implemented, that I can check to compare with/without the flag?
You can run memtier on a master instance. Because it has back pressure it will slow down if the replica does not catch up. The issue is that the bottleneck is data transfer, so replica execution is likely to have no effect at all.
What can be checked instead is the CPU load of the replica or its GET throughput while it is receiving.
Besides, stable sync on main is currently very slow; you shouldn't benchmark on it. The async streamer is ready to be merged (waiting for tests); it gives about 40-70% of the initial speed.
Vlad, why do you think that the bottleneck is data transfer? I am not sure it's true.
Because when I ran the benchmarks:
- Skipping execution entirely did not increase throughput
- Improving async streamer wakeups increased throughput a lot (so there is opportunity for improvement on master)
- Master was on 100% CPU, replica was about 30-50% or even less
But I didn't think about the most obvious solution: we can just run the replica with fewer shards, so it has more work to do.
The master was on 100% CPU - that means the CPU is the bottleneck
Yes, I didn't mean the bandwidth directly, just the whole mechanism that is responsible for sending replication data
Hi @romange @dranikpg, I ran the following command on my laptop, with the master and the replica each using 4 cores: memtier_benchmark -c 1 --command "mset __key__ foo __key__ bar" -d 128 -n 100000 --distinct-client-seed --key-maximum=100000
Here are the results:
- flag enable_multi_shard_sync=1: finished after 17 seconds, replica CPU ~50%
- flag enable_multi_shard_sync=0: finished after 3 seconds, replica CPU ~45%
Is this the time it takes the benchmark to run on the master? It's strange that there is such a large difference. It could be caused by the replica buffer filling up.
Why do you run it with c=1? It's much, much slower this way.
Did you try the async streamer branch? If you divide n by the time, you'll get a QPS of around 30k at best, which is very low. We don't have acceptable QPS on the main branch so far.
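For illustration only (the parameters are just an example and would need tuning for the actual machine), a higher-concurrency run of the same workload could look like: memtier_benchmark -t 4 -c 10 --command "mset __key__ foo __key__ bar" -d 128 -n 100000 --distinct-client-seed --key-maximum=100000, which drives the same mset pattern with 4 threads and 10 clients per thread instead of a single client.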
Do you mean __key__ inside the command, @adiholden?
@romange yes