
Redesign pipelining support in Dragonfly

Open romange opened this issue 2 months ago • 4 comments

Epic

Currently, the IO loop in Dragonfly looks like this:

---
title: DflyConn Request-Response Loop
---
graph TD
    A([socket.FiberRead]);  
    A -- Data Received --> C[cmd.dispatch];
    C -- Response Generated --> D([socket.FiberWrite]);
    D -- Write Complete --> A;

This loop runs in the context of the DflyConn fiber that manages the connection. Unfortunately, it introduces at least 3 preemption points (cmd.dispatch has 1 or more), which in turn causes visible per-command latency that accumulates for pipelines. To address this we introduced an additional Dispatch fiber that is responsible for executing commands, while DflyConn reads and parses data from the socket. The two fibers communicate via dispatch_q in a producer/consumer relationship.
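
For illustration, here is a minimal, self-contained sketch of that producer/consumer shape. It uses plain std::thread and std::condition_variable in place of helio fibers, and ParsedCmd, ReaderLoop and DispatchLoop are made-up names rather than actual Dragonfly types:

```cpp
#include <condition_variable>
#include <deque>
#include <iostream>
#include <mutex>
#include <string>
#include <thread>

// Placeholder for a parsed command; the real code keeps arguments on the heap.
struct ParsedCmd {
  std::string name;
};

std::deque<ParsedCmd> dispatch_q;  // stands in for dispatch_q_
std::mutex mu;
std::condition_variable cv;
bool done = false;

// "DflyConn" side: reads/parses from the socket and only pushes to the queue.
void ReaderLoop() {
  for (int i = 0; i < 3; ++i) {
    ParsedCmd cmd{"PING"};
    {
      std::lock_guard<std::mutex> lk(mu);
      dispatch_q.push_back(std::move(cmd));
    }
    cv.notify_one();
  }
  { std::lock_guard<std::mutex> lk(mu); done = true; }
  cv.notify_one();
}

// "DispatchFiber" side: pops commands and executes them (the slow, preempting part).
void DispatchLoop() {
  std::unique_lock<std::mutex> lk(mu);
  while (true) {
    cv.wait(lk, [] { return !dispatch_q.empty() || done; });
    if (dispatch_q.empty() && done) break;
    ParsedCmd cmd = std::move(dispatch_q.front());
    dispatch_q.pop_front();
    lk.unlock();
    std::cout << "dispatching " << cmd.name << "\n";  // cmd.dispatch + reply write
    lk.lock();
  }
}

int main() {
  std::thread reader(ReaderLoop), dispatcher(DispatchLoop);
  reader.join();
  dispatcher.join();
}
```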

The logic that decides whether to use dispatch_q_ lives in Connection::DispatchSingle. When DflyConn recognizes a possible pipeline (there is more data in the input buffer after parsing the current command), it switches to the following mode:

---
title: DflyConn Read/Dispatch Loop
---
graph TD
    A([socket.FiberRead]);
    A -- Data Received --> C["dispatch_q.push(cmd)"]    
    C --> A
---
title: DispatchFiber Loop
---
graph TD    
    D([dispatch_q.pop]) -->  E[cmd.dispatch];
    E -- Response Generated --> F([socket.FiberWrite]);
    F -- Write Complete --> D;

So we introduced a fiber to speed up reading from the socket while a pipeline is executing, but DispatchFiber is still slow because cmd.dispatch preempts and issues one or more hops to shard threads.

To alleviate this we introduced Connection::SquashPipeline() and MultiCommandSquasher, which can take a bunch of (simple) single-shard commands and squash them into a single transaction, so that, say, 30 commands in the pipeline are executed in a single hop. This works reasonably well but requires a lot of CPU, and its IO pattern is still not the most efficient: it must issue k commands, wait for all of them to return, and only then issue a blocking reply to the socket.
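
A rough, hypothetical sketch of the squashing idea and its IO pattern (ExecuteAsSingleTransaction and WriteReplies are stand-in stubs, not Dragonfly APIs; the real MultiCommandSquasher also handles multi-shard commands, errors and reply ordering):

```cpp
#include <cstddef>
#include <deque>
#include <iostream>
#include <string>
#include <vector>

struct ParsedCmd { std::string name; };
using Reply = std::string;

// Stub: in Dragonfly this would be one hop to the shard thread(s), executing
// the whole batch as a single transaction.
std::vector<Reply> ExecuteAsSingleTransaction(const std::vector<ParsedCmd>& batch) {
  std::vector<Reply> replies;
  for (const auto& cmd : batch) replies.push_back("+OK(" + cmd.name + ")\r\n");
  return replies;
}

// Stub: in Dragonfly this would be one blocking write of all replies to the socket.
void WriteReplies(const std::vector<Reply>& replies) {
  for (const auto& r : replies) std::cout << r;
}

constexpr std::size_t kMaxBatch = 30;

void SquashPipeline(std::deque<ParsedCmd>& dispatch_q) {
  while (!dispatch_q.empty()) {
    std::vector<ParsedCmd> batch;
    while (!dispatch_q.empty() && batch.size() < kMaxBatch) {
      batch.push_back(std::move(dispatch_q.front()));
      dispatch_q.pop_front();
    }
    std::vector<Reply> replies = ExecuteAsSingleTransaction(batch);  // wait for all k commands
    WriteReplies(replies);                                           // only then reply
  }
}

int main() {
  std::deque<ParsedCmd> q;
  for (int i = 0; i < 90; ++i) q.push_back({"SET"});
  SquashPipeline(q);  // 90 pipelined commands -> 3 batches -> 3 hops instead of 90
}
```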

To summarize:

  1. We have a complicated two-fiber design to handle pipelines (and out-of-order replies like pubsub).
  2. Our cmd.dispatch is synchronous, which warrants workarounds like MultiCommandSquasher.
  3. On the socket level we use the FiberRead call, which requires a dedicated input buffer allocated in advance, i.e. 1000 connections blocked on read, each potentially reading a 64KB blob, will require 64MB of reserved space even if there are 0 in-flight requests.
  4. The FiberWrite call is fiber-blocking, which means that if the socket is blocked on write, the whole flow is blocked. Moreover, we cannot control the amount of pending writes; we can only close the connection externally to unblock the blocked fiber.
  5. Pipelining efficiency is sub-optimal. This can easily be reproduced by load-testing a single connection with dfly_bench --ratio 0:1 --pipeline=30 -c 1 --proactor_threads=1 -n 2000000 on Dragonfly vs Valkey, for example.

Suggestions

Each item is a task that requires careful execution and planning.

  1. Stop using FiberRead and switch to the OnRecv interface (https://github.com/romange/helio/pull/480). Since OnRecv separates the read notification from reading the data, we can reduce the reliance on a per-connection io_buf. OnRecv can be called at any time the connection fiber is preempted, meaning it will be able to read data during its other preemption points such as cmd.dispatch.
  2. Following (1), we can merge DispatchFiber and DflyConn together, since we no longer need a dedicated fiber that reads from the socket and pushes parsed commands into dispatch_q.
  3. Prepare for asynchronous cmd.dispatch execution that separates command execution from sending replies. We should remove the assumption in facade that cmd.dispatch is synchronous: introduce a ParsedCommand on-heap entity that can be passed to cmd.dispatch. Similarly, command arguments should always be allocated on the heap, like we do today with dispatch_q. Basically, we should make sure that if cmd.dispatch returns before the command finishes executing, facade still works as expected (see the sketch after this list).
  4. Start using posix socket.send instead of socket.FiberWrite. The iouring model of socket.send is suboptimal, as the underlying send still needs to copy the passed iovec into networking buffers; therefore, calling socket.send, which synchronously pushes data if possible, gives us the most control: a) we can identify pending writes, b) we do not need to keep buffers if send succeeds and sends the iovec, c) it removes a preemption point (though I must say the latency of that is almost negligible, at least for small responses).
  5. Devise a plan for how we can gradually roll out async implementations of commands. The goal is to deprecate MultiCommandSquasher for pipelines (and later for multi/exec transactions). I believe that changing the 30-50 most common commands will be enough.
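
To make point (3) a bit more concrete, here is a minimal sketch assuming a heap-allocated command object that owns its arguments and carries a reply callback, so dispatch can return before the reply is produced. The names (ParsedCommand fields, DispatchAsync) are illustrative, not the actual facade interfaces:

```cpp
#include <functional>
#include <iostream>
#include <memory>
#include <string>
#include <vector>

// Illustrative only: a heap-allocated parsed command that owns its arguments
// and carries a callback for sending the reply once execution completes.
struct ParsedCommand {
  std::vector<std::string> args;                    // owned, heap-allocated copy
  std::function<void(std::string reply)> on_reply;  // fired when the command is done
};

// Asynchronous dispatch contract: the function may return before the command
// has finished executing; the reply is delivered via cmd->on_reply. In the
// real design this would enqueue a hop to a shard thread; here we complete
// inline for brevity.
void DispatchAsync(std::unique_ptr<ParsedCommand> cmd) {
  cmd->on_reply("+OK\r\n");
}

int main() {
  auto cmd = std::make_unique<ParsedCommand>();
  cmd->args = {"SET", "k", "v"};
  cmd->on_reply = [](std::string reply) { std::cout << reply; };
  DispatchAsync(std::move(cmd));  // facade must not assume the reply was already sent here
}
```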

The POC that demonstrates the feasibility of this approach is located here: https://github.com/romange/midi-redis/tree/Pr2

romange avatar Nov 04 '25 09:11 romange

For (1) and (2):

So the idea is to demultiplex blocking for IO in the dispatch fiber? By registering the hooks, we get async completions for read events, so the dispatch fiber never actually blocks on socket IO. Is that right?

For (3)

Interesting. You mentioned this on one of the dailies and I brought up concurrently dispatching squashed pipelines. I now understand that by turning everything async you get this nice property: we never block waiting for a squashed hop to finish, so we can dispatch another squashed pipeline, and another one, while the others are in flight.

For (4) and specifically:

as the underlying send still needs to copy the passed iovec to networking buffers

You can do zero copy for both disk and network IO. The latter is a relatively new feature that works similarly to disk IO: you preregister a buffer with the kernel (which the kernel pins) and then do the IO. See https://lwn.net/Articles/879724/ and IORING_OP_SENDZC (kernel v6.0) over MSG_ZEROCOPY.
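
For completeness, a minimal sketch of the plain-socket MSG_ZEROCOPY path (Linux >= 4.14; SendZeroCopy is a made-up helper, not helio or Dragonfly code). The io_uring IORING_OP_SENDZC variant follows the same lifetime rule: the buffer must stay pinned until the completion notification arrives:

```cpp
#include <cstddef>
#include <sys/socket.h>

// Fallbacks for older userspace headers; these are the upstream kernel values.
#ifndef SO_ZEROCOPY
#define SO_ZEROCOPY 60
#endif
#ifndef MSG_ZEROCOPY
#define MSG_ZEROCOPY 0x4000000
#endif

// Opt the socket into zero-copy TX and send one buffer without copying it into
// kernel networking buffers. The caller must keep `buf` alive and unmodified
// until the completion notification arrives on the socket's error queue
// (read via recvmsg(fd, ..., MSG_ERRQUEUE); omitted here for brevity).
bool SendZeroCopy(int fd, const void* buf, std::size_t len) {
  int one = 1;
  if (setsockopt(fd, SOL_SOCKET, SO_ZEROCOPY, &one, sizeof(one)) != 0)
    return false;
  return send(fd, buf, len, MSG_ZEROCOPY) == static_cast<ssize_t>(len);
}
```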

kostasrim avatar Nov 04 '25 10:11 kostasrim

The Dispatch fiber is not the one that blocks on IO, DflyConn blocks. Yes, the goal is to change DflyConn so it won't block on socket.FiberRead. The focus is mostly on recv IO. Send IO is solved differently because the control there is reversed: we can just use a non-blocking send and decide what to do in case it fails with EAGAIN, in contrast to the current state, where we do not have the option to decide and socket.FiberWrite just blocks.
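
Something along these lines (a minimal sketch with plain POSIX calls; SendResult/TrySend are made-up names, not the helio socket API):

```cpp
#include <cerrno>
#include <cstddef>
#include <sys/socket.h>

enum class SendResult { kDone, kWouldBlock, kError };

// Push as much as possible synchronously; never block the fiber. On EAGAIN the
// caller decides what to do: buffer the remainder, apply backpressure, or close
// the connection.
SendResult TrySend(int fd, const char* buf, std::size_t len, std::size_t* sent) {
  *sent = 0;
  while (*sent < len) {
    ssize_t n = send(fd, buf + *sent, len - *sent, MSG_DONTWAIT | MSG_NOSIGNAL);
    if (n > 0) {
      *sent += static_cast<std::size_t>(n);
      continue;
    }
    if (n < 0 && (errno == EAGAIN || errno == EWOULDBLOCK))
      return SendResult::kWouldBlock;
    return SendResult::kError;
  }
  return SendResult::kDone;  // everything was pushed; no buffer needs to be retained
}
```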

romange avatar Nov 04 '25 13:11 romange

Some benchmark results for dfly_bench --ratio 0:1 --pipeline=30 -c 1 --proactor_threads=1 -n 2000000. Servers run on m7g.2xlarge; dfly_bench runs from c8gn.4xlarge.

  1. valkey: 333K qps
  2. Dragonfly with squashing enabled: 200K qps
  3. Dragonfly with squashing disabled: 100K qps
  4. midi-redis main branch (no pipeline support): 100K qps
  5. midi-redis Pr2 branch (asynchronous sock.receive, socket.send and Service::Get operations, with a single fiber handling everything): 333K qps

For the multi-connection use-case (remove --proactor_threads=1, increase -c to 2) where we fully overload the server under test, midi-redis Pr2 is a clear winner with 3.1M+ qps, while valkey is at 1.6M qps.

romange avatar Nov 04 '25 13:11 romange

More evidence of sub-optimal pipeline performance: https://github.com/dragonflydb/dragonfly/issues/6192#issuecomment-3640646860

romange avatar Dec 11 '25 09:12 romange