redis icon indicating copy to clipboard operation
redis copied to clipboard

Rework async_receive to consume all elements in the channel

Open mzimbres opened this issue 2 months ago • 12 comments

As I work on the generic_flat_response implementation I noticed that async_receive and its sync counterpart receive are too cumbersome and inefficient to work with. At the moment the general for of the receive loop looks like this

for (error_code ec;;) {
   conn->receive(ec);
   if (ec == error::sync_receive_push_failed) {
      ec = {};
      co_await conn->async_receive(asio::redirect_error(asio::use_awaitable, ec));
   }

   if (ec)
      break;  // Connection lost, break so we can reconnect to channels.

   ...

   consume_one(resp);
}

The async_receive function does not actually receive a response, it just removes its size from the channel. However it does not make sense to consume only one message at time since the response passed to conn.set_receive_response() might contain multiple pushes (i.e. the channel size). If we can make async_receive consume/accumulate all elements in the channel, the loop gets simpler and there is no need for the consume_one function that is inefficient since it rotates the response (any performance gain made by generic_flat_response will be eclipsed by these rotations). The general form of the loop consuming pushes would become

for (error_code ec;;) {
   // Accumulates all sizes in the channel.
   co_await conn->async_receive2(asio::redirect_error(asio::use_awaitable, ec));
   if (ec)
      break;  // Connection lost, break so we can reconnect to channels.

   for (auto const& elem: resp)
      // Process the node.

   resp.clear();
}

Things to do

  • Add an async_receive2 function that consumes all messages in the channel.
  • Deprecate async_receive, receive and consume_one.

mzimbres avatar Oct 13 '25 20:10 mzimbres