
Support atomicity in multi-key commands with replication

Open adiholden opened this issue 2 years ago • 18 comments

In stable-state replication, a multi-key command that took place on the master should be executed on the replica only after the whole transaction has finished on the master.

adiholden avatar Dec 20 '22 10:12 adiholden

Since the current implementation of multi-key commands on the replica can deadlock in case the master executed the commands out of order, I would like to suggest a new solution. It will not ensure atomicity in replication (which is not a must-have for this milestone). The new solution will execute the commands only once all shards have received the transaction, and it will also ensure the correctness of global commands, e.g. FLUSHDB.

Suggested solution (all the changes below are on the replica side):

  1. We will hold a map shared between all shards. Key: txid; value: a BlockingCounter initialized to the number of unique shards the transaction touches, plus a shard execution counter.
  2. A queue per shard which will hold, for each transaction, its metadata and the shard commands (data from the journal). This queue will be a member of the replica class that is created for each flow.
  3. We will have two fibers for each shard flow in the replica: one for reading from the socket and one for execution. The reader fiber will read the data using the journal reader and insert it into the shard queue; it will also update the shared map (adding the transaction if it is not there yet) and decrease the blocking counter. The execution fiber always takes the first item in the shard queue: if the transaction is on one shard, it executes it; if it spans multiple shards, it waits on the transaction's blocking counter in the shared map and, once the wait returns, executes the shard command. It then updates the execution counter in the shared map, and the last fiber to execute removes the entry from the map. In this solution each shard's execution fiber executes only its own shard commands, dispatched one by one, regardless of whether they were created by a MULTI transaction on the master or by a multi-shard command. A sketch of this flow follows.
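
To make (1)-(3) concrete, here is a minimal C++ sketch of the shared map and the reader/executor steps. All names are illustrative; std::mutex/std::condition_variable stand in for Dragonfly's fiber-friendly primitives, and the per-shard queue is left unlocked on the assumption that both fibers of a flow run cooperatively on the same thread:

#include <condition_variable>
#include <cstdint>
#include <deque>
#include <mutex>
#include <string>
#include <unordered_map>

// Stand-in for Dragonfly's BlockingCounter: Wait() unblocks once Dec()
// has been called `count` times, i.e. once every involved shard flow
// has received its slice of the transaction.
class BlockingCounter {
 public:
  explicit BlockingCounter(unsigned count) : count_(count) {}
  void Dec() {
    std::lock_guard lk(mu_);
    if (--count_ == 0) cv_.notify_all();
  }
  void Wait() {
    std::unique_lock lk(mu_);
    cv_.wait(lk, [this] { return count_ == 0; });
  }
 private:
  unsigned count_;
  std::mutex mu_;
  std::condition_variable cv_;
};

// Shared map entry: the arrival counter plus the number of flows that
// already executed their slice; the last one erases the entry.
struct TxData {
  explicit TxData(unsigned shards) : received(shards) {}
  BlockingCounter received;
  unsigned executed = 0;
};

struct JournalEntry {
  uint64_t txid;
  unsigned num_shards;  // unique shards this transaction touches
  std::string command;  // this shard's slice only
};

std::mutex map_mu;
std::unordered_map<uint64_t, TxData> tx_map;  // shared across all flows

// Reader fiber step: register multi-shard transactions in the shared
// map, then push the entry to this shard's queue in journal order.
void ReaderStep(std::deque<JournalEntry>& queue, JournalEntry entry) {
  if (entry.num_shards > 1) {
    std::lock_guard lk(map_mu);
    auto [it, inserted] = tx_map.try_emplace(entry.txid, entry.num_shards);
    it->second.received.Dec();  // this shard has now seen the tx
  }
  queue.push_back(std::move(entry));
}

void Execute(const std::string& /*cmd*/) { /* apply the slice to the store */ }

// Executor fiber step: always take the head of the queue; block on the
// counter only for multi-shard transactions.
void ExecuteHead(std::deque<JournalEntry>& queue) {
  JournalEntry e = std::move(queue.front());
  queue.pop_front();
  if (e.num_shards > 1) {
    map_mu.lock();
    BlockingCounter& received = tx_map.at(e.txid).received;
    map_mu.unlock();
    received.Wait();  // wait until every involved shard enqueued its slice
  }
  Execute(e.command);  // each fiber runs only its own shard's command
  if (e.num_shards > 1) {
    std::lock_guard lk(map_mu);
    auto it = tx_map.find(e.txid);
    if (++it->second.executed == e.num_shards)
      tx_map.erase(it);  // last executor cleans up the map entry
  }
}

Note that only multi-shard transactions touch the shared map, so single-shard commands keep a lock-free fast path.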

Special flow for global commands: because a global command must be executed in all shards at the same time, we will use a barrier for it, and only one execution fiber will execute it. The reader fiber will insert into the map the txid and a Barrier (instead of a BlockingCounter); the execution fibers will wait on the barrier, and the command will be executed by the fiber of the thread that inserted the transaction into the map. To mark that fiber we will use a flag in the shard-queue transaction data, as in the sketch below.
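
A minimal sketch of the barrier rendezvous, under the same assumptions (illustrative names; C++20 std::barrier, sized to the number of shard flows, standing in for a fiber-aware barrier):

#include <barrier>
#include <cstdint>

void RunGlobalCommand(uint64_t /*txid*/) { /* e.g. apply FLUSHDB */ }

// For a global txid the reader fiber stores a Barrier in the shared map
// instead of a BlockingCounter. `inserted_tx` is the flag from the
// shard-queue transaction data, marking the flow that added the entry.
void ExecGlobal(std::barrier<>& bar, bool inserted_tx, uint64_t txid) {
  bar.arrive_and_wait();     // every shard's execution fiber parks here
  if (inserted_tx)
    RunGlobalCommand(txid);  // executed by exactly one fiber
  bar.arrive_and_wait();     // the rest resume only after it finished
}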

@romange @dranikpg please review

adiholden avatar Jan 04 '23 08:01 adiholden

Thanks! I have comments about (3).

If you have a shared map [txid] with some metadata, why do you prefer dispatching (executing) pieces of the transaction from all the involved flows? Why not put them into the shared place and have the last flow execute them?

Also, I think that this approach may cause latency spikes for multi-shard commands like MSET even if they run on distinct keys. I think it is possible to improve this approach if we track locked keys in each flow, but since such an improvement builds upon your suggested solution, I think we can wait with that until the basic solution is ready.

romange avatar Jan 04 '23 08:01 romange

Why not put them into the shared place and have the last flow execute them?

The issue I'm thinking about is how to provide backpressure on the other flows, because the last one to receive will execute and will only lag behind more.

dranikpg avatar Jan 04 '23 09:01 dranikpg

Special flow for global commands:

Basically, this is how it works now. But we can't hit the same problem because a global command always locks the full db, so out-of-order execution is not possible here, right?

dranikpg avatar Jan 04 '23 09:01 dranikpg

@romange let's take an example.

shard 1 journal:
txid 1 - set x 1
txid 2 - set y 1

shard 2 journal:
txid 2 - set a 1
txid 1 - set b 1
txid 3 - set b 2

In this journal example, consider the flow in which the thread of shard 2 runs first:

txid 2 - insert into map, no exec
txid 1 - insert into map, no exec
txid 3 - single shard, exec

Now the thread of shard 1 runs:

txid 1 - update map; it is the last flow to receive, so it execs the whole tx, including shard 2's set b 1

So we end up with b = 1. This is wrong. Therefore, from what I understand, we must execute the commands in each shard's journal order, and that is why I suggested having a fiber per shard executing the top of the queue.

adiholden avatar Jan 04 '23 09:01 adiholden

@dranikpg true, this is the same flow as now, and for global commands there is no out-of-order execution.

adiholden avatar Jan 04 '23 09:01 adiholden

You are right. The enhancement I thought of would fix this issue, so maybe it's worth writing it down here: for every transaction slice we would keep its lock set in the owning flow. So for the example above, shard 2 would insert:

lock_set.insert(a);  // tx_2 holds key a while waiting for its other shard
lock_set.insert(b);  // tx_1 holds key b
...

and your execution fiber is allowed to progress as long as the next transaction does not overlap with already locked keys. So you do not need to block on a, i.e. you can proceed with b/tx_1 but would need to block on b/tx_3. As I said, this is not the first priority; let's implement the basic flow and come back to this later. A sketch of the overlap check is below.
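
A minimal sketch of that overlap check (illustrative names):

#include <set>
#include <string>
#include <vector>

// Keys locked by transactions of this flow that are still waiting on
// their other shards.
std::set<std::string> lock_set;

// The execution fiber may move past a blocked transaction only if the
// next transaction's keys do not overlap the locked set.
bool CanProceed(const std::vector<std::string>& tx_keys) {
  for (const auto& key : tx_keys)
    if (lock_set.count(key) > 0) return false;  // e.g. tx_3 blocks on b
  return true;
}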

romange avatar Jan 04 '23 16:01 romange

Another comment about (3): it awfully resembles what the master does when it schedules a multi-shard tx.

  1. The coordinator fiber reads the command and dispatches it to run.
  2. During the dispatch it schedules it on the execution fiber inside the tx queue. The tx cannot execute until it is "armed" in the shard; see the IsArmedInShard function.
  3. Once the transaction is scheduled, it is "activated" to run inside Transaction::ExecuteAsync.

Why is it better? If we adapt the already existing flow to this use case, we won't need to duplicate the logic. In addition, we reduce part of the latency by scheduling the transaction slice on the replica before we can execute it: we cannot run it yet, but at least it has been added to the tx queue. A rough sketch of this split is below.
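
A rough sketch of that schedule-then-arm split, with made-up names (the real logic lives in Transaction, around IsArmedInShard and Transaction::ExecuteAsync):

#include <atomic>
#include <cstdint>
#include <deque>

struct ReplTx {
  uint64_t txid;
  std::atomic<bool> armed{false};  // set once all slices have arrived
};

std::deque<ReplTx*> tx_queue;  // per-shard transaction queue

// 1) On receive: schedule the slice immediately, even though it cannot
//    run yet. This reserves its position in the tx queue early.
void Schedule(ReplTx* tx) { tx_queue.push_back(tx); }

// 2) When the last involved shard received its slice: arm the tx.
void Arm(ReplTx* tx) { tx->armed.store(true, std::memory_order_release); }

// 3) Executor: the queue head runs only once armed, so the latency
//    between arming and running is minimal.
bool TryRunHead() {
  if (tx_queue.empty()) return false;
  ReplTx* head = tx_queue.front();
  if (!head->armed.load(std::memory_order_acquire)) return false;
  tx_queue.pop_front();
  // ExecuteSlice(head);  // apply this shard's commands (not shown)
  return true;
}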

The complexity rises when a connection disconnects and you need to clean up transactions that are half scheduled; otherwise the tx queue will deadlock.

I think that this issue should not be implemented first, but is something we should work on later, when all the other replication features are in place. Meanwhile, it should be designed in the doc at a higher level of detail than here.

romange avatar Jan 05 '23 04:01 romange

@romange The changes are in the main branch, with a flag to enable/disable the sync between shards as you suggested.

How should we compare the performance? I can use the memtier benchmark with multi-shard commands, but what are we looking for on the replica? Are there specific metrics, already implemented, that I can check to compare with/without the flag?

adiholden avatar Jan 09 '23 15:01 adiholden

You can run memtier against a master instance. Because it has backpressure, it will slow down if the replica does not catch up. The issue is that the bottleneck is the data transfer, so replica execution is likely to have no effect at all.

What can be checked instead is the CPU load of the replica, or its GET throughput while it is receiving replication data.

Besides, stable sync on main is currently very slow; you shouldn't benchmark on it. The async streamer is ready to be merged (waiting for tests); it gives about 40-70% of the initial speed.

dranikpg avatar Jan 09 '23 15:01 dranikpg

Vlad, why do you think that the bottleneck is data transfer? I am not sure it's true.

romange avatar Jan 09 '23 16:01 romange

Because when I ran the benchmarks:

  • Skipping execution altogether did not increase throughput
  • Improving async streamer wakeups increased throughput a lot (so there is opportunity for improvement on master)
  • Master was on 100% CPU, replica was about 30-50% or even less

But I didn't think about the most obvious solution: we can run the replica on fewer shards, so it has more work to do.

dranikpg avatar Jan 09 '23 16:01 dranikpg

The master was at 100% CPU; that means the CPU is the bottleneck.

romange avatar Jan 09 '23 16:01 romange

Yes, I didn't mean the bandwidth directly, just the whole mechanism that is responsible for sending replication data

dranikpg avatar Jan 09 '23 16:01 dranikpg

Hi @romange @dranikpg, I ran the following command on my laptop, with master and replica using 4 cores each:

memtier_benchmark -c 1 --command "mset key foo key bar " -d 128 -n 100000 --distinct-client-seed --key-maximum=100000

Here are the results:

flag enable_multi_shard_sync=1: finished after 17 seconds, replica CPU ~50%

flag enable_multi_shard_sync=0: finished after 3 seconds, replica CPU ~45%

adiholden avatar Jan 10 '23 11:01 adiholden

Is this the time it takes the benchmark to run against the master? It's strange that there is such a large difference; it could be caused by the replica buffer filling up.

Why do you run it with c=1? It's much, much slower this way.

Did you try the async streamer branch? If you divide n by the time, you get a QPS of around 30k at best, which is very low. We don't have acceptable QPS on the main branch so far.

dranikpg avatar Jan 10 '23 11:01 dranikpg

do you mean __key__ inside the command, @adiholden?

romange avatar Jan 10 '23 12:01 romange

@romange yes

adiholden avatar Jan 10 '23 12:01 adiholden