滑动窗口bug:Fail to ibv_post_send: Cannot allocate memory, window=3, sq_current=98
Describe the bug 此错误代表:滑动窗口表示可发送,但RDMA的SQ(Send Queue)没有可用的空间发送请求;
滑动窗口代表发送方可继续发送的能力;当前,滑动窗口的更新时机为:poll到带有imm(也就是带有acks)的recv cqe,并且进行对应的处理时; RDMA的SQ是一个环形buffer,其上存放发送但未完成的请求,SQ的指针更新时机为 poll 到 send cqe时; 这里我们可以发现:SQ和滑动窗口的更新时机不一致;
按照正常逻辑来讲,如果处理到带有imm的recv cqe,那么代表此前发送方所发出的RDMA Send已经完成,在recv cqe之前应该先生成send cqe;
如下图示意:
这是brpc期望的一个逻辑,在更新滑动窗口值前,先更新了SQ;
但遇到该问题时,说明情况是这样的: 发送方调用大量RDMA Send操作,占用完SQ; 接着发送方poll到带有imm的recv cqe(并且此时一定没有其他send cqe,如果有的话,按照brpc一次poll 32个cqe,也会触发SQ指针更新),进入处理逻辑更新滑动窗口值; 更新窗口值后,某个线程发现窗口值非0,尝试发送,但SQ没有可用空间存放请求了,于是报错;
为什么在带有imm的recv cqe之前没有send cqe呢? 根据brpc的逻辑,带有imm就代表接收方收到数据(意味着RDMA Send已经完成),并向发送方发送确认; 而此时发送方已经生成带有imm的recv cqe,那么为什么已经完成的RDMA Send还没生成send cqe?
查看了IB spec,不同WQ共享CQ他们生成cqe是不能确保顺序的,只能确保同一个WQ内部的cqe生成顺序:
所以,brpc的SQ和RQ共享了一个CQ后,不能确保发送方收到带有imm的recv cqe之前,一定生成send cqe;
可以看如下示意图:
因此我认为此问题由两个原因共同导致: 1.滑动窗口更新时机和RDMA SQ更新时机不一致; 2.cqe生成顺序的不确定性。
To Reproduce 由业务触发,本地测试程序没有复现成功。不过发现日志中报错时业务均在发送大量数据。
Expected behavior 我们期望的是:滑动窗口更新时机与RDMA SQ的更新时机保持一致; 目前我们实现了一个patch,保证滑动窗口更新时机一定在RDMA SQ的更新时机之后。(确保RDMA SQ有空间再更新滑动窗口) 我们新增了_remote_acks记录接收方确认的数目;因为同一个WQ内是保证顺序的,我们使用_sq_to_update队列记录unsignaled的数目,使用_sq_update_flag队列记录已经poll到的send cqe数目; 当处理带有imm的recv cqe时,判断此前是否poll到过send cqe,若没有poll到,仅将此次的acks累加到_remote_acks中,不进行窗口值更新。
分析: 此patch不影响正常逻辑(即先生成send cqe,后生成recv cqe),这种情况在此patch下没有任何区别; 当先生成recv cqe,后生成send cqe时,此patch强制要求窗口值更新在send cqe后,保证SQ不会溢出。
diff --git a/src/brpc/rdma/rdma_endpoint.cpp b/src/brpc/rdma/rdma_endpoint.cpp
index 1d502a98..dac82bb7 100644
--- a/src/brpc/rdma/rdma_endpoint.cpp
+++ b/src/brpc/rdma/rdma_endpoint.cpp
@@ -101,7 +114,7 @@ static const uint32_t ACK_MSG_RDMA_OK = 0x1;
static butil::Mutex* g_rdma_resource_mutex = NULL;
static RdmaResource* g_rdma_resource_list = NULL;
-
+static butil::atomic<uint64_t> g_wr_id(0);
struct HelloMessage {
void Serialize(void* data) const;
void Deserialize(void* data);
@@ -191,7 +204,11 @@ RdmaEndpoint::RdmaEndpoint(Socket* s)
, _remote_window_capacity(0)
, _window_size(0)
, _new_rq_wrs(0)
+ , _remote_acks(0)
+ , _m_sq_unsignaled(0)
{
+ LOG(INFO) << "_remote_acks: " << _remote_acks.load();
+ LOG(INFO) << "_m_sq_unsignaled: " << _m_sq_unsignaled;
if (_sq_size < MIN_QP_SIZE) {
_sq_size = MIN_QP_SIZE;
}
@@ -208,6 +225,11 @@ RdmaEndpoint::RdmaEndpoint(Socket* s)
}
RdmaEndpoint::~RdmaEndpoint() {
+ LOG(INFO) << _window_size << " " << _remote_acks << " " << _sq_update_flag.size() << " " << _sq_to_update.size() << " " << _sq_unsignaled << " " << _sq_unsignaled;
+ while(_sq_to_update.empty() == false) {
+ LOG(INFO) << _sq_to_update.front();
+ _sq_to_update.pop();
+ }
Reset();
bthread::butex_destroy(_read_butex);
}
@@ -231,6 +253,8 @@ void RdmaEndpoint::Reset() {
_new_rq_wrs = 0;
_sq_sent = 0;
_rq_received = 0;
+ _remote_acks.store(0, butil::memory_order_relaxed);
+ _m_sq_unsignaled.store(0, butil::memory_order_relaxed);
}
void RdmaConnect::StartConnect(const Socket* socket,
@@ -878,15 +902,36 @@ ssize_t RdmaEndpoint::CutFromIOBufList(butil::IOBuf** from, size_t ndata) {
}
ibv_send_wr* bad = NULL;
- int err = ibv_post_send(_resource->qp, &wr, &bad);
- if (err != 0) {
- // We use other way to guarantee the Send Queue is not full.
- // So we just consider this error as an unrecoverable error.
- LOG(WARNING) << "Fail to ibv_post_send: " << berror(err)
+
+ wr.wr_id = g_wr_id.fetch_add(1, butil::memory_order_relaxed);
+ {
+ BAIDU_SCOPED_LOCK(_m_sq_mutex);
+ int err = ibv_post_send(_resource->qp, &wr, &bad);
+ if (err != 0) {
+ // We use other way to guarantee the Send Queue is not full.
+ // So we just consider this error as an unrecoverable error.
+ LOG(WARNING) << "Fail to ibv_post_send: " << berror(err)
<< ", window=" << window
<< ", sq_current=" << _sq_current;
errno = err;
return -1;
+ }
+ _m_sq_unsignaled.fetch_add(1, butil::memory_order_release);
+ uint16_t cur_unsignaled = 0;
+ if(wr.send_flags & IBV_SEND_SIGNALED) {
+ cur_unsignaled = _m_sq_unsignaled.exchange(0, butil::memory_order_acquire);
+ }
+ if(cur_unsignaled != 0) {
+ BAIDU_SCOPED_LOCK(_sq_update_mutex);
+ _sq_to_update.push(cur_unsignaled);
+ //LOG(INFO) << "send signaled before: " << cur_unsignaled;
+ }
}
++_sq_current;
@@ -924,13 +969,26 @@ int RdmaEndpoint::SendImm(uint32_t imm) {
wr.send_flags |= IBV_SEND_SIGNALED;
ibv_send_wr* bad = NULL;
- int err = ibv_post_send(_resource->qp, &wr, &bad);
- if (err != 0) {
- // We use other way to guarantee the Send Queue is not full.
- // So we just consider this error as an unrecoverable error.
- LOG(WARNING) << "Fail to ibv_post_send: " << berror(err);
- return -1;
+ wr.wr_id = g_wr_id.fetch_add(1, butil::memory_order_relaxed);
+ uint16_t cur_unsignaled = 0;
+ {
+ BAIDU_SCOPED_LOCK(_m_sq_mutex);
+ int err = ibv_post_send(_resource->qp, &wr, &bad);
+ if (err != 0) {
+ // We use other way to guarantee the Send Queue is not full.
+ // So we just consider this error as an unrecoverable error.
+ LOG(WARNING) << "Fail to ibv_post_send: " << berror(err);
+ return -1;
+ }
+ cur_unsignaled = _m_sq_unsignaled.exchange(0, butil::memory_order_relaxed);
+ {
+ BAIDU_SCOPED_LOCK(_sq_update_mutex);
+ _sq_to_update.push(cur_unsignaled);
+ //LOG(INFO) << "wr id: " << wr.wr_id << " SendImm before: " << cur_unsignaled;
+ }
+
}
+
return 0;
}
@@ -938,8 +996,11 @@ ssize_t RdmaEndpoint::HandleCompletion(ibv_wc& wc) {
bool zerocopy = FLAGS_rdma_recv_zerocopy;
switch (wc.opcode) {
case IBV_WC_SEND: { // send completion
- // Do nothing
- break;
+ {
+ BAIDU_SCOPED_LOCK(_sq_flag_mutex);
+ _sq_update_flag.push(true);
+ }
+ break;
}
case IBV_WC_RECV: { // recv completion
// Please note that only the first wc.byte_len bytes is valid
@@ -959,24 +1020,66 @@ ssize_t RdmaEndpoint::HandleCompletion(ibv_wc& wc) {
}
if (wc.imm_data > 0) {
// Clear sbuf here because we ignore event wakeup for send completions
- uint32_t acks = butil::NetToHost32(wc.imm_data);
- uint32_t num = acks;
- while (num > 0) {
- _sbuf[_sq_sent++].clear();
- if (_sq_sent == _sq_size - RESERVED_WR_NUM) {
- _sq_sent = 0;
- }
- --num;
- }
- butil::subtle::MemoryBarrier();
// Update window
- uint32_t wnd_thresh = _local_window_capacity / 8;
- if (_window_size.fetch_add(acks, butil::memory_order_relaxed) >= wnd_thresh
- || acks >= wnd_thresh) {
+ //uint32_t wnd_thresh = _local_window_capacity / 8;
+ //if (_window_size.fetch_add(acks, butil::memory_order_relaxed) >= wnd_thresh
+ // || acks >= wnd_thresh) {
// Do not wake up writing thread right after _window_size > 0.
// Otherwise the writing thread may switch to background too quickly.
- _socket->WakeAsEpollOut();
+ // _socket->WakeAsEpollOut();
+ //}
+ uint32_t acks = butil::NetToHost32(wc.imm_data);
+ //LOG(INFO) << "acks: " << acks;
+ _remote_acks.fetch_add(acks, butil::memory_order_relaxed);
+ {
+ BAIDU_SCOPED_LOCK(_sq_update_mutex);
+ while(_sq_to_update.empty() == false && _remote_acks.load() >= _sq_to_update.front()) {
+ {
+ BAIDU_SCOPED_LOCK(_sq_flag_mutex);
+ if(_sq_update_flag.empty() == true) {
+ break;
+ }
+ _sq_update_flag.pop();
+ }
+ uint32_t wnd_to_update = _sq_to_update.front();
+ _sq_to_update.pop();
+ _remote_acks.fetch_sub(wnd_to_update, butil::memory_order_relaxed);
+
+ //LOG(INFO) << wnd_to_update << " " << _remote_acks << " " << _sq_update_flag.size() << " " << _sq_to_update.size();
+ uint32_t num = wnd_to_update;
+ while(num > 0) {
+ _sbuf[_sq_sent++].clear();
+ if (_sq_sent == _sq_size - RESERVED_WR_NUM) {
+ _sq_sent = 0;
+ }
+ --num;
+ }
+ butil::subtle::MemoryBarrier();
+ uint32_t wnd_thresh = _local_window_capacity / 8;
+ if (_window_size.fetch_add(wnd_to_update, butil::memory_order_relaxed) >= wnd_thresh
+ || acks >= wnd_thresh) {
+ // Do not wake up writing thread right after _window_size > 0.
+ // Otherwise the writing thread may switch to background too quickly.
+ _socket->WakeAsEpollOut();
+ }
+ }
}
}
diff --git a/src/brpc/rdma/rdma_endpoint.h b/src/brpc/rdma/rdma_endpoint.h
index de7cd5f6..114eb682 100644
--- a/src/brpc/rdma/rdma_endpoint.h
+++ b/src/brpc/rdma/rdma_endpoint.h
@@ -31,7 +31,7 @@
#include "butil/containers/mpsc_queue.h"
#include "brpc/socket.h"
-
+#include <queue>
namespace brpc {
class Socket;
namespace rdma {
@@ -262,6 +262,13 @@ private:
// The number of new WRs posted in the local Recv Queue
butil::atomic<uint16_t> _new_rq_wrs;
+ butil::atomic<int> _remote_acks;
+ butil::Mutex _m_sq_mutex;
+ butil::Mutex _sq_update_mutex;
+ butil::Mutex _sq_flag_mutex;
+ butil::atomic<uint16_t> _m_sq_unsignaled;
+ std::queue<uint16_t> _sq_to_update;
+ std::queue<bool> _sq_update_flag;
// butex for inform read events on TCP fd during handshake
butil::atomic<int> *_read_butex;
** Versions OS: Compiler: brpc: protobuf:
Additional context/screenshots
把window拆成send window和recv window不就好了吗, 每次发送前比较这两者的最小值是否大于0, send wc则根据unsignaled的比例来更新send window, 你这样不是还加了几个锁和队列, 拆window只需要多做一次relaxed order的原子操作即可
把window拆成send window和recv window不就好了吗, 每次发送前比较这两者的最小值是否大于0, send wc则根据unsignaled的比例来更新send window, 你这样不是还加了几个锁和队列, 拆window只需要多做一次relaxed order的原子操作即可
hi,加锁和队列是因为 SendImm也会signaled,SendImm的触发时机不能保证,例如:可能当前有10个unsignaled,刚好触发一次SendImm 所以我加了锁和队列记录每一次ibv_post_send前有多少个unsignaled请求
最开始我们有尝试在处理send cqe的逻辑处按比例更新你提到的send window,后面发现不能确定每个send cqe前有多少个unsignaled的请求。
拆成两个window确实更好,如果要精准更新send window,就得保证ibv_post_send的顺序跟记录unsignaled的顺序一致,这一部还得加锁吧。 不过这样只需要加一个锁、一个队列和一个原子变量。
我明白了, 那其实最简单的一个方案是SendImm改为unsignaled不可以嘛, 因为我理解SendImm成功状态下的wc本身没有什么用, 而失败的wc本身即使是unsignaled的状态也是会产生的, 失败情况下可以视为整个qp状态已经不对了
我明白了, 那其实最简单的一个方案是SendImm改为unsignaled不可以嘛, 因为我理解SendImm成功状态下的wc本身没有什么用, 而失败的wc本身即使是unsignaled的状态也是会产生的, 失败情况下可以视为整个qp状态已经不对了
这个可以的,如果按照你说的改成unsignaled,只需要一个记录接收窗口的原子变量就行了。
是的, 这样可以大大简化实现
是的, 这样可以大大简化实现
目前我们改了一下,没有去掉SendImm的IBV_SEND_SIGNALED,我下面这个版本仍旧用锁和队列保证send cqe和记录的unsignaled num的顺序一致; 如果去掉,就只需要一个_remote_recv_window了;
diff --git a/src/brpc/rdma/rdma_endpoint.cpp b/src/brpc/rdma/rdma_endpoint.cpp
index f3bc82d7..159bce90 100644
--- a/src/brpc/rdma/rdma_endpoint.cpp
+++ b/src/brpc/rdma/rdma_endpoint.cpp
@@ -101,6 +101,7 @@ static const uint32_t ACK_MSG_RDMA_OK = 0x1;
static butil::Mutex* g_rdma_resource_mutex = NULL;
static RdmaResource* g_rdma_resource_list = NULL;
+static butil::atomic<uint64_t> g_wr_id(0);
struct HelloMessage {
void Serialize(void* data) const;
@@ -514,6 +533,7 @@ void* RdmaEndpoint::ProcessHandshakeAtClient(void* arg) {
ep->_remote_window_capacity =
std::min(ep->_rq_size, remote_msg.sq_size) - RESERVED_WR_NUM,
ep->_window_size.store(ep->_local_window_capacity, butil::memory_order_relaxed);
+ ep->_remote_recv_window.store(ep->_remote_window_capacity, butil::memory_order_relaxed);
ep->_state = C_BRINGUP_QP;
if (ep->BringUpQp(remote_msg.lid, remote_msg.gid, remote_msg.qp_num) < 0) {
@@ -622,7 +642,7 @@ void* RdmaEndpoint::ProcessHandshakeAtServer(void* arg) {
ep->_remote_window_capacity =
std::min(ep->_rq_size, remote_msg.sq_size) - RESERVED_WR_NUM,
ep->_window_size.store(ep->_local_window_capacity, butil::memory_order_relaxed);
-
+ ep->_remote_recv_window.store(ep->_remote_window_capacity, butil::memory_order_relaxed);
ep->_state = S_ALLOC_QPCQ;
if (ep->AllocateResources() < 0) {
LOG(WARNING) << "Fail to allocate rdma resources, fallback to tcp:"
@@ -872,30 +873,37 @@ ssize_t RdmaEndpoint::CutFromIOBufList(butil::IOBuf** from, size_t ndata) {
// Avoid too much send completion event to reduce the CPU overhead
++_sq_unsignaled;
if (_sq_unsignaled >= _local_window_capacity / 4) {
// Refer to:
// http::www.rdmamojo.com/2014/06/30/working-unsignaled-completions/
wr.send_flags |= IBV_SEND_SIGNALED;
_sq_unsignaled = 0;
}
ibv_send_wr* bad = NULL;
- int err = ibv_post_send(_resource->qp, &wr, &bad);
- if (err != 0) {
- // We use other way to guarantee the Send Queue is not full.
- // So we just consider this error as an unrecoverable error.
- LOG(WARNING) << "Fail to ibv_post_send: " << berror(err)
+ wr.wr_id = g_wr_id.fetch_add(1, butil::memory_order_relaxed);
+ {
+ BAIDU_SCOPED_LOCK(_m_sq_mutex);
+ int err = ibv_post_send(_resource->qp, &wr, &bad);
+ if (err != 0) {
+ // We use other way to guarantee the Send Queue is not full.
+ // So we just consider this error as an unrecoverable error.
+ LOG(WARNING) << "Fail to ibv_post_send: " << berror(err)
<< ", window=" << window
<< ", sq_current=" << _sq_current;
- errno = err;
- return -1;
- }
+ errno = err;
+ return -1;
+ }
+ _m_sq_unsignaled.fetch_add(1, butil::memory_order_release);
+ uint16_t cur_unsignaled = 0;
+ if(wr.send_flags & IBV_SEND_SIGNALED) {
+ cur_unsignaled = _m_sq_unsignaled.exchange(0, butil::memory_order_acquire);
+ }
+ if(cur_unsignaled != 0) {
+ BAIDU_SCOPED_LOCK(_sq_update_mutex);
+ _sq_to_update.push(cur_unsignaled);
+ }
+ }
++_sq_current;
if (_sq_current == _sq_size - RESERVED_WR_NUM) {
@@ -932,19 +940,27 @@ int RdmaEndpoint::SendImm(uint32_t imm) {
wr.send_flags |= IBV_SEND_SIGNALED;
ibv_send_wr* bad = NULL;
- int err = ibv_post_send(_resource->qp, &wr, &bad);
- if (err != 0) {
- // We use other way to guarantee the Send Queue is not full.
- // So we just consider this error as an unrecoverable error.
- LOG(WARNING) << "Fail to ibv_post_send: " << berror(err);
- return -1;
+ wr.wr_id = g_wr_id.fetch_add(1, butil::memory_order_relaxed);
+ uint16_t cur_unsignaled = 0;
+ {
+ BAIDU_SCOPED_LOCK(_m_sq_mutex);
+ int err = ibv_post_send(_resource->qp, &wr, &bad);
+ if (err != 0) {
+ // We use other way to guarantee the Send Queue is not full.
+ // So we just consider this error as an unrecoverable error.
+ LOG(WARNING) << "Fail to ibv_post_send: " << berror(err);
+ return -1;
+ }
+ cur_unsignaled = _m_sq_unsignaled.exchange(0, butil::memory_order_relaxed);
}
+
+
+
+ {
+ BAIDU_SCOPED_LOCK(_sq_update_mutex);
+ _sq_to_update.push(cur_unsignaled);
+ //LOG(INFO) << "SendImm before: " << cur_unsignaled;
}
return 0;
}
@@ -938,7 +987,32 @@ ssize_t RdmaEndpoint::HandleCompletion(ibv_wc& wc) {
bool zerocopy = FLAGS_rdma_recv_zerocopy;
switch (wc.opcode) {
case IBV_WC_SEND: { // send completion
- // Do nothing
+ uint16_t wnd_to_update = 0;
+ {
+ BAIDU_SCOPED_LOCK(_sq_update_mutex);
+ wnd_to_update = _sq_to_update.front();
+ _sq_to_update.pop();
+ }
+ if (wnd_to_update != 0) {
+ uint32_t num = wnd_to_update;
+ while(num > 0) {
+ _sbuf[_sq_sent++].clear();
+ if (_sq_sent == _sq_size - RESERVED_WR_NUM) {
+ _sq_sent = 0;
+ }
+ --num;
+ }
+ butil::subtle::MemoryBarrier();
+ uint32_t wnd_thresh = _local_window_capacity / 8;
+ if ((_window_size.fetch_add(wnd_to_update, butil::memory_order_relaxed) >= wnd_thresh)) {
+ // Do not wake up writing thread right after _window_size > 0.
+ // Otherwise the writing thread may switch to background too quickly.
+ _socket->WakeAsEpollOut();
+ }
+
+ }
+ //LOG(INFO) << _window_size << " " << _remote_recv_window;
+ // Do nothing
break;
}
case IBV_WC_RECV: { // recv completion
@@ -958,26 +1032,9 @@ ssize_t RdmaEndpoint::HandleCompletion(ibv_wc& wc) {
}
}
if (wc.imm_data > 0) {
- // Clear sbuf here because we ignore event wakeup for send completions
- uint32_t acks = butil::NetToHost32(wc.imm_data);
- uint32_t num = acks;
- while (num > 0) {
- _sbuf[_sq_sent++].clear();
- if (_sq_sent == _sq_size - RESERVED_WR_NUM) {
- _sq_sent = 0;
- }
- --num;
- }
- butil::subtle::MemoryBarrier();
-
- // Update window
- uint32_t wnd_thresh = _local_window_capacity / 8;
- if (_window_size.fetch_add(acks, butil::memory_order_relaxed) >= wnd_thresh
- || acks >= wnd_thresh) {
- // Do not wake up writing thread right after _window_size > 0.
- // Otherwise the writing thread may switch to background too quickly.
- _socket->WakeAsEpollOut();
- }
+ uint32_t acks = butil::NetToHost32(wc.imm_data);
+ _remote_recv_window.fetch_add(acks, butil::memory_order_relaxed);
+
}
// We must re-post recv WR
if (PostRecv(1, zerocopy) < 0) {
@@ -1500,7 +1516,7 @@ void RdmaEndpoint::PollCq(Socket* m) {
if (wc[i].status != IBV_WC_SUCCESS) {
PLOG(WARNING) << "Fail to handle RDMA completion, error status("
- << wc[i].status << "): " << s->description();
+ << wc[i].status << " " << wc[i].opcode << " " << wc[i].wr_id << "): " << s->description();
s->SetFailed(ERDMA, "RDMA completion error(%d) from %s: %s",
wc[i].status, s->description().c_str(), berror(ERDMA));
continue;
diff --git a/src/brpc/rdma/rdma_endpoint.h b/src/brpc/rdma/rdma_endpoint.h
index de7cd5f6..d4920180 100644
--- a/src/brpc/rdma/rdma_endpoint.h
+++ b/src/brpc/rdma/rdma_endpoint.h
@@ -31,7 +31,7 @@
#include "butil/containers/mpsc_queue.h"
#include "brpc/socket.h"
-
+#include <queue>
namespace brpc {
class Socket;
namespace rdma {
@@ -262,6 +269,11 @@ private:
// The number of new WRs posted in the local Recv Queue
butil::atomic<uint16_t> _new_rq_wrs;
+ butil::atomic<int> _remote_recv_window;
+ butil::Mutex _m_sq_mutex;
+ butil::Mutex _sq_update_mutex;
+ butil::atomic<uint16_t> _m_sq_unsignaled;
+ std::queue<uint16_t> _sq_to_update;
// butex for inform read events on TCP fd during handshake
butil::atomic<int> *_read_butex;
是的, 这样可以大大简化实现
@Yangfisher1 @yanglimingcn 大佬们看看这个版本, _socket->WakeAsEpollOut(); 这一句不确定该放在哪里,暂时先放在case IBV_WR_SEND中
diff --git a/src/brpc/rdma/rdma_endpoint.cpp b/src/brpc/rdma/rdma_endpoint.cpp
index 1d502a98..99ef1159 100644
--- a/src/brpc/rdma/rdma_endpoint.cpp
+++ b/src/brpc/rdma/rdma_endpoint.cpp
@@ -191,6 +191,7 @@ RdmaEndpoint::RdmaEndpoint(Socket* s)
, _remote_window_capacity(0)
, _window_size(0)
, _new_rq_wrs(0)
+ , _remote_recv_window(0)
{
if (_sq_size < MIN_QP_SIZE) {
_sq_size = MIN_QP_SIZE;
@@ -208,6 +209,7 @@ RdmaEndpoint::RdmaEndpoint(Socket* s)
}
RdmaEndpoint::~RdmaEndpoint() {
+ LOG(INFO) << _window_size << " " << _remote_recv_window << " " << _sq_unsignaled;
Reset();
bthread::butex_destroy(_read_butex);
}
@@ -231,6 +233,7 @@ void RdmaEndpoint::Reset() {
_new_rq_wrs = 0;
_sq_sent = 0;
_rq_received = 0;
+ _remote_recv_window.store(0, butil::memory_order_relaxed);
}
void RdmaConnect::StartConnect(const Socket* socket,
@@ -514,7 +517,7 @@ void* RdmaEndpoint::ProcessHandshakeAtClient(void* arg) {
ep->_remote_window_capacity =
std::min(ep->_rq_size, remote_msg.sq_size) - RESERVED_WR_NUM,
ep->_window_size.store(ep->_local_window_capacity, butil::memory_order_relaxed);
-
+ ep->_remote_recv_window.store(ep->_remote_window_capacity, butil::memory_order_relaxed);
ep->_state = C_BRINGUP_QP;
if (ep->BringUpQp(remote_msg.lid, remote_msg.gid, remote_msg.qp_num) < 0) {
LOG(WARNING) << "Fail to bringup QP, fallback to tcp:" << s->description();
@@ -622,7 +625,7 @@ void* RdmaEndpoint::ProcessHandshakeAtServer(void* arg) {
ep->_remote_window_capacity =
std::min(ep->_rq_size, remote_msg.sq_size) - RESERVED_WR_NUM,
ep->_window_size.store(ep->_local_window_capacity, butil::memory_order_relaxed);
-
+ ep->_remote_recv_window.store(ep->_remote_window_capacity, butil::memory_order_relaxed);
ep->_state = S_ALLOC_QPCQ;
if (ep->AllocateResources() < 0) {
LOG(WARNING) << "Fail to allocate rdma resources, fallback to tcp:"
@@ -787,12 +790,14 @@ ssize_t RdmaEndpoint::CutFromIOBufList(butil::IOBuf** from, size_t ndata) {
size_t total_len = 0;
size_t current = 0;
uint32_t window = 0;
+ uint32_t recv_window = 0;
ibv_send_wr wr;
int max_sge = GetRdmaMaxSge();
ibv_sge sglist[max_sge];
while (current < ndata) {
- window = _window_size.load(butil::memory_order_relaxed);
- if (window == 0) {
+ window = _window_size.load(butil::memory_order_acquire);
+ recv_window = _remote_recv_window.load(butil::memory_order_acquire);
+ if (window == 0 || recv_window == 0) {
if (total_len > 0) {
break;
} else {
@@ -898,7 +903,8 @@ ssize_t RdmaEndpoint::CutFromIOBufList(butil::IOBuf** from, size_t ndata) {
// Because there is at most one thread can enter this function for each
// Socket, and the other thread of HandleCompletion can only add this
// counter.
- _window_size.fetch_sub(1, butil::memory_order_relaxed);
+ _window_size.fetch_sub(1, butil::memory_order_release);
+ _remote_recv_window.fetch_sub(1, butil::memory_order_release);
}
return total_len;
@@ -921,7 +927,7 @@ int RdmaEndpoint::SendImm(uint32_t imm) {
wr.opcode = IBV_WR_SEND_WITH_IMM;
wr.imm_data = butil::HostToNet32(imm);
wr.send_flags |= IBV_SEND_SOLICITED;
- wr.send_flags |= IBV_SEND_SIGNALED;
+ //wr.send_flags |= IBV_SEND_SIGNALED;
ibv_send_wr* bad = NULL;
int err = ibv_post_send(_resource->qp, &wr, &bad);
@@ -938,7 +944,23 @@ ssize_t RdmaEndpoint::HandleCompletion(ibv_wc& wc) {
bool zerocopy = FLAGS_rdma_recv_zerocopy;
switch (wc.opcode) {
case IBV_WC_SEND: { // send completion
- // Do nothing
+ uint16_t wnd_to_update = _local_window_capacity / 4;
+ uint32_t num = wnd_to_update;
+ while(num > 0) {
+ _sbuf[_sq_sent++].clear();
+ if (_sq_sent == _sq_size - RESERVED_WR_NUM) {
+ _sq_sent = 0;
+ }
+ --num;
+ }
+ butil::subtle::MemoryBarrier();
+ uint32_t wnd_thresh = _local_window_capacity / 8;
+ _window_size.fetch_add(wnd_to_update, butil::memory_order_release);
+ //if ((_remote_recv_window.load(butil::memory_order_relaxed) >= wnd_thresh)) {
+ // Do not wake up writing thread right after _window_size > 0.
+ // Otherwise the writing thread may switch to background too quickly.
+ _socket->WakeAsEpollOut();
+ //}
break;
}
case IBV_WC_RECV: { // recv completion
@@ -958,27 +980,9 @@ ssize_t RdmaEndpoint::HandleCompletion(ibv_wc& wc) {
}
}
if (wc.imm_data > 0) {
- // Clear sbuf here because we ignore event wakeup for send completions
uint32_t acks = butil::NetToHost32(wc.imm_data);
- uint32_t num = acks;
- while (num > 0) {
- _sbuf[_sq_sent++].clear();
- if (_sq_sent == _sq_size - RESERVED_WR_NUM) {
- _sq_sent = 0;
- }
- --num;
- }
- butil::subtle::MemoryBarrier();
-
- // Update window
- uint32_t wnd_thresh = _local_window_capacity / 8;
- if (_window_size.fetch_add(acks, butil::memory_order_relaxed) >= wnd_thresh
- || acks >= wnd_thresh) {
- // Do not wake up writing thread right after _window_size > 0.
- // Otherwise the writing thread may switch to background too quickly.
- _socket->WakeAsEpollOut();
- }
- }
+ _remote_recv_window.fetch_add(acks, butil::memory_order_release);
+ }
// We must re-post recv WR
if (PostRecv(1, zerocopy) < 0) {
return -1;
diff --git a/src/brpc/rdma/rdma_endpoint.h b/src/brpc/rdma/rdma_endpoint.h
index de7cd5f6..8cbaf710 100644
--- a/src/brpc/rdma/rdma_endpoint.h
+++ b/src/brpc/rdma/rdma_endpoint.h
@@ -262,6 +262,7 @@ private:
// The number of new WRs posted in the local Recv Queue
butil::atomic<uint16_t> _new_rq_wrs;
+ butil::atomic<uint16_t> _remote_recv_window;
// butex for inform read events on TCP fd during handshake
butil::atomic<int> *_read_butex;
@@ -752,7 +755,7 @@ bool RdmaEndpoint::IsWritable() const {
return false;
}
- return _window_size.load(butil::memory_order_relaxed) > 0;
+ return _window_size.load(butil::memory_order_relaxed) > 0 && _remote_recv_window.load(butil::memory_order_relaxed) > 0;
}
_socket->WakeAsEpollOut()我理解应该两个window更新的时候都要放, 当两个window的最小值超过window threshold的时候调用WakeAsEpollOut来通知上层可以继续写了
问题总结,因为在发送端产生的recv completion event和send completion event的时机不一致,导致接收端产生回复ACK的时候,send completion event可能还没有产生,所以申请sqe的时候就可能失败。 可以让recv completion event和send completion event的产生时机是一致的,这样能保证接收端回复ACK的时候,发送端一定有send completion event产生。具体修改:
_socket->WakeAsEpollOut()我理解应该两个window更新的时候都要放, 当两个window的最小值超过window threshold的时候调用WakeAsEpollOut来通知上层可以继续写了
是的,后面我们也加了一下,还在测试,另外前面那版原本少改了这个:
@@ -752,7 +755,7 @@ bool RdmaEndpoint::IsWritable() const {
return false;
}
- return _window_size.load(butil::memory_order_relaxed) > 0;
+ return _window_size.load(butil::memory_order_relaxed) > 0 && _remote_recv_window.load(butil::memory_order_relaxed) > 0;
}
可以让recv completion event和send completion event的产生时机是一致的,这样能保证接收端回复ACK的时候,发送端一定有send completion event产生
@yanglimingcn 这不对吧, 这个问题在于client通过recv收到server发来的ack(无论是piggyback的亦或是SendImm专门发的)时候, client的send queue还没有signaled wc更新(这个wc还在后面), 这里修改成solicited也带signaled, 只是server发ack的时候server这一侧会产生一个signaled send wc来更新server侧的send queue, 跟client没关系
这个是client端产生的signaled send wc
我明白你的意思了, 这样也是ok的
问题总结,因为在发送端产生的recv completion event和send completion event的时机不一致,导致接收端产生回复ACK的时候,send completion event可能还没有产生,所以申请sqe的时候就可能失败。 可以让recv completion event和send completion event的产生时机是一致的,这样能保证接收端回复ACK的时候,发送端一定有send completion event产生。
@yanglimingcn solicited && signaled send wc完成之前的send wc被server poll到之后ack,client收到的recv cqe,排在signaled send cqe前面,还是有问题吧?server使用polling模式时,这种情况很容易出现吧。
我提了一个PR,你有时间看看 @chenBright
https://github.com/apache/brpc/pull/3145 我们之前将SendImm的IBV_SEND_SIGNALED注释掉后发现也会有问题,我们也采取了类似这个pr的方法,用特定的wr_id区分imm的send cqe,并且没有任何问题; 因此后续这个bug的修复请参考这个pr。
滑动窗口失效的原因应该不是共享 CQ 所致, 而是滑动窗口机制依赖的一个非常关键的前置条件并不成立, 就是本端应用层收到了对端应用层的 ACK 就认为底层 SQ 都释放掉了。 在无损的 IB 网络, 这个条件可能成立。 而对于基于以太网的 RoCEv2, 这个是不保证的。 底层驱动发送完一个消息后,只有收到对端驱动的 ACK 才会释放 SQ。 当网络压力大的时候, 存在一种情况, 对端收到了消息然后对端应用层也处理了该消息, 但是对端驱动层回复的 ACK 可能丢了或者超时了; 而本端应用层收到了对端应用层发送的 ACK, 累加窗口, 实际上本端驱动层因为接收 ACK 超时导致重发消息, 并未释放 SQ。
滑动窗口失效的原因应该不是共享 CQ 所致, 而是滑动窗口机制依赖一个前置条件并不成立, 就是本端应用层收到了对端应用层的 ACK 就认为底层 SQ 都释放掉了。 在无损的 IB 网络, 这个条件可能成立。 而对于基于以太网的 RoCEv2, 这个是不保证的。 底层驱动发送一个消息后,只有收到对端驱动的 ACK 才会释放 SQ。 当网络压力大的时候, 存在一种情况, 对端收到了消息然后对端应用层也处理了消息, 但是对端驱动层回复的 ACK 丢了或者超时了; 而本端应用层收到了对端应用层发送的 ACK, 累加窗口, 实际上本端驱动层因为接收 ACK 超时导致重发消息, 并未释放 SQ。
@legionxiong hi,这么理解对吗:就是驱动层ACK丢失了进行重传,导致应用先看见recv cqe去累加窗口,后续再看见send cqe。我理解的话,驱动层ACK是影响cqe的生成,不能直接影响资源的释放。(例如:SQ的释放取决于send cqe从CQ中何时poll下来)
滑动窗口失效的原因应该不是共享 CQ 所致, 而是滑动窗口机制依赖一个前置条件并不成立, 就是本端应用层收到了对端应用层的 ACK 就认为底层 SQ 都释放掉了。 在无损的 IB 网络, 这个条件可能成立。 而对于基于以太网的 RoCEv2, 这个是不保证的。 底层驱动发送一个消息后,只有收到对端驱动的 ACK 才会释放 SQ。 当网络压力大的时候, 存在一种情况, 对端收到了消息然后对端应用层也处理了消息, 但是对端驱动层回复的 ACK 丢了或者超时了; 而本端应用层收到了对端应用层发送的 ACK, 累加窗口, 实际上本端驱动层因为接收 ACK 超时导致重发消息, 并未释放 SQ。
@legionxiong hi,这么理解对吗:就是驱动层ACK丢失了进行重传,导致应用先看见recv cqe去累加窗口,后续再看见send cqe。我理解的话,驱动层ACK是影响cqe的生成,不能直接影响资源的释放。(例如:SQ的释放取决于send cqe从CQ中何时poll下来)
驱动层 ACK 影响的是 WC 的生成, 而 brpc 内有个优化就是 1/4 窗口给 wr 设置一次 IBV_SEND_SIGNALED。 只有设置了 IBV_SEND_SIGNALED 标志的 wr 或者出错了才会给该 wr 生成 WC,只有当应用层 poll 到了 opcode 为 IBV_WC_SEND 的 wr 才可以认为该 wr 及其前面 unsignaled 的 SQ 都被释放掉了。
滑动窗口失效的原因应该不是共享 CQ 所致, 而是滑动窗口机制依赖一个前置条件并不成立, 就是本端应用层收到了对端应用层的 ACK 就认为底层 SQ 都释放掉了。 在无损的 IB 网络, 这个条件可能成立。 而对于基于以太网的 RoCEv2, 这个是不保证的。 底层驱动发送一个消息后,只有收到对端驱动的 ACK 才会释放 SQ。 当网络压力大的时候, 存在一种情况, 对端收到了消息然后对端应用层也处理了消息, 但是对端驱动层回复的 ACK 丢了或者超时了; 而本端应用层收到了对端应用层发送的 ACK, 累加窗口, 实际上本端驱动层因为接收 ACK 超时导致重发消息, 并未释放 SQ。
@legionxiong hi,这么理解对吗:就是驱动层ACK丢失了进行重传,导致应用先看见recv cqe去累加窗口,后续再看见send cqe。我理解的话,驱动层ACK是影响cqe的生成,不能直接影响资源的释放。(例如:SQ的释放取决于send cqe从CQ中何时poll下来)
驱动层 ACK 影响的是 WC 的生成, 而 brpc 内有个优化就是 1/4 窗口给 wr 设置一次 IBV_SEND_SIGNALED。 只有设置了 IBV_SEND_SIGNALED 标志的 wr 或者出错了才会给该 wr 生成 WC,当应用层 poll 到了 opcode 为 IBV_WC_SEND 的 wr 则可以认为该 wr 及其前面 unsignaled 的 SQ 都被释放掉了。
了解,我说的cqe(completion queue element)也就是你指的这个WC(work completion)。那就没问题了。
滑动窗口失效的原因应该不是共享 CQ 所致, 而是滑动窗口机制依赖一个前置条件并不成立, 就是本端应用层收到了对端应用层的 ACK 就认为底层 SQ 都释放掉了。 在无损的 IB 网络, 这个条件可能成立。 而对于基于以太网的 RoCEv2, 这个是不保证的。 底层驱动发送一个消息后,只有收到对端驱动的 ACK 才会释放 SQ。 当网络压力大的时候, 存在一种情况, 对端收到了消息然后对端应用层也处理了消息, 但是对端驱动层回复的 ACK 丢了或者超时了; 而本端应用层收到了对端应用层发送的 ACK, 累加窗口, 实际上本端驱动层因为接收 ACK 超时导致重发消息, 并未释放 SQ。
@legionxiong hi,这么理解对吗:就是驱动层ACK丢失了进行重传,导致应用先看见recv cqe去累加窗口,后续再看见send cqe。我理解的话,驱动层ACK是影响cqe的生成,不能直接影响资源的释放。(例如:SQ的释放取决于send cqe从CQ中何时poll下来)
驱动层 ACK 影响的是 WC 的生成, 而 brpc 内有个优化就是 1/4 窗口给 wr 设置一次 IBV_SEND_SIGNALED。 只有设置了 IBV_SEND_SIGNALED 标志的 wr 或者出错了才会给该 wr 生成 WC,当应用层 poll 到了 opcode 为 IBV_WC_SEND 的 wr 则可以认为该 wr 及其前面 unsignaled 的 SQ 都被释放掉了。
了解,我说的cqe(completion queue element)也就是你指的这个WC(work completion)。那就没问题了。
没错,准确地说是生成 CQE 。
[E1008]Reached timeout=60000ms @Socket{id=13 fd=1160 addr=xxx:xx} (0x0x7f957c964ec0) rdma info={rdma_state=ON, handshake_state=ESTABLISHED, rdma_remote_rq_window_size=63, rdma_sq_window_size=0, rdma_local_window_capacity=125, rdma_remote_window_capacity=125, rdma_sbuf_head=57, rdma_sbuf_tail=120, rdma_rbuf_head=36, rdma_unacked_rq_wr=0, rdma_received_ack=0, rdma_unsolicited_sent=0, rdma_unsignaled_sq_wr=1, rdma_new_rq_wrs=0, }
我们使用rdma_sq_window_size来表示本地SQ大小,遇到这样的case:rdma_sq_window_size是0,似乎是一直没poll到IBV_WC_SEND WC。超时后重试多次也还是超时,此时数据已经发不出去了。
原因应该是:solicited_only=1时,IBV_WC_SEND WC不会生成CQE。如果没有IBV_WC_RECV CEQ触发comp channel事件,就没法poll到IBV_WC_SEND WC,更新rdma_sq_window_size。
我们验证了以下两种方案可以解决这个问题:
- solicited_only=0
- 将CQ拆成send CQ(solicited_only=0)和recv CQ(solicited_only=1)。
@legionxiong 提到https://github.com/apache/brpc/pull/3145#issuecomment-3544927795 使用一个CQ即可。那在solicited_only=1时,怎么感知IBV_WC_SEND WC的生成呢?
提到#3145 (comment) 使用一个CQ即可。那在solicited_only=1时,怎么感知IBV_WC_SEND WC的生成呢?
所以在给一个 wr 设置 IBV_SEND_SIGNALED 标志的同时需要设置 IBV_SEND_SOLICITED 标志, solicited_only 的 wr 不需要感知 IBV_WC_SEND。
提到#3145 (comment) 使用一个CQ即可。那在solicited_only=1时,怎么感知IBV_WC_SEND WC的生成呢?
所以在给一个 wr 设置 IBV_SEND_SIGNALED 标志的同时需要设置 IBV_SEND_SOLICITED 标志, solicited_only 的 wr 不需要感知 IBV_WC_SEND。
@legionxiong hi,本端的IBV_SEND_SOLICITED设置后影响的应当是对端是否在comp channel上生成一个event以触发poll_cq,这个问题看起来是本端一直没有event产生,导致没法进入poll_cq流程?
本端的IBV_SEND_SOLICITED设置后影响的应当是对端是否在comp channel上生成一个event以触发poll_cq,这个问题看起来是本端一直没有event产生,导致没法进入poll_cq流程?
是的,问题是本端没有send CQE产生。
Solicited Event文档提到,IBV_SEND_SOLICITED是在对端的comp channel的生成event。我测过,本端是没有event的,所以不会PollCq。
我理解,当前实现是在对端设置了IBV_SEND_SOLICITED后,触发本端进入PollCq,将这之前的的IBV_WC_SEND顺便poll出来。
本端的IBV_SEND_SOLICITED设置后影响的应当是对端是否在comp channel上生成一个event以触发poll_cq,这个问题看起来是本端一直没有event产生,导致没法进入poll_cq流程?
是的,问题是本端没有send CQE产生。
Solicited Event文档提到,IBV_SEND_SOLICITED是在对端的comp channel的生成event。我测过,本端是没有event的,所以不会PollCq。
当前实现是在对端设置了IBV_SEND_SOLICITED后,触发本端进入PollCq,将这之前的的IBV_WC_SEND顺便poll出来的。
我个人是这样理解的: 假设本端发送请求,对端回复。正常的逻辑: 1.对端在回复的时候是设置IBV_SEND_SOLICITED,本端产生event去poll_cq。 2.poll到send cqe和recv cqe,进行对应窗口更新。 按照 @legionxiong 的说法,send cqe可能由于网络问题延迟生成。 那么在 2 这一步,就会出现应用层先看见recv cqe。这也是之前我们遇到的滑动窗口bug。
关于@chenBright 遇到的问题,是否存在这样的情况: 在本端的sq应用层窗口消耗完前,本端高速发送数据,对端正常的设置IBV_SEND_SOLICITED进行回复,本端也有产生event去触发poll_cq;
由于以太网的问题send cqe一直在延迟生成,所以这个过程中一直没有poll到send cqe(按照目前brpc的默认配置,这个过程可能有四次或者两次event触发poll_cq,但在这个过程中send cqe的生成都被延迟了)。
最后当本端的sq应用层窗口消耗完毕,对端无法再回复,本端也无法再进入poll_cq了;最终导致超时。
提到#3145 (comment) 使用一个CQ即可。那在solicited_only=1时,怎么感知IBV_WC_SEND WC的生成呢?
所以在给一个 wr 设置 IBV_SEND_SIGNALED 标志的同时需要设置 IBV_SEND_SOLICITED 标志, solicited_only 的 wr 不需要感知 IBV_WC_SEND。
@legionxiong hi,本端的IBV_SEND_SOLICITED设置后影响的应当是对端是否在comp channel上生成一个event以触发poll_cq,这个问题看起来是本端一直没有event产生,导致没法进入poll_cq流程?
由对端应用层发送立即数 ACK 来通知本端 poll cq, 每 1/4 窗口发送一个 signaled + solicited 的 wr, 对端收到 1/4 窗口个的消息后给本端发一个signaled + solicited wr 作为 ACK 通知本端 poll cq。只有当本端 poll 到某个带 IBV_SEND_SIGNALED 标志的 wr 产生的 send CQE 后,驱动才会清空该 wr 之前的 SQE。进一步细化的话, 当 _sq_window_size 小于 _window_size 时说明对端接收消息比本端产生 send CQE 快, 那么后续发消息可以考虑更高频次地设置 signaled + solicited。
提到#3145 (comment) 使用一个CQ即可。那在solicited_only=1时,怎么感知IBV_WC_SEND WC的生成呢?
所以在给一个 wr 设置 IBV_SEND_SIGNALED 标志的同时需要设置 IBV_SEND_SOLICITED 标志, solicited_only 的 wr 不需要感知 IBV_WC_SEND。
@legionxiong hi,本端的IBV_SEND_SOLICITED设置后影响的应当是对端是否在comp channel上生成一个event以触发poll_cq,这个问题看起来是本端一直没有event产生,导致没法进入poll_cq流程?
由对端应用层发送立即数 ACK 来通知本端 poll cq, 每 1/4 窗口发送一个 signaled + solicited 的 wr, 对端收到 1/4 窗口个的消息后给本端发一个signaled + solicited wr 作为 ACK 通知本端 poll cq。当本端 poll 到 IBV_SEND_SIGNALED wr 产生的 CQE 后可以认为驱动层面释放掉了 1/4 窗口的 SQE。进一步细化的话, 当
_sq_window_size小于_window_size时说明对端接收消息比本端产生 send CQE 快, 那么后续发消息可以考虑更高频次地设置 signaled + solicited。
现在brpc的CutFromIOBufList中逻辑跟你讲的类似,(在默认配置下,一次发送最多发送8KB,1/4 窗口就是31*8=248KB,所以不用考虑_unsolicited_bytes > 1048576这个分支)
bool solicited = false;
if (window == 1 || current + 1 >= ndata) {
// Only last message in the write queue or last message in the
// current window will be flagged as solicited.
solicited = true;
} else {
if (_unsolicited > _local_window_capacity / 4) {
// Make sure the recv side can be signaled to return ack
solicited = true;
} else if (_accumulated_ack > _remote_window_capacity / 4) {
// Make sure the recv side can be signaled to handle ack
solicited = true;
} else if (_unsolicited_bytes > 1048576) {
// Make sure the recv side can be signaled when it receives enough data
solicited = true;
} else {
++_unsolicited;
_unsolicited_bytes += this_len;
_accumulated_ack += imm;
}
}
if (solicited) {
wr.send_flags |= IBV_SEND_SOLICITED;
_unsolicited = 0;
_unsolicited_bytes = 0;
_accumulated_ack = 0;
}
另一处是:
int RdmaEndpoint::SendAck(int num) {
if (_new_rq_wrs.fetch_add(num, butil::memory_order_relaxed) > _remote_window_capacity / 2) {
return SendImm(_new_rq_wrs.exchange(0, butil::memory_order_relaxed));
}
return 0;
}
int RdmaEndpoint::SendImm(uint32_t imm) {
if (imm == 0) {
return 0;
}
ibv_send_wr wr;
memset(&wr, 0, sizeof(wr));
wr.opcode = IBV_WR_SEND_WITH_IMM;
wr.imm_data = butil::HostToNet32(imm);
wr.send_flags |= IBV_SEND_SOLICITED;
wr.send_flags |= IBV_SEND_SIGNALED;
ibv_send_wr* bad = NULL;
int err = ibv_post_send(_resource->qp, &wr, &bad);
if (err != 0) {
// We use other way to guarantee the Send Queue is not full.
// So we just consider this error as an unrecoverable error.
LOG(WARNING) << "Fail to ibv_post_send: " << berror(err);
return -1;
}
return 0;
}
这两处都会设置IBV_SEND_SOLICITED,我理解你的意思就是去修改这两处提高event生成频率吗
由对端应用层发送立即数 ACK 来通知本端 poll cq, 每 1/4 窗口发送一个 signaled + solicited 的 wr, 对端收到 1/4 窗口个的消息后给本端发一个signaled + solicited wr 作为 ACK 通知本端 poll cq。当本端 poll 到 IBV_SEND_SIGNALED wr 产生的 CQE 后可以认为驱动层面释放掉了 1/4 窗口的 SQE。
我理解也有这个问题吧
由于以太网的问题send cqe一直在延迟生成,所以这个过程中一直没有poll到send cqe(按照目前brpc的默认配置,这个过程可能有四次或者两次event触发poll_cq,但在这个过程中send cqe的生成都被延迟了)。 最后当本端的sq应用层窗口消耗完毕,对端无法再回复,本端也无法再进入poll_cq了;最终导致超时。
当前实现得抢到_new_rq_wrs,才会发送立即数ACK。发送立即数ACK和发送消息时ACK是并发竞争的,对端收到 1/4 窗口个的消息后给本端发一个signaled + solicited wr 作为 ACK 通知本端 poll cq,看下来不好实现。
由对端应用层发送立即数 ACK 来通知本端 poll cq, 每 1/4 窗口发送一个 signaled + solicited 的 wr, 对端收到 1/4 窗口个的消息后给本端发一个signaled + solicited wr 作为 ACK 通知本端 poll cq。当本端 poll 到 IBV_SEND_SIGNALED wr 产生的 CQE 后可以认为驱动层面释放掉了 1/4 窗口的 SQE。
我理解也有这个问题吧
由于以太网的问题send cqe一直在延迟生成,所以这个过程中一直没有poll到send cqe(按照目前brpc的默认配置,这个过程可能有四次或者两次event触发poll_cq,但在这个过程中send cqe的生成都被延迟了)。 最后当本端的sq应用层窗口消耗完毕,对端无法再回复,本端也无法再进入poll_cq了;最终导致超时。
当前实现得抢到
_new_rq_wrs,才会发送立即数ACK。发送立即数ACK和发送消息时ACK是并发竞争的,对端收到 1/4 窗口个的消息后给本端发一个signaled + solicited wr 作为 ACK 通知本端 poll cq,看下来不好实现。
3145 pr中拆成两个CQ,我理解的话,拆分后 每1/4个窗口值才产生一个SQ对应CQ的event,这带来的cpu开销感觉可以接受