Redesign pipelining support in Dragonfly
Epic
Currently, the io loop in Dragonfly looks like this:
```mermaid
---
title: DflyConn Request-Response Loop
---
graph TD
A([socket.FiberRead]);
A -- Data Received --> C[cmd.dispatch];
C -- Response Generated --> D([socket.FiberWrite]);
D -- Write Complete --> A;
```
This loop runs in the context of the DflyConn fiber that manages the connection.
Unfortunately, it introduces at least 3 preemption points (`cmd.dispatch` has 1 or more), which in turn cause visible per-command latency that accumulates across a pipeline.
To address this we introduced an additional Dispatch fiber that is responsible for executing commands, while DflyConn reads and parses data from the socket. Both fibers communicate via dispatch_q in a producer/consumer relationship.
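For illustration, here is a minimal stand-in for that producer/consumer relationship, using plain C++ threads and a mutex-protected queue in place of the helio fibers and the real dispatch_q (the `ParsedCmd` and `DispatchQueue` types below are illustrative, not Dragonfly's):

```cpp
#include <condition_variable>
#include <deque>
#include <mutex>
#include <optional>
#include <string>
#include <thread>
#include <vector>

// Illustrative stand-in for a parsed command; not the real Dragonfly type.
struct ParsedCmd {
  std::vector<std::string> args;
};

// Simplified dispatch_q: the connection side pushes, the dispatch side pops.
class DispatchQueue {
 public:
  void Push(ParsedCmd cmd) {
    {
      std::lock_guard lk(mu_);
      q_.push_back(std::move(cmd));
    }
    cv_.notify_one();
  }

  // Blocks until a command is available; returns nullopt once closed and drained.
  std::optional<ParsedCmd> Pop() {
    std::unique_lock lk(mu_);
    cv_.wait(lk, [this] { return closed_ || !q_.empty(); });
    if (q_.empty()) return std::nullopt;
    ParsedCmd cmd = std::move(q_.front());
    q_.pop_front();
    return cmd;
  }

  void Close() {
    {
      std::lock_guard lk(mu_);
      closed_ = true;
    }
    cv_.notify_all();
  }

 private:
  std::mutex mu_;
  std::condition_variable cv_;
  std::deque<ParsedCmd> q_;
  bool closed_ = false;
};

int main() {
  DispatchQueue dispatch_q;

  // "DispatchFiber" role: pop commands and execute them (cmd.dispatch + reply).
  std::thread dispatcher([&dispatch_q] {
    while (auto cmd = dispatch_q.Pop()) {
      // Execute *cmd and write the response to the socket here.
    }
  });

  // "DflyConn" role: read and parse from the socket (here: fabricate commands).
  for (int i = 0; i < 30; ++i) {
    dispatch_q.Push(ParsedCmd{{"SET", "key" + std::to_string(i), "value"}});
  }
  dispatch_q.Close();
  dispatcher.join();
  return 0;
}
```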
The logic that decides whether to use dispatch_q_ is in Connection::DispatchSingle. When DflyConn recognizes a possible pipeline (there is more data in the input buffer after parsing the current command), it switches to the following mode:
```mermaid
---
title: DflyConn Read/Dispatch Loop
---
graph TD
A([socket.FiberRead]);
A -- Data Received --> C["dispatch_q.push(cmd)"]
C --> A
```

```mermaid
---
title: DispatchFiber Loop
---
graph TD
D([dispatch_q.pop]) --> E[cmd.dispatch];
E -- Response Generated --> F([socket.FiberWrite]);
F -- Write Complete --> D;
```
So we introduced a fiber to speed up reading from the socket while a pipeline is executing, but DispatchFiber is still slow, as cmd.dispatch preempts and issues one or more hops to shard threads.
To alleviate this we introduced Connection::SquashPipeline() and MultiCommandSquasher, which can take a bunch of (simple) single-shard commands and squash them into a single transaction, so that, say, 30 commands in the pipeline will be executed in a single hop. This works reasonably well but requires a lot of CPU, and it still does not have the most efficient io pattern: it must issue k commands, wait for all of them to return, and only then issue a blocking reply to the socket.
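The batching idea, reduced to a toy sketch. The real MultiCommandSquasher goes through the transaction framework; `Command`, `Reply` and `ExecuteHop` below are illustrative stand-ins. The sketch also shows why the io pattern is blocking: the reply is written only after every hop has returned.

```cpp
#include <cstddef>
#include <functional>
#include <string>
#include <unordered_map>
#include <vector>

// Illustrative types; not the real Dragonfly classes.
struct Command {
  std::string name;
  std::string key;
};
using Reply = std::string;

// Stand-in for the hop to a shard thread: a real implementation would schedule
// the whole batch on the shard and await completion once.
std::vector<Reply> ExecuteHop(size_t shard_id, const std::vector<Command>& batch) {
  return std::vector<Reply>(batch.size(), "+OK from shard " + std::to_string(shard_id));
}

std::vector<Reply> SquashPipeline(const std::vector<Command>& pipeline, size_t num_shards) {
  // Group command indices by shard, keeping per-shard order.
  std::unordered_map<size_t, std::vector<size_t>> per_shard;
  for (size_t i = 0; i < pipeline.size(); ++i) {
    size_t shard = std::hash<std::string>{}(pipeline[i].key) % num_shards;
    per_shard[shard].push_back(i);
  }

  std::vector<Reply> replies(pipeline.size());
  for (const auto& [shard, indices] : per_shard) {
    std::vector<Command> batch;
    batch.reserve(indices.size());
    for (size_t idx : indices) batch.push_back(pipeline[idx]);

    // One hop per shard instead of one hop per command: 30 squashed
    // single-shard commands collapse into a single hop.
    std::vector<Reply> batch_replies = ExecuteHop(shard, batch);

    // Restore the pipeline's original reply order.
    for (size_t j = 0; j < indices.size(); ++j)
      replies[indices[j]] = std::move(batch_replies[j]);
  }

  // Only at this point, after every hop has returned, is the (blocking) reply
  // written back to the socket.
  return replies;
}
```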
To summarize:
- We have a complicated two-fiber design to handle pipelines (and out-of-order replies like pubsub).
- Our `command.dispatch` is synchronous, which warrants workarounds like `MultiCommandSquasher`.
- On the socket level we use the `FiberRead` call, which requires a dedicated input buffer in advance, i.e. 1000 connections blocked on read, each potentially reading a 64KB blob, will require 64MB of reserved space even if there are 0 in-flight requests (a plain POSIX sketch of this appears right after this list).
- The `FiberWrite` call is fiber-blocking, which means that if the socket is blocked on write, the whole flow is blocked. Moreover, we cannot control the amount of pending writes; we can only close the connection externally to unblock the blocked fiber.
- The pipelining efficiency is sub-optimal. This can easily be reproduced by load-testing a single connection with `dfly_bench --ratio 0:1 --pipeline=30 -c 1 --proactor_threads=1 -n 2000000` on dragonfly vs valkey, for example.
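To make the buffer-reservation point concrete, here is a plain POSIX illustration (not helio's FiberRead/OnRecv API) of the two read models: a per-connection buffer that must exist before a blocking read, versus a readiness notification (epoll here) that lets all idle connections share one scratch buffer:

```cpp
#include <sys/epoll.h>
#include <unistd.h>

#include <array>
#include <cstddef>

constexpr size_t kBufSize = 64 * 1024;

// Model A: FiberRead-style. Every connection owns a 64KB buffer up front,
// even while it idles waiting for data: 1000 such connections reserve ~64MB.
struct BlockingConn {
  int fd;
  std::array<char, kBufSize> io_buf;  // reserved before a single byte arrives

  ssize_t ReadOnce() { return read(fd, io_buf.data(), io_buf.size()); }
};

// Model B: readiness-based. Idle connections hold no buffer; when epoll
// reports data, we read into a single scratch buffer owned by the loop.
struct ReadinessLoop {
  int epoll_fd = epoll_create1(0);
  std::array<char, kBufSize> scratch;  // one buffer shared by all connections

  void Watch(int fd) {
    epoll_event ev{};
    ev.events = EPOLLIN;
    ev.data.fd = fd;
    epoll_ctl(epoll_fd, EPOLL_CTL_ADD, fd, &ev);
  }

  void RunOnce() {
    std::array<epoll_event, 64> events;
    int n = epoll_wait(epoll_fd, events.data(), static_cast<int>(events.size()),
                       /*timeout=*/-1);
    for (int i = 0; i < n; ++i) {
      ssize_t len = read(events[i].data.fd, scratch.data(), scratch.size());
      (void)len;  // parsing + dispatch would happen here
    }
  }
};
```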
Suggestions
Each item is a task that requires careful execution and planning.
1. Stop using `FiberRead` and switch to the `OnRecv` interface (https://github.com/romange/helio/pull/480). As `OnRecv` separates read notification from reading data, we can reduce reliance on a per-connection io_buf. `OnRecv` can be called at any time when a fiber is preempted, meaning that it will be able to read data during its other preemption points like `cmd.dispatch`.
2. Following (1), we can merge `DispatchFiber` and `DflyConn` together, as we no longer need a dedicated fiber for reading from the socket and pushing a parsed command into dispatch_q.
3. Prepare for asynchronous `cmd.dispatch` executions that separate command execution from sending replies. We should remove assumptions in facade that `cmd.dispatch` is synchronous: introduce a `ParsedCommand` on-heap entity that can be passed to `cmd.dispatch`. Similarly, command arguments should always be allocated on the heap, like we do today with dispatch_q. Basically, we should make sure that if `cmd.dispatch` returns before the command finishes executing, facade will still work as expected (a hypothetical sketch follows this list).
4. Start using posix `socket.send` instead of `socket.FiberWrite`. The iouring model of `socket.send` is suboptimal, as the underlying send still needs to copy the passed iovec to networking buffers; therefore, calling `socket.send` in a way that synchronously pushes data if possible gives us the most control: a) we can identify pending writes, b) we do not need to keep buffers if send succeeds and sends the iovec, c) it removes a preemption point (though I must say the latency of that is almost negligible, at least for small responses).
5. Devise a plan for how we can gradually roll out async implementations of commands. The goal is to deprecate MultiCommandSquasher for pipelines (and later for multi/exec transactions). I believe that changing the 30-50 most common commands will be enough.
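To sketch point (3), here is one hypothetical shape for an on-heap `ParsedCommand` whose reply path is a callback, so `cmd.dispatch` is free to return before execution finishes. None of these names are existing facade APIs; they only illustrate the contract:

```cpp
#include <functional>
#include <memory>
#include <string>
#include <utility>
#include <vector>

// Hypothetical on-heap command entity: it owns its arguments (instead of
// pointing into the connection's io_buf) and knows how to deliver its reply.
struct ParsedCommand {
  std::vector<std::string> args;                    // heap-owned copies
  std::function<void(std::string reply)> on_reply;  // fired when execution finishes
};

// Trivial stub so the sketch is self-contained; a real async dispatch would
// hand the command to the transaction framework and return immediately. The
// only contract: on_reply fires exactly once, possibly from another fiber.
void DispatchAsync(std::unique_ptr<ParsedCommand> cmd) {
  cmd->on_reply("+OK\r\n");
}

// Facade side: nothing after DispatchAsync may assume the reply already exists.
void OnCommandParsed(std::vector<std::string> args) {
  auto cmd = std::make_unique<ParsedCommand>();
  cmd->args = std::move(args);
  cmd->on_reply = [](std::string reply) {
    // Queue `reply` for sending; ordering across pipelined commands must be
    // handled by the reply path, not by blocking here.
    (void)reply;
  };
  DispatchAsync(std::move(cmd));  // may return while the command is still in flight
}
```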
The POC that demonstrates the feasibility of this approach is located here: https://github.com/romange/midi-redis/tree/Pr2
For (1) and (2):
So the idea is to demultiplex blocking IO in the dispatch fiber? By registering the hooks, we get async completions for read events, so the dispatch fiber never actually blocks on socket IO. Is that right?
For (3)
Interesting. You mentioned this on one of the dailies, when we discussed concurrently dispatching squashed pipelines. I now understand that by turning everything async we get this nice property, because we never block waiting for the squashed hop to finish. That way, we can dispatch another squashed pipeline, and another one, while the others are in-flight.
For (4) and specifically:
> as the underlying send still needs to copy the passed iovec to networking buffers
You can do zero copy for both disk and network io. The latter is just a relatively new feature which works similarly to disk io: you preregister a buffer to the kernel (which the kernel pins) and then do the IO. See https://lwn.net/Articles/879724/ and IORING_OP_SENDZC (kernel v6.0) over MSG_ZEROCOPY.
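For reference, a minimal sketch of the MSG_ZEROCOPY path on a plain TCP socket (Linux >= 4.14; IORING_OP_SENDZC is the io_uring analogue): the kernel pins the pages backing the buffer, so it must stay alive until a completion shows up on the socket's error queue. This is a simplified, blocking-style illustration, not something we would drop into the connection code:

```cpp
#include <linux/errqueue.h>
#include <sys/socket.h>

#include <cerrno>
#include <cstddef>

// Zero-copy send sketch (Linux >= 4.14, TCP). The pages backing `buf` are
// pinned by the kernel, so `buf` must stay valid until the completion
// notification is read from the socket's error queue.
bool SendZeroCopy(int fd, const char* buf, size_t len) {
  int one = 1;
  if (setsockopt(fd, SOL_SOCKET, SO_ZEROCOPY, &one, sizeof(one)) < 0) return false;
  if (send(fd, buf, len, MSG_ZEROCOPY) < 0) return false;

  // Poll the error queue for the completion; until it arrives the kernel may
  // still be reading from `buf`.
  for (;;) {
    char control[128];
    msghdr msg = {};
    msg.msg_control = control;
    msg.msg_controllen = sizeof(control);
    if (recvmsg(fd, &msg, MSG_ERRQUEUE) >= 0) {
      for (cmsghdr* cm = CMSG_FIRSTHDR(&msg); cm != nullptr; cm = CMSG_NXTHDR(&msg, cm)) {
        auto* serr = reinterpret_cast<sock_extended_err*>(CMSG_DATA(cm));
        if (serr->ee_errno == 0 && serr->ee_origin == SO_EE_ORIGIN_ZEROCOPY)
          return true;  // ee_info..ee_data is the range of completed zerocopy sends
      }
      return false;  // some other error-queue message
    }
    if (errno != EAGAIN && errno != EWOULDBLOCK) return false;
    // A real event loop would wait for EPOLLERR here instead of spinning.
  }
}
```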
The Dispatch fiber is not the one that blocks on io; DflyConn blocks. Yes, the idea is to change DflyConn so it won't block on socket.FiberRead. The focus is mostly on recv io. Send io is solved differently, as the control there is reversed: we can just use a non-blocking send and decide what to do in case it fails with EAGAIN, in contrast to the current state, where we do not have an option to decide and socket.FiberWrite just blocks.
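A minimal sketch of that reversed control, assuming a plain non-blocking posix `send`; the EAGAIN branch is exactly the decision point that `socket.FiberWrite` hides from us (the `OutQueue` type and the back-pressure limit are illustrative):

```cpp
#include <sys/socket.h>

#include <cerrno>
#include <cstddef>
#include <deque>
#include <string>

// Per-connection outgoing state: whatever the kernel did not accept is parked
// here, and pending_bytes is the observable "pending writes" metric.
struct OutQueue {
  std::deque<std::string> pending;
  size_t pending_bytes = 0;
};

// Try to push `data` synchronously; never blocks the calling fiber. Returns
// false if the connection should be dropped (hard error or the caller-chosen
// back-pressure limit exceeded).
bool SendOrQueue(int fd, std::string data, OutQueue& out, size_t max_pending_bytes) {
  // Preserve FIFO order: only attempt the syscall if nothing is queued yet.
  if (out.pending.empty()) {
    ssize_t n = send(fd, data.data(), data.size(), MSG_DONTWAIT | MSG_NOSIGNAL);
    if (n >= 0) {
      if (static_cast<size_t>(n) == data.size()) return true;  // fully sent, no buffer kept
      data.erase(0, static_cast<size_t>(n));                   // partial send: keep the tail
    } else if (errno != EAGAIN && errno != EWOULDBLOCK) {
      return false;  // real error; EAGAIN only means "queue and retry later"
    }
  }

  out.pending_bytes += data.size();
  out.pending.push_back(std::move(data));

  // This is the decision FiberWrite never lets us make: apply back-pressure
  // (or close the connection) instead of silently blocking the whole flow.
  return out.pending_bytes <= max_pending_bytes;
}
```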
Some benchmark results for `dfly_bench --ratio 0:1 --pipeline=30 -c 1 --proactor_threads=1 -n 2000000`. Servers run on m7g.2xlarge; dfly_bench runs from c8gn.4xlarge.
- valkey: 333K qps
- Dragonfly with squashing enabled: 200K qps
- Dragonfly with squashing disabled: 100K qps
- midi-redis main branch (no pipeline support): 100K qps
- midi-redis Pr2 branch (asynchronous sock.receive, socket.send and Service::Get operations, with a single fiber handling everything): 333K qps
For the multi-connection use-case (remove `--proactor_threads=1`, increase `-c` to 2), where we fully overload the server under test, midi-redis.Pr2 is a clear winner with 3.1M+ qps, while valkey is at 1.6M qps.
More evidence of sub-optimal pipeline performance: https://github.com/dragonflydb/dragonfly/issues/6192#issuecomment-3640646860