brpc icon indicating copy to clipboard operation
brpc copied to clipboard

滑动窗口bug:Fail to ibv_post_send: Cannot allocate memory, window=3, sq_current=98

Open sunce4t opened this issue 2 months ago • 43 comments

Describe the bug 此错误代表:滑动窗口表示可发送,但RDMA的SQ(Send Queue)没有可用的空间发送请求;

滑动窗口代表发送方可继续发送的能力;当前,滑动窗口的更新时机为:poll到带有imm(也就是带有acks)的recv cqe,并且进行对应的处理时; RDMA的SQ是一个环形buffer,其上存放发送但未完成的请求,SQ的指针更新时机为 poll 到 send cqe时; 这里我们可以发现:SQ和滑动窗口的更新时机不一致;

按照正常逻辑来讲,如果处理到带有imm的recv cqe,那么代表此前发送方所发出的RDMA Send已经完成,在recv cqe之前应该先生成send cqe;

如下图示意:

这是brpc期望的一个逻辑,在更新滑动窗口值前,先更新了SQ;

Image

但遇到该问题时,说明情况是这样的: 发送方调用大量RDMA Send操作,占用完SQ; 接着发送方poll到带有imm的recv cqe(并且此时一定没有其他send cqe,如果有的话,按照brpc一次poll 32个cqe,也会触发SQ指针更新),进入处理逻辑更新滑动窗口值; 更新窗口值后,某个线程发现窗口值非0,尝试发送,但SQ没有可用空间存放请求了,于是报错;

为什么在带有imm的recv cqe之前没有send cqe呢? 根据brpc的逻辑,带有imm就代表接收方收到数据(意味着RDMA Send已经完成),并向发送方发送确认; 而此时发送方已经生成带有imm的recv cqe,那么为什么已经完成的RDMA Send还没生成send cqe?

查看了IB spec,不同WQ共享CQ他们生成cqe是不能确保顺序的,只能确保同一个WQ内部的cqe生成顺序:

Image

所以,brpc的SQ和RQ共享了一个CQ后,不能确保发送方收到带有imm的recv cqe之前,一定生成send cqe;

可以看如下示意图:

Image

因此我认为此问题由两个原因共同导致: 1.滑动窗口更新时机和RDMA SQ更新时机不一致; 2.cqe生成顺序的不确定性。

To Reproduce 由业务触发,本地测试程序没有复现成功。不过发现日志中报错时业务均在发送大量数据。

Expected behavior 我们期望的是:滑动窗口更新时机与RDMA SQ的更新时机保持一致; 目前我们实现了一个patch,保证滑动窗口更新时机一定在RDMA SQ的更新时机之后。(确保RDMA SQ有空间再更新滑动窗口) 我们新增了_remote_acks记录接收方确认的数目;因为同一个WQ内是保证顺序的,我们使用_sq_to_update队列记录unsignaled的数目,使用_sq_update_flag队列记录已经poll到的send cqe数目; 当处理带有imm的recv cqe时,判断此前是否poll到过send cqe,若没有poll到,仅将此次的acks累加到_remote_acks中,不进行窗口值更新。

分析: 此patch不影响正常逻辑(即先生成send cqe,后生成recv cqe),这种情况在此patch下没有任何区别; 当先生成recv cqe,后生成send cqe时,此patch强制要求窗口值更新在send cqe后,保证SQ不会溢出。

diff --git a/src/brpc/rdma/rdma_endpoint.cpp b/src/brpc/rdma/rdma_endpoint.cpp
index 1d502a98..dac82bb7 100644
--- a/src/brpc/rdma/rdma_endpoint.cpp
+++ b/src/brpc/rdma/rdma_endpoint.cpp

@@ -101,7 +114,7 @@ static const uint32_t ACK_MSG_RDMA_OK = 0x1;

 static butil::Mutex* g_rdma_resource_mutex = NULL;
 static RdmaResource* g_rdma_resource_list = NULL;
-
+static butil::atomic<uint64_t> g_wr_id(0);
 struct HelloMessage {
     void Serialize(void* data) const;
     void Deserialize(void* data);
@@ -191,7 +204,11 @@ RdmaEndpoint::RdmaEndpoint(Socket* s)
     , _remote_window_capacity(0)
     , _window_size(0)
     , _new_rq_wrs(0)
+    , _remote_acks(0)
+    , _m_sq_unsignaled(0)
 {
+    LOG(INFO) << "_remote_acks: " << _remote_acks.load();
+    LOG(INFO) << "_m_sq_unsignaled: " << _m_sq_unsignaled;
     if (_sq_size < MIN_QP_SIZE) {
         _sq_size = MIN_QP_SIZE;
     }
@@ -208,6 +225,11 @@ RdmaEndpoint::RdmaEndpoint(Socket* s)
 }

 RdmaEndpoint::~RdmaEndpoint() {
+    LOG(INFO) << _window_size << " " << _remote_acks << " " << _sq_update_flag.size() << " " << _sq_to_update.size() << " " << _sq_unsignaled << " " << _sq_unsignaled;
+    while(_sq_to_update.empty() == false) {
+        LOG(INFO) << _sq_to_update.front();
+       _sq_to_update.pop();
+    }
     Reset();
     bthread::butex_destroy(_read_butex);
 }
@@ -231,6 +253,8 @@ void RdmaEndpoint::Reset() {
     _new_rq_wrs = 0;
     _sq_sent = 0;
     _rq_received = 0;
+    _remote_acks.store(0, butil::memory_order_relaxed);
+    _m_sq_unsignaled.store(0, butil::memory_order_relaxed);
 }

 void RdmaConnect::StartConnect(const Socket* socket,
@@ -878,15 +902,36 @@ ssize_t RdmaEndpoint::CutFromIOBufList(butil::IOBuf** from, size_t ndata) {
         }

         ibv_send_wr* bad = NULL;
-        int err = ibv_post_send(_resource->qp, &wr, &bad);
-        if (err != 0) {
-            // We use other way to guarantee the Send Queue is not full.
-            // So we just consider this error as an unrecoverable error.
-            LOG(WARNING) << "Fail to ibv_post_send: " << berror(err)
+
+        wr.wr_id = g_wr_id.fetch_add(1, butil::memory_order_relaxed);
+        {
+            BAIDU_SCOPED_LOCK(_m_sq_mutex);
+            int err = ibv_post_send(_resource->qp, &wr, &bad);
+            if (err != 0) {
+                // We use other way to guarantee the Send Queue is not full.
+                // So we just consider this error as an unrecoverable error.
+                LOG(WARNING) << "Fail to ibv_post_send: " << berror(err)
                          << ", window=" << window
                          << ", sq_current=" << _sq_current;
                 errno = err;
                 return -1;
+            }
+            _m_sq_unsignaled.fetch_add(1, butil::memory_order_release);
+            uint16_t cur_unsignaled = 0;
+            if(wr.send_flags & IBV_SEND_SIGNALED) {
+                cur_unsignaled = _m_sq_unsignaled.exchange(0, butil::memory_order_acquire);
+            }
+            if(cur_unsignaled != 0) {
+                BAIDU_SCOPED_LOCK(_sq_update_mutex);
+                _sq_to_update.push(cur_unsignaled);
+               //LOG(INFO) << "send signaled before: " << cur_unsignaled;
+            }
         }

         ++_sq_current;
@@ -924,13 +969,26 @@ int RdmaEndpoint::SendImm(uint32_t imm) {
     wr.send_flags |= IBV_SEND_SIGNALED;

     ibv_send_wr* bad = NULL;
-    int err = ibv_post_send(_resource->qp, &wr, &bad);
-    if (err != 0) {
-        // We use other way to guarantee the Send Queue is not full.
-        // So we just consider this error as an unrecoverable error.
-        LOG(WARNING) << "Fail to ibv_post_send: " << berror(err);
-        return -1;
+    wr.wr_id = g_wr_id.fetch_add(1, butil::memory_order_relaxed);
+    uint16_t cur_unsignaled = 0;
+    {
+        BAIDU_SCOPED_LOCK(_m_sq_mutex);
+        int err = ibv_post_send(_resource->qp, &wr, &bad);
+        if (err != 0) {
+            // We use other way to guarantee the Send Queue is not full.
+            // So we just consider this error as an unrecoverable error.
+            LOG(WARNING) << "Fail to ibv_post_send: " << berror(err);
+            return -1;
+        }
+        cur_unsignaled = _m_sq_unsignaled.exchange(0, butil::memory_order_relaxed);
+        {
+            BAIDU_SCOPED_LOCK(_sq_update_mutex);
+            _sq_to_update.push(cur_unsignaled);
+            //LOG(INFO) << "wr id: " << wr.wr_id <<  " SendImm before: " << cur_unsignaled;
+        }
+
     }
+
     return 0;
 }
@@ -938,8 +996,11 @@ ssize_t RdmaEndpoint::HandleCompletion(ibv_wc& wc) {
     bool zerocopy = FLAGS_rdma_recv_zerocopy;
     switch (wc.opcode) {
     case IBV_WC_SEND: {  // send completion
-        // Do nothing
-        break;
+        {
+            BAIDU_SCOPED_LOCK(_sq_flag_mutex);
+           _sq_update_flag.push(true);
+       }
+       break;
     }
     case IBV_WC_RECV: {  // recv completion
         // Please note that only the first wc.byte_len bytes is valid
@@ -959,24 +1020,66 @@ ssize_t RdmaEndpoint::HandleCompletion(ibv_wc& wc) {
         }
         if (wc.imm_data > 0) {
             // Clear sbuf here because we ignore event wakeup for send completions
-            uint32_t acks = butil::NetToHost32(wc.imm_data);
-            uint32_t num = acks;
-            while (num > 0) {
-                _sbuf[_sq_sent++].clear();
-                if (_sq_sent == _sq_size - RESERVED_WR_NUM) {
-                    _sq_sent = 0;
-                }
-                --num;
-            }
-            butil::subtle::MemoryBarrier();

             // Update window
-            uint32_t wnd_thresh = _local_window_capacity / 8;
-            if (_window_size.fetch_add(acks, butil::memory_order_relaxed) >= wnd_thresh
-                    || acks >= wnd_thresh) {
+            //uint32_t wnd_thresh = _local_window_capacity / 8;
+            //if (_window_size.fetch_add(acks, butil::memory_order_relaxed) >= wnd_thresh
+            //        || acks >= wnd_thresh) {
                 // Do not wake up writing thread right after _window_size > 0.
                 // Otherwise the writing thread may switch to background too quickly.
-                _socket->WakeAsEpollOut();
+            //    _socket->WakeAsEpollOut();
+            //}
+           uint32_t acks = butil::NetToHost32(wc.imm_data);
+           //LOG(INFO) << "acks: " << acks;
+            _remote_acks.fetch_add(acks, butil::memory_order_relaxed);
+            {
+                BAIDU_SCOPED_LOCK(_sq_update_mutex);
+                while(_sq_to_update.empty() == false &&  _remote_acks.load() >= _sq_to_update.front()) {
+                    {
+                        BAIDU_SCOPED_LOCK(_sq_flag_mutex);
+                        if(_sq_update_flag.empty() == true) {
+                            break;
+                        }
+                        _sq_update_flag.pop();
+                    }
+                    uint32_t wnd_to_update = _sq_to_update.front();
+                    _sq_to_update.pop();
+                    _remote_acks.fetch_sub(wnd_to_update, butil::memory_order_relaxed);
+
+                    //LOG(INFO)  << wnd_to_update << " " <<   _remote_acks << " " << _sq_update_flag.size() << " " << _sq_to_update.size();
+                   uint32_t num = wnd_to_update;
+                    while(num > 0) {
+                        _sbuf[_sq_sent++].clear();
+                        if (_sq_sent == _sq_size - RESERVED_WR_NUM) {
+                            _sq_sent = 0;
+                        }
+                        --num;
+                    }
+                    butil::subtle::MemoryBarrier();
+                    uint32_t wnd_thresh = _local_window_capacity / 8;
+                    if (_window_size.fetch_add(wnd_to_update, butil::memory_order_relaxed) >= wnd_thresh
+                            || acks >= wnd_thresh) {
+            // Do not wake up writing thread right after _window_size > 0.
+            // Otherwise the writing thread may switch to background too quickly.
+                                _socket->WakeAsEpollOut();
+                    }
+                }
             }
         }
diff --git a/src/brpc/rdma/rdma_endpoint.h b/src/brpc/rdma/rdma_endpoint.h
index de7cd5f6..114eb682 100644
--- a/src/brpc/rdma/rdma_endpoint.h
+++ b/src/brpc/rdma/rdma_endpoint.h
@@ -31,7 +31,7 @@
 #include "butil/containers/mpsc_queue.h"
 #include "brpc/socket.h"

-
+#include <queue>
 namespace brpc {
 class Socket;
 namespace rdma {
@@ -262,6 +262,13 @@ private:
     // The number of new WRs posted in the local Recv Queue
     butil::atomic<uint16_t> _new_rq_wrs;

+    butil::atomic<int> _remote_acks;
+    butil::Mutex _m_sq_mutex;
+    butil::Mutex _sq_update_mutex;
+    butil::Mutex _sq_flag_mutex;
+    butil::atomic<uint16_t> _m_sq_unsignaled;
+    std::queue<uint16_t> _sq_to_update;
+    std::queue<bool> _sq_update_flag;
     // butex for inform read events on TCP fd during handshake
     butil::atomic<int> *_read_butex;

** Versions OS: Compiler: brpc: protobuf:

Additional context/screenshots

sunce4t avatar Oct 28 '25 03:10 sunce4t

把window拆成send window和recv window不就好了吗, 每次发送前比较这两者的最小值是否大于0, send wc则根据unsignaled的比例来更新send window, 你这样不是还加了几个锁和队列, 拆window只需要多做一次relaxed order的原子操作即可

Yangfisher1 avatar Oct 30 '25 02:10 Yangfisher1

把window拆成send window和recv window不就好了吗, 每次发送前比较这两者的最小值是否大于0, send wc则根据unsignaled的比例来更新send window, 你这样不是还加了几个锁和队列, 拆window只需要多做一次relaxed order的原子操作即可

hi,加锁和队列是因为 SendImm也会signaled,SendImm的触发时机不能保证,例如:可能当前有10个unsignaled,刚好触发一次SendImm 所以我加了锁和队列记录每一次ibv_post_send前有多少个unsignaled请求

最开始我们有尝试在处理send cqe的逻辑处按比例更新你提到的send window,后面发现不能确定每个send cqe前有多少个unsignaled的请求。

拆成两个window确实更好,如果要精准更新send window,就得保证ibv_post_send的顺序跟记录unsignaled的顺序一致,这一部还得加锁吧。 不过这样只需要加一个锁、一个队列和一个原子变量。

sunce4t avatar Oct 30 '25 03:10 sunce4t

我明白了, 那其实最简单的一个方案是SendImm改为unsignaled不可以嘛, 因为我理解SendImm成功状态下的wc本身没有什么用, 而失败的wc本身即使是unsignaled的状态也是会产生的, 失败情况下可以视为整个qp状态已经不对了

Yangfisher1 avatar Oct 30 '25 03:10 Yangfisher1

我明白了, 那其实最简单的一个方案是SendImm改为unsignaled不可以嘛, 因为我理解SendImm成功状态下的wc本身没有什么用, 而失败的wc本身即使是unsignaled的状态也是会产生的, 失败情况下可以视为整个qp状态已经不对了

这个可以的,如果按照你说的改成unsignaled,只需要一个记录接收窗口的原子变量就行了。

sunce4t avatar Oct 30 '25 03:10 sunce4t

是的, 这样可以大大简化实现

Yangfisher1 avatar Oct 30 '25 03:10 Yangfisher1

是的, 这样可以大大简化实现

目前我们改了一下,没有去掉SendImm的IBV_SEND_SIGNALED,我下面这个版本仍旧用锁和队列保证send cqe和记录的unsignaled num的顺序一致; 如果去掉,就只需要一个_remote_recv_window了;

diff --git a/src/brpc/rdma/rdma_endpoint.cpp b/src/brpc/rdma/rdma_endpoint.cpp
index f3bc82d7..159bce90 100644
--- a/src/brpc/rdma/rdma_endpoint.cpp
+++ b/src/brpc/rdma/rdma_endpoint.cpp
@@ -101,6 +101,7 @@ static const uint32_t ACK_MSG_RDMA_OK = 0x1;

 static butil::Mutex* g_rdma_resource_mutex = NULL;
 static RdmaResource* g_rdma_resource_list = NULL;
+static butil::atomic<uint64_t> g_wr_id(0);

 struct HelloMessage {
     void Serialize(void* data) const;

@@ -514,6 +533,7 @@ void* RdmaEndpoint::ProcessHandshakeAtClient(void* arg) {
         ep->_remote_window_capacity =
             std::min(ep->_rq_size, remote_msg.sq_size) - RESERVED_WR_NUM,
         ep->_window_size.store(ep->_local_window_capacity, butil::memory_order_relaxed);
+        ep->_remote_recv_window.store(ep->_remote_window_capacity, butil::memory_order_relaxed);

         ep->_state = C_BRINGUP_QP;
         if (ep->BringUpQp(remote_msg.lid, remote_msg.gid, remote_msg.qp_num) < 0) {
@@ -622,7 +642,7 @@ void* RdmaEndpoint::ProcessHandshakeAtServer(void* arg) {
         ep->_remote_window_capacity =
             std::min(ep->_rq_size, remote_msg.sq_size) - RESERVED_WR_NUM,
         ep->_window_size.store(ep->_local_window_capacity, butil::memory_order_relaxed);
-
+        ep->_remote_recv_window.store(ep->_remote_window_capacity, butil::memory_order_relaxed);
         ep->_state = S_ALLOC_QPCQ;
         if (ep->AllocateResources() < 0) {
             LOG(WARNING) << "Fail to allocate rdma resources, fallback to tcp:"

@@ -872,30 +873,37 @@ ssize_t RdmaEndpoint::CutFromIOBufList(butil::IOBuf** from, size_t ndata) {

         // Avoid too much send completion event to reduce the CPU overhead
         ++_sq_unsignaled;
        if (_sq_unsignaled >= _local_window_capacity / 4) {
             // Refer to:
             // http::www.rdmamojo.com/2014/06/30/working-unsignaled-completions/
             wr.send_flags |= IBV_SEND_SIGNALED;
             _sq_unsignaled = 0;
         }

         ibv_send_wr* bad = NULL;
-        int err = ibv_post_send(_resource->qp, &wr, &bad);
-        if (err != 0) {
-            // We use other way to guarantee the Send Queue is not full.
-            // So we just consider this error as an unrecoverable error.
-            LOG(WARNING) << "Fail to ibv_post_send: " << berror(err)
+       wr.wr_id = g_wr_id.fetch_add(1, butil::memory_order_relaxed);
+       {
+           BAIDU_SCOPED_LOCK(_m_sq_mutex);
+            int err = ibv_post_send(_resource->qp, &wr, &bad);
+            if (err != 0) {
+                // We use other way to guarantee the Send Queue is not full.
+                // So we just consider this error as an unrecoverable error.
+                LOG(WARNING) << "Fail to ibv_post_send: " << berror(err)
                          << ", window=" << window
                          << ", sq_current=" << _sq_current;
-            errno = err;
-            return -1;
-        }
+                errno = err;
+                return -1;
+            }
+           _m_sq_unsignaled.fetch_add(1, butil::memory_order_release);
+           uint16_t cur_unsignaled = 0;
+           if(wr.send_flags & IBV_SEND_SIGNALED) {
+               cur_unsignaled = _m_sq_unsignaled.exchange(0, butil::memory_order_acquire);
+           }
+           if(cur_unsignaled != 0) {
+                BAIDU_SCOPED_LOCK(_sq_update_mutex);
+                _sq_to_update.push(cur_unsignaled);
+            }
+       }

         ++_sq_current;
         if (_sq_current == _sq_size - RESERVED_WR_NUM) {
@@ -932,19 +940,27 @@ int RdmaEndpoint::SendImm(uint32_t imm) {
     wr.send_flags |= IBV_SEND_SIGNALED;

     ibv_send_wr* bad = NULL;
-    int err = ibv_post_send(_resource->qp, &wr, &bad);
-    if (err != 0) {
-        // We use other way to guarantee the Send Queue is not full.
-        // So we just consider this error as an unrecoverable error.
-        LOG(WARNING) << "Fail to ibv_post_send: " << berror(err);
-        return -1;
+    wr.wr_id = g_wr_id.fetch_add(1, butil::memory_order_relaxed);
+    uint16_t cur_unsignaled = 0;
+    {
+       BAIDU_SCOPED_LOCK(_m_sq_mutex);
+       int err = ibv_post_send(_resource->qp, &wr, &bad);
+       if (err != 0) {
+            // We use other way to guarantee the Send Queue is not full.
+            // So we just consider this error as an unrecoverable error.
+            LOG(WARNING) << "Fail to ibv_post_send: " << berror(err);
+            return -1;
+        }
+       cur_unsignaled = _m_sq_unsignaled.exchange(0, butil::memory_order_relaxed);
     }
+
+
+
+     {
+         BAIDU_SCOPED_LOCK(_sq_update_mutex);
+        _sq_to_update.push(cur_unsignaled);
+       //LOG(INFO) << "SendImm before: " << cur_unsignaled;
     }

     return 0;
 }
@@ -938,7 +987,32 @@ ssize_t RdmaEndpoint::HandleCompletion(ibv_wc& wc) {
     bool zerocopy = FLAGS_rdma_recv_zerocopy;
     switch (wc.opcode) {
     case IBV_WC_SEND: {  // send completion
-        // Do nothing
+       uint16_t wnd_to_update = 0;
+       {
+           BAIDU_SCOPED_LOCK(_sq_update_mutex);
+           wnd_to_update = _sq_to_update.front();
+           _sq_to_update.pop();
+       }
+       if (wnd_to_update != 0) {
+           uint32_t num = wnd_to_update;
+            while(num > 0) {
+                _sbuf[_sq_sent++].clear();
+                if (_sq_sent == _sq_size - RESERVED_WR_NUM) {
+                   _sq_sent = 0;
+                }
+                --num;
+            }
+           butil::subtle::MemoryBarrier();
+           uint32_t wnd_thresh = _local_window_capacity / 8;
+           if ((_window_size.fetch_add(wnd_to_update, butil::memory_order_relaxed) >= wnd_thresh)) {
+            // Do not wake up writing thread right after _window_size > 0.
+           // Otherwise the writing thread may switch to background too quickly.
+               _socket->WakeAsEpollOut();
+           }
+
+         }
+        //LOG(INFO) << _window_size << " " << _remote_recv_window;
+                             // Do nothing
         break;
     }
     case IBV_WC_RECV: {  // recv completion
@@ -958,26 +1032,9 @@ ssize_t RdmaEndpoint::HandleCompletion(ibv_wc& wc) {
             }
         }
         if (wc.imm_data > 0) {
-            // Clear sbuf here because we ignore event wakeup for send completions
-            uint32_t acks = butil::NetToHost32(wc.imm_data);
-            uint32_t num = acks;
-            while (num > 0) {
-                _sbuf[_sq_sent++].clear();
-                if (_sq_sent == _sq_size - RESERVED_WR_NUM) {
-                    _sq_sent = 0;
-                }
-                --num;
-            }
-            butil::subtle::MemoryBarrier();
-
-            // Update window
-            uint32_t wnd_thresh = _local_window_capacity / 8;
-            if (_window_size.fetch_add(acks, butil::memory_order_relaxed) >= wnd_thresh
-                    || acks >= wnd_thresh) {
-                // Do not wake up writing thread right after _window_size > 0.
-                // Otherwise the writing thread may switch to background too quickly.
-                _socket->WakeAsEpollOut();
-            }
+           uint32_t acks = butil::NetToHost32(wc.imm_data);
+           _remote_recv_window.fetch_add(acks, butil::memory_order_relaxed);
+
         }
         // We must re-post recv WR
         if (PostRecv(1, zerocopy) < 0) {
@@ -1500,7 +1516,7 @@ void RdmaEndpoint::PollCq(Socket* m) {

             if (wc[i].status != IBV_WC_SUCCESS) {
                 PLOG(WARNING) << "Fail to handle RDMA completion, error status("
-                              << wc[i].status << "): " << s->description();
+                              << wc[i].status << " " << wc[i].opcode << " " << wc[i].wr_id << "): " << s->description();
                 s->SetFailed(ERDMA, "RDMA completion error(%d) from %s: %s",
                              wc[i].status, s->description().c_str(), berror(ERDMA));
                 continue;
diff --git a/src/brpc/rdma/rdma_endpoint.h b/src/brpc/rdma/rdma_endpoint.h
index de7cd5f6..d4920180 100644
--- a/src/brpc/rdma/rdma_endpoint.h
+++ b/src/brpc/rdma/rdma_endpoint.h
@@ -31,7 +31,7 @@
 #include "butil/containers/mpsc_queue.h"
 #include "brpc/socket.h"

-
+#include <queue>
 namespace brpc {
 class Socket;
 namespace rdma {
@@ -262,6 +269,11 @@ private:
     // The number of new WRs posted in the local Recv Queue
     butil::atomic<uint16_t> _new_rq_wrs;

+    butil::atomic<int> _remote_recv_window;
+    butil::Mutex _m_sq_mutex;
+    butil::Mutex _sq_update_mutex;
+    butil::atomic<uint16_t> _m_sq_unsignaled;
+    std::queue<uint16_t> _sq_to_update;
     // butex for inform read events on TCP fd during handshake
     butil::atomic<int> *_read_butex;

sunce4t avatar Oct 30 '25 04:10 sunce4t

是的, 这样可以大大简化实现

@Yangfisher1 @yanglimingcn 大佬们看看这个版本, _socket->WakeAsEpollOut(); 这一句不确定该放在哪里,暂时先放在case IBV_WR_SEND中

diff --git a/src/brpc/rdma/rdma_endpoint.cpp b/src/brpc/rdma/rdma_endpoint.cpp
index 1d502a98..99ef1159 100644
--- a/src/brpc/rdma/rdma_endpoint.cpp
+++ b/src/brpc/rdma/rdma_endpoint.cpp
@@ -191,6 +191,7 @@ RdmaEndpoint::RdmaEndpoint(Socket* s)
     , _remote_window_capacity(0)
     , _window_size(0)
     , _new_rq_wrs(0)
+    , _remote_recv_window(0)
 {
     if (_sq_size < MIN_QP_SIZE) {
         _sq_size = MIN_QP_SIZE;
@@ -208,6 +209,7 @@ RdmaEndpoint::RdmaEndpoint(Socket* s)
 }

 RdmaEndpoint::~RdmaEndpoint() {
+    LOG(INFO) << _window_size << " " << _remote_recv_window << " " <<  _sq_unsignaled;
     Reset();
     bthread::butex_destroy(_read_butex);
 }
@@ -231,6 +233,7 @@ void RdmaEndpoint::Reset() {
     _new_rq_wrs = 0;
     _sq_sent = 0;
     _rq_received = 0;
+    _remote_recv_window.store(0, butil::memory_order_relaxed);
 }

 void RdmaConnect::StartConnect(const Socket* socket,
@@ -514,7 +517,7 @@ void* RdmaEndpoint::ProcessHandshakeAtClient(void* arg) {
         ep->_remote_window_capacity =
             std::min(ep->_rq_size, remote_msg.sq_size) - RESERVED_WR_NUM,
         ep->_window_size.store(ep->_local_window_capacity, butil::memory_order_relaxed);
-
+        ep->_remote_recv_window.store(ep->_remote_window_capacity, butil::memory_order_relaxed);
         ep->_state = C_BRINGUP_QP;
         if (ep->BringUpQp(remote_msg.lid, remote_msg.gid, remote_msg.qp_num) < 0) {
             LOG(WARNING) << "Fail to bringup QP, fallback to tcp:" << s->description();
@@ -622,7 +625,7 @@ void* RdmaEndpoint::ProcessHandshakeAtServer(void* arg) {
         ep->_remote_window_capacity =
             std::min(ep->_rq_size, remote_msg.sq_size) - RESERVED_WR_NUM,
         ep->_window_size.store(ep->_local_window_capacity, butil::memory_order_relaxed);
-
+        ep->_remote_recv_window.store(ep->_remote_window_capacity, butil::memory_order_relaxed);
         ep->_state = S_ALLOC_QPCQ;
         if (ep->AllocateResources() < 0) {
             LOG(WARNING) << "Fail to allocate rdma resources, fallback to tcp:"
@@ -787,12 +790,14 @@ ssize_t RdmaEndpoint::CutFromIOBufList(butil::IOBuf** from, size_t ndata) {
     size_t total_len = 0;
     size_t current = 0;
     uint32_t window = 0;
+    uint32_t recv_window = 0;
     ibv_send_wr wr;
     int max_sge = GetRdmaMaxSge();
     ibv_sge sglist[max_sge];
     while (current < ndata) {
-        window = _window_size.load(butil::memory_order_relaxed);
-        if (window == 0) {
+        window = _window_size.load(butil::memory_order_acquire);
+       recv_window = _remote_recv_window.load(butil::memory_order_acquire);
+        if (window == 0 || recv_window == 0) {
             if (total_len > 0) {
                 break;
             } else {
@@ -898,7 +903,8 @@ ssize_t RdmaEndpoint::CutFromIOBufList(butil::IOBuf** from, size_t ndata) {
         // Because there is at most one thread can enter this function for each
         // Socket, and the other thread of HandleCompletion can only add this
         // counter.
-        _window_size.fetch_sub(1, butil::memory_order_relaxed);
+        _window_size.fetch_sub(1, butil::memory_order_release);
+       _remote_recv_window.fetch_sub(1, butil::memory_order_release);
     }

     return total_len;
@@ -921,7 +927,7 @@ int RdmaEndpoint::SendImm(uint32_t imm) {
     wr.opcode = IBV_WR_SEND_WITH_IMM;
     wr.imm_data = butil::HostToNet32(imm);
     wr.send_flags |= IBV_SEND_SOLICITED;
-    wr.send_flags |= IBV_SEND_SIGNALED;
+    //wr.send_flags |= IBV_SEND_SIGNALED;

     ibv_send_wr* bad = NULL;
     int err = ibv_post_send(_resource->qp, &wr, &bad);
@@ -938,7 +944,23 @@ ssize_t RdmaEndpoint::HandleCompletion(ibv_wc& wc) {
     bool zerocopy = FLAGS_rdma_recv_zerocopy;
     switch (wc.opcode) {
     case IBV_WC_SEND: {  // send completion
-        // Do nothing
+        uint16_t wnd_to_update = _local_window_capacity / 4;
+       uint32_t num = wnd_to_update;
+       while(num > 0) {
+            _sbuf[_sq_sent++].clear();
+             if (_sq_sent == _sq_size - RESERVED_WR_NUM) {
+                _sq_sent = 0;
+             }
+             --num;
+       }
+       butil::subtle::MemoryBarrier();
+        uint32_t wnd_thresh = _local_window_capacity / 8;
+        _window_size.fetch_add(wnd_to_update, butil::memory_order_release);
+        //if ((_remote_recv_window.load(butil::memory_order_relaxed) >= wnd_thresh)) {
+             // Do not wake up writing thread right after _window_size > 0.
+            // Otherwise the writing thread may switch to background too quickly.
+            _socket->WakeAsEpollOut();
+        //}
         break;
     }
     case IBV_WC_RECV: {  // recv completion
@@ -958,27 +980,9 @@ ssize_t RdmaEndpoint::HandleCompletion(ibv_wc& wc) {
             }
         }
         if (wc.imm_data > 0) {
-            // Clear sbuf here because we ignore event wakeup for send completions
             uint32_t acks = butil::NetToHost32(wc.imm_data);
-            uint32_t num = acks;
-            while (num > 0) {
-                _sbuf[_sq_sent++].clear();
-                if (_sq_sent == _sq_size - RESERVED_WR_NUM) {
-                    _sq_sent = 0;
-                }
-                --num;
-            }
-            butil::subtle::MemoryBarrier();
-
-            // Update window
-            uint32_t wnd_thresh = _local_window_capacity / 8;
-            if (_window_size.fetch_add(acks, butil::memory_order_relaxed) >= wnd_thresh
-                    || acks >= wnd_thresh) {
-                // Do not wake up writing thread right after _window_size > 0.
-                // Otherwise the writing thread may switch to background too quickly.
-                _socket->WakeAsEpollOut();
-            }
-        }
+           _remote_recv_window.fetch_add(acks, butil::memory_order_release);
+       }
         // We must re-post recv WR
         if (PostRecv(1, zerocopy) < 0) {
             return -1;
diff --git a/src/brpc/rdma/rdma_endpoint.h b/src/brpc/rdma/rdma_endpoint.h
index de7cd5f6..8cbaf710 100644
--- a/src/brpc/rdma/rdma_endpoint.h
+++ b/src/brpc/rdma/rdma_endpoint.h
@@ -262,6 +262,7 @@ private:
     // The number of new WRs posted in the local Recv Queue
     butil::atomic<uint16_t> _new_rq_wrs;

+    butil::atomic<uint16_t> _remote_recv_window;
     // butex for inform read events on TCP fd during handshake
     butil::atomic<int> *_read_butex;
@@ -752,7 +755,7 @@ bool RdmaEndpoint::IsWritable() const {
         return false;
     }

-    return _window_size.load(butil::memory_order_relaxed) > 0;
+    return _window_size.load(butil::memory_order_relaxed) > 0 && _remote_recv_window.load(butil::memory_order_relaxed) > 0;
 }

sunce4t avatar Oct 30 '25 09:10 sunce4t

_socket->WakeAsEpollOut()我理解应该两个window更新的时候都要放, 当两个window的最小值超过window threshold的时候调用WakeAsEpollOut来通知上层可以继续写了

Yangfisher1 avatar Oct 31 '25 07:10 Yangfisher1

问题总结,因为在发送端产生的recv completion event和send completion event的时机不一致,导致接收端产生回复ACK的时候,send completion event可能还没有产生,所以申请sqe的时候就可能失败。 可以让recv completion event和send completion event的产生时机是一致的,这样能保证接收端回复ACK的时候,发送端一定有send completion event产生。具体修改:

Image

yanglimingcn avatar Oct 31 '25 08:10 yanglimingcn

_socket->WakeAsEpollOut()我理解应该两个window更新的时候都要放, 当两个window的最小值超过window threshold的时候调用WakeAsEpollOut来通知上层可以继续写了

是的,后面我们也加了一下,还在测试,另外前面那版原本少改了这个:

@@ -752,7 +755,7 @@ bool RdmaEndpoint::IsWritable() const {
         return false;
     }

-    return _window_size.load(butil::memory_order_relaxed) > 0;
+    return _window_size.load(butil::memory_order_relaxed) > 0 && _remote_recv_window.load(butil::memory_order_relaxed) > 0;
 }

sunce4t avatar Oct 31 '25 08:10 sunce4t

可以让recv completion event和send completion event的产生时机是一致的,这样能保证接收端回复ACK的时候,发送端一定有send completion event产生

@yanglimingcn 这不对吧, 这个问题在于client通过recv收到server发来的ack(无论是piggyback的亦或是SendImm专门发的)时候, client的send queue还没有signaled wc更新(这个wc还在后面), 这里修改成solicited也带signaled, 只是server发ack的时候server这一侧会产生一个signaled send wc来更新server侧的send queue, 跟client没关系

Yangfisher1 avatar Oct 31 '25 09:10 Yangfisher1

这个是client端产生的signaled send wc

yanglimingcn avatar Oct 31 '25 11:10 yanglimingcn

我明白你的意思了, 这样也是ok的

Yangfisher1 avatar Oct 31 '25 12:10 Yangfisher1

问题总结,因为在发送端产生的recv completion event和send completion event的时机不一致,导致接收端产生回复ACK的时候,send completion event可能还没有产生,所以申请sqe的时候就可能失败。 可以让recv completion event和send completion event的产生时机是一致的,这样能保证接收端回复ACK的时候,发送端一定有send completion event产生。

@yanglimingcn solicited && signaled send wc完成之前的send wc被server poll到之后ack,client收到的recv cqe,排在signaled send cqe前面,还是有问题吧?server使用polling模式时,这种情况很容易出现吧。

chenBright avatar Nov 04 '25 07:11 chenBright

我提了一个PR,你有时间看看 @chenBright

yanglimingcn avatar Nov 04 '25 11:11 yanglimingcn

https://github.com/apache/brpc/pull/3145 我们之前将SendImm的IBV_SEND_SIGNALED注释掉后发现也会有问题,我们也采取了类似这个pr的方法,用特定的wr_id区分imm的send cqe,并且没有任何问题; 因此后续这个bug的修复请参考这个pr。

sunce4t avatar Nov 11 '25 03:11 sunce4t

滑动窗口失效的原因应该不是共享 CQ 所致, 而是滑动窗口机制依赖的一个非常关键的前置条件并不成立, 就是本端应用层收到了对端应用层的 ACK 就认为底层 SQ 都释放掉了。 在无损的 IB 网络, 这个条件可能成立。 而对于基于以太网的 RoCEv2, 这个是不保证的。 底层驱动发送完一个消息后,只有收到对端驱动的 ACK 才会释放 SQ。 当网络压力大的时候, 存在一种情况, 对端收到了消息然后对端应用层也处理了该消息, 但是对端驱动层回复的 ACK 可能丢了或者超时了; 而本端应用层收到了对端应用层发送的 ACK, 累加窗口, 实际上本端驱动层因为接收 ACK 超时导致重发消息, 并未释放 SQ。

legionxiong avatar Nov 18 '25 04:11 legionxiong

滑动窗口失效的原因应该不是共享 CQ 所致, 而是滑动窗口机制依赖一个前置条件并不成立, 就是本端应用层收到了对端应用层的 ACK 就认为底层 SQ 都释放掉了。 在无损的 IB 网络, 这个条件可能成立。 而对于基于以太网的 RoCEv2, 这个是不保证的。 底层驱动发送一个消息后,只有收到对端驱动的 ACK 才会释放 SQ。 当网络压力大的时候, 存在一种情况, 对端收到了消息然后对端应用层也处理了消息, 但是对端驱动层回复的 ACK 丢了或者超时了; 而本端应用层收到了对端应用层发送的 ACK, 累加窗口, 实际上本端驱动层因为接收 ACK 超时导致重发消息, 并未释放 SQ。

@legionxiong hi,这么理解对吗:就是驱动层ACK丢失了进行重传,导致应用先看见recv cqe去累加窗口,后续再看见send cqe。我理解的话,驱动层ACK是影响cqe的生成,不能直接影响资源的释放。(例如:SQ的释放取决于send cqe从CQ中何时poll下来)

sunce4t avatar Nov 18 '25 04:11 sunce4t

滑动窗口失效的原因应该不是共享 CQ 所致, 而是滑动窗口机制依赖一个前置条件并不成立, 就是本端应用层收到了对端应用层的 ACK 就认为底层 SQ 都释放掉了。 在无损的 IB 网络, 这个条件可能成立。 而对于基于以太网的 RoCEv2, 这个是不保证的。 底层驱动发送一个消息后,只有收到对端驱动的 ACK 才会释放 SQ。 当网络压力大的时候, 存在一种情况, 对端收到了消息然后对端应用层也处理了消息, 但是对端驱动层回复的 ACK 丢了或者超时了; 而本端应用层收到了对端应用层发送的 ACK, 累加窗口, 实际上本端驱动层因为接收 ACK 超时导致重发消息, 并未释放 SQ。

@legionxiong hi,这么理解对吗:就是驱动层ACK丢失了进行重传,导致应用先看见recv cqe去累加窗口,后续再看见send cqe。我理解的话,驱动层ACK是影响cqe的生成,不能直接影响资源的释放。(例如:SQ的释放取决于send cqe从CQ中何时poll下来)

驱动层 ACK 影响的是 WC 的生成, 而 brpc 内有个优化就是 1/4 窗口给 wr 设置一次 IBV_SEND_SIGNALED。 只有设置了 IBV_SEND_SIGNALED 标志的 wr 或者出错了才会给该 wr 生成 WC,只有当应用层 poll 到了 opcode 为 IBV_WC_SEND 的 wr 才可以认为该 wr 及其前面 unsignaled 的 SQ 都被释放掉了。

legionxiong avatar Nov 18 '25 06:11 legionxiong

滑动窗口失效的原因应该不是共享 CQ 所致, 而是滑动窗口机制依赖一个前置条件并不成立, 就是本端应用层收到了对端应用层的 ACK 就认为底层 SQ 都释放掉了。 在无损的 IB 网络, 这个条件可能成立。 而对于基于以太网的 RoCEv2, 这个是不保证的。 底层驱动发送一个消息后,只有收到对端驱动的 ACK 才会释放 SQ。 当网络压力大的时候, 存在一种情况, 对端收到了消息然后对端应用层也处理了消息, 但是对端驱动层回复的 ACK 丢了或者超时了; 而本端应用层收到了对端应用层发送的 ACK, 累加窗口, 实际上本端驱动层因为接收 ACK 超时导致重发消息, 并未释放 SQ。

@legionxiong hi,这么理解对吗:就是驱动层ACK丢失了进行重传,导致应用先看见recv cqe去累加窗口,后续再看见send cqe。我理解的话,驱动层ACK是影响cqe的生成,不能直接影响资源的释放。(例如:SQ的释放取决于send cqe从CQ中何时poll下来)

驱动层 ACK 影响的是 WC 的生成, 而 brpc 内有个优化就是 1/4 窗口给 wr 设置一次 IBV_SEND_SIGNALED。 只有设置了 IBV_SEND_SIGNALED 标志的 wr 或者出错了才会给该 wr 生成 WC,当应用层 poll 到了 opcode 为 IBV_WC_SEND 的 wr 则可以认为该 wr 及其前面 unsignaled 的 SQ 都被释放掉了。

了解,我说的cqe(completion queue element)也就是你指的这个WC(work completion)。那就没问题了。

sunce4t avatar Nov 18 '25 06:11 sunce4t

滑动窗口失效的原因应该不是共享 CQ 所致, 而是滑动窗口机制依赖一个前置条件并不成立, 就是本端应用层收到了对端应用层的 ACK 就认为底层 SQ 都释放掉了。 在无损的 IB 网络, 这个条件可能成立。 而对于基于以太网的 RoCEv2, 这个是不保证的。 底层驱动发送一个消息后,只有收到对端驱动的 ACK 才会释放 SQ。 当网络压力大的时候, 存在一种情况, 对端收到了消息然后对端应用层也处理了消息, 但是对端驱动层回复的 ACK 丢了或者超时了; 而本端应用层收到了对端应用层发送的 ACK, 累加窗口, 实际上本端驱动层因为接收 ACK 超时导致重发消息, 并未释放 SQ。

@legionxiong hi,这么理解对吗:就是驱动层ACK丢失了进行重传,导致应用先看见recv cqe去累加窗口,后续再看见send cqe。我理解的话,驱动层ACK是影响cqe的生成,不能直接影响资源的释放。(例如:SQ的释放取决于send cqe从CQ中何时poll下来)

驱动层 ACK 影响的是 WC 的生成, 而 brpc 内有个优化就是 1/4 窗口给 wr 设置一次 IBV_SEND_SIGNALED。 只有设置了 IBV_SEND_SIGNALED 标志的 wr 或者出错了才会给该 wr 生成 WC,当应用层 poll 到了 opcode 为 IBV_WC_SEND 的 wr 则可以认为该 wr 及其前面 unsignaled 的 SQ 都被释放掉了。

了解,我说的cqe(completion queue element)也就是你指的这个WC(work completion)。那就没问题了。

没错,准确地说是生成 CQE 。

legionxiong avatar Nov 18 '25 06:11 legionxiong

[E1008]Reached timeout=60000ms @Socket{id=13 fd=1160 addr=xxx:xx} (0x0x7f957c964ec0) rdma info={rdma_state=ON, handshake_state=ESTABLISHED, rdma_remote_rq_window_size=63, rdma_sq_window_size=0, rdma_local_window_capacity=125, rdma_remote_window_capacity=125, rdma_sbuf_head=57, rdma_sbuf_tail=120, rdma_rbuf_head=36, rdma_unacked_rq_wr=0, rdma_received_ack=0, rdma_unsolicited_sent=0, rdma_unsignaled_sq_wr=1, rdma_new_rq_wrs=0, }

我们使用rdma_sq_window_size来表示本地SQ大小,遇到这样的case:rdma_sq_window_size是0,似乎是一直没poll到IBV_WC_SEND WC。超时后重试多次也还是超时,此时数据已经发不出去了。

原因应该是:solicited_only=1时,IBV_WC_SEND WC不会生成CQE。如果没有IBV_WC_RECV CEQ触发comp channel事件,就没法poll到IBV_WC_SEND WC,更新rdma_sq_window_size。

我们验证了以下两种方案可以解决这个问题:

  1. solicited_only=0
  2. 将CQ拆成send CQ(solicited_only=0)和recv CQ(solicited_only=1)。

@legionxiong 提到https://github.com/apache/brpc/pull/3145#issuecomment-3544927795 使用一个CQ即可。那在solicited_only=1时,怎么感知IBV_WC_SEND WC的生成呢?

chenBright avatar Nov 18 '25 07:11 chenBright

提到#3145 (comment) 使用一个CQ即可。那在solicited_only=1时,怎么感知IBV_WC_SEND WC的生成呢?

所以在给一个 wr 设置 IBV_SEND_SIGNALED 标志的同时需要设置 IBV_SEND_SOLICITED 标志, solicited_only 的 wr 不需要感知 IBV_WC_SEND。

legionxiong avatar Nov 18 '25 07:11 legionxiong

提到#3145 (comment) 使用一个CQ即可。那在solicited_only=1时,怎么感知IBV_WC_SEND WC的生成呢?

所以在给一个 wr 设置 IBV_SEND_SIGNALED 标志的同时需要设置 IBV_SEND_SOLICITED 标志, solicited_only 的 wr 不需要感知 IBV_WC_SEND。

@legionxiong hi,本端的IBV_SEND_SOLICITED设置后影响的应当是对端是否在comp channel上生成一个event以触发poll_cq,这个问题看起来是本端一直没有event产生,导致没法进入poll_cq流程?

sunce4t avatar Nov 18 '25 07:11 sunce4t

本端的IBV_SEND_SOLICITED设置后影响的应当是对端是否在comp channel上生成一个event以触发poll_cq,这个问题看起来是本端一直没有event产生,导致没法进入poll_cq流程?

是的,问题是本端没有send CQE产生。

Solicited Event文档提到,IBV_SEND_SOLICITED是在对端的comp channel的生成event。我测过,本端是没有event的,所以不会PollCq。

我理解,当前实现是在对端设置了IBV_SEND_SOLICITED后,触发本端进入PollCq,将这之前的的IBV_WC_SEND顺便poll出来。

chenBright avatar Nov 18 '25 07:11 chenBright

本端的IBV_SEND_SOLICITED设置后影响的应当是对端是否在comp channel上生成一个event以触发poll_cq,这个问题看起来是本端一直没有event产生,导致没法进入poll_cq流程?

是的,问题是本端没有send CQE产生。

Solicited Event文档提到,IBV_SEND_SOLICITED是在对端的comp channel的生成event。我测过,本端是没有event的,所以不会PollCq。

当前实现是在对端设置了IBV_SEND_SOLICITED后,触发本端进入PollCq,将这之前的的IBV_WC_SEND顺便poll出来的。

我个人是这样理解的: 假设本端发送请求,对端回复。正常的逻辑: 1.对端在回复的时候是设置IBV_SEND_SOLICITED,本端产生event去poll_cq。 2.poll到send cqe和recv cqe,进行对应窗口更新。 按照 @legionxiong 的说法,send cqe可能由于网络问题延迟生成。 那么在 2 这一步,就会出现应用层先看见recv cqe。这也是之前我们遇到的滑动窗口bug。

关于@chenBright 遇到的问题,是否存在这样的情况: 在本端的sq应用层窗口消耗完前,本端高速发送数据,对端正常的设置IBV_SEND_SOLICITED进行回复,本端也有产生event去触发poll_cq;

由于以太网的问题send cqe一直在延迟生成,所以这个过程中一直没有poll到send cqe(按照目前brpc的默认配置,这个过程可能有四次或者两次event触发poll_cq,但在这个过程中send cqe的生成都被延迟了)。

最后当本端的sq应用层窗口消耗完毕,对端无法再回复,本端也无法再进入poll_cq了;最终导致超时。

sunce4t avatar Nov 18 '25 08:11 sunce4t

提到#3145 (comment) 使用一个CQ即可。那在solicited_only=1时,怎么感知IBV_WC_SEND WC的生成呢?

所以在给一个 wr 设置 IBV_SEND_SIGNALED 标志的同时需要设置 IBV_SEND_SOLICITED 标志, solicited_only 的 wr 不需要感知 IBV_WC_SEND。

@legionxiong hi,本端的IBV_SEND_SOLICITED设置后影响的应当是对端是否在comp channel上生成一个event以触发poll_cq,这个问题看起来是本端一直没有event产生,导致没法进入poll_cq流程?

由对端应用层发送立即数 ACK 来通知本端 poll cq, 每 1/4 窗口发送一个 signaled + solicited 的 wr, 对端收到 1/4 窗口个的消息后给本端发一个signaled + solicited wr 作为 ACK 通知本端 poll cq。只有当本端 poll 到某个带 IBV_SEND_SIGNALED 标志的 wr 产生的 send CQE 后,驱动才会清空该 wr 之前的 SQE。进一步细化的话, 当 _sq_window_size 小于 _window_size 时说明对端接收消息比本端产生 send CQE 快, 那么后续发消息可以考虑更高频次地设置 signaled + solicited。

legionxiong avatar Nov 18 '25 08:11 legionxiong

提到#3145 (comment) 使用一个CQ即可。那在solicited_only=1时,怎么感知IBV_WC_SEND WC的生成呢?

所以在给一个 wr 设置 IBV_SEND_SIGNALED 标志的同时需要设置 IBV_SEND_SOLICITED 标志, solicited_only 的 wr 不需要感知 IBV_WC_SEND。

@legionxiong hi,本端的IBV_SEND_SOLICITED设置后影响的应当是对端是否在comp channel上生成一个event以触发poll_cq,这个问题看起来是本端一直没有event产生,导致没法进入poll_cq流程?

由对端应用层发送立即数 ACK 来通知本端 poll cq, 每 1/4 窗口发送一个 signaled + solicited 的 wr, 对端收到 1/4 窗口个的消息后给本端发一个signaled + solicited wr 作为 ACK 通知本端 poll cq。当本端 poll 到 IBV_SEND_SIGNALED wr 产生的 CQE 后可以认为驱动层面释放掉了 1/4 窗口的 SQE。进一步细化的话, 当 _sq_window_size 小于 _window_size 时说明对端接收消息比本端产生 send CQE 快, 那么后续发消息可以考虑更高频次地设置 signaled + solicited。

现在brpc的CutFromIOBufList中逻辑跟你讲的类似,(在默认配置下,一次发送最多发送8KB,1/4 窗口就是31*8=248KB,所以不用考虑_unsolicited_bytes > 1048576这个分支)

bool solicited = false;
        if (window == 1 || current + 1 >= ndata) {
            // Only last message in the write queue or last message in the
            // current window will be flagged as solicited.
            solicited = true;
        } else {
            if (_unsolicited > _local_window_capacity / 4) {
                // Make sure the recv side can be signaled to return ack
                solicited = true;
            } else if (_accumulated_ack > _remote_window_capacity / 4) {
                // Make sure the recv side can be signaled to handle ack
                solicited = true;
            } else if (_unsolicited_bytes > 1048576) {
                // Make sure the recv side can be signaled when it receives enough data
                solicited = true;
            } else {
                ++_unsolicited;
                _unsolicited_bytes += this_len;
                _accumulated_ack += imm;
            }
        }
        if (solicited) {
            wr.send_flags |= IBV_SEND_SOLICITED;
            _unsolicited = 0;
            _unsolicited_bytes = 0;
            _accumulated_ack = 0;
        }

另一处是:

int RdmaEndpoint::SendAck(int num) {
    if (_new_rq_wrs.fetch_add(num, butil::memory_order_relaxed) > _remote_window_capacity / 2) {
        return SendImm(_new_rq_wrs.exchange(0, butil::memory_order_relaxed));
    }
    return 0;
}

int RdmaEndpoint::SendImm(uint32_t imm) {
    if (imm == 0) {
        return 0;
    }

    ibv_send_wr wr;
    memset(&wr, 0, sizeof(wr));
    wr.opcode = IBV_WR_SEND_WITH_IMM;
    wr.imm_data = butil::HostToNet32(imm);
    wr.send_flags |= IBV_SEND_SOLICITED;
    wr.send_flags |= IBV_SEND_SIGNALED;

    ibv_send_wr* bad = NULL;
    int err = ibv_post_send(_resource->qp, &wr, &bad);
    if (err != 0) {
        // We use other way to guarantee the Send Queue is not full.
        // So we just consider this error as an unrecoverable error.
        LOG(WARNING) << "Fail to ibv_post_send: " << berror(err);
        return -1;
    }
    return 0;
}

这两处都会设置IBV_SEND_SOLICITED,我理解你的意思就是去修改这两处提高event生成频率吗

sunce4t avatar Nov 18 '25 08:11 sunce4t

由对端应用层发送立即数 ACK 来通知本端 poll cq, 每 1/4 窗口发送一个 signaled + solicited 的 wr, 对端收到 1/4 窗口个的消息后给本端发一个signaled + solicited wr 作为 ACK 通知本端 poll cq。当本端 poll 到 IBV_SEND_SIGNALED wr 产生的 CQE 后可以认为驱动层面释放掉了 1/4 窗口的 SQE。

我理解也有这个问题吧

由于以太网的问题send cqe一直在延迟生成,所以这个过程中一直没有poll到send cqe(按照目前brpc的默认配置,这个过程可能有四次或者两次event触发poll_cq,但在这个过程中send cqe的生成都被延迟了)。 最后当本端的sq应用层窗口消耗完毕,对端无法再回复,本端也无法再进入poll_cq了;最终导致超时。

当前实现得抢到_new_rq_wrs,才会发送立即数ACK。发送立即数ACK和发送消息时ACK是并发竞争的,对端收到 1/4 窗口个的消息后给本端发一个signaled + solicited wr 作为 ACK 通知本端 poll cq,看下来不好实现。

chenBright avatar Nov 18 '25 08:11 chenBright

由对端应用层发送立即数 ACK 来通知本端 poll cq, 每 1/4 窗口发送一个 signaled + solicited 的 wr, 对端收到 1/4 窗口个的消息后给本端发一个signaled + solicited wr 作为 ACK 通知本端 poll cq。当本端 poll 到 IBV_SEND_SIGNALED wr 产生的 CQE 后可以认为驱动层面释放掉了 1/4 窗口的 SQE。

我理解也有这个问题吧

由于以太网的问题send cqe一直在延迟生成,所以这个过程中一直没有poll到send cqe(按照目前brpc的默认配置,这个过程可能有四次或者两次event触发poll_cq,但在这个过程中send cqe的生成都被延迟了)。 最后当本端的sq应用层窗口消耗完毕,对端无法再回复,本端也无法再进入poll_cq了;最终导致超时。

当前实现得抢到_new_rq_wrs,才会发送立即数ACK。发送立即数ACK和发送消息时ACK是并发竞争的,对端收到 1/4 窗口个的消息后给本端发一个signaled + solicited wr 作为 ACK 通知本端 poll cq,看下来不好实现。

3145 pr中拆成两个CQ,我理解的话,拆分后 每1/4个窗口值才产生一个SQ对应CQ的event,这带来的cpu开销感觉可以接受

sunce4t avatar Nov 18 '25 09:11 sunce4t