multi连接方式
可以理解为“由多个single连接组成”的连接方式。相比single可以更好地应对网络抖动,相比pooled可以节省连接数和读写开销。支持single方式的协议都支持multi方式。
single总与一个对端保持一个连接,一个连接上可以同时做多个rpc,所以回复中一定要有id机制以和请求对上,对协议设计有要求。
- pros:节省连接数是显而易见的,这对于有几千台机器的分布式集群很重要。一个连接上同时发送的多个请求可以被合并,从而减低读写开销,在小包高qps场景中,对cpu的节省效果很明显。
- cons :把“很多鸡蛋放在了一个篮子里”,当tcp连接抖动时并触发RTO退避策略时,一大批请求都会受影响,在广域网中的SLA会比较糟糕。
pooled则是与一个对端保持一个连接池,每次发送前从池中取一个连接,一个连接上同时只能做一个rpc,直到回复回来,才能放回对应的连接池。如果rpc超时,对应的连接会被关闭(以防止老回复回来,打破了一个连接上只有一个rpc的假设)。协议不需要传递id字段,几乎所有协议都支持。
- pros:天然自带负载均衡,是“最小连接数”策略的特殊形式,延时往往是最优的,并且很健壮。因为某个连接抖动也意味着对应的rpc没有结束,那个连接就不会还回连接池,也不会被其他rpc选到,最后一般只影响一个请求,在广域网中的效果很好。
- cons:连接数很多,基本不能用在大型分布式系统内。一个连接每次最多写一个请求,在小包高qps场景中,sys-read/write的占比会非常显著。
multi方式可视作对single和pooled的折衷,相比single连接数扩大了常数倍(比如2-3倍),但是同时又能像pooled那样在某个连接抖动时通过走其他连接,控制影响面。所以从这点来说,multi方式并不是简单地建立多个single连接,随机分发,那样是没有反馈的,multi方式是让每个连接上进行的rpc数尽量相当,当某一路抖动时,其上的rpc数也不会减少,从而让框架选择其他连接。
开发方案描述:
-
类Socket中增加_rpc_count表示上面请求的rpc个数,用于表示负载。 GetSocket()时_rpc_count加一,ReturnSocket()时_rpc_count减一。
-
gflag max_connection_multiple_size表示最多可创建的connection
-
gflag gap_threshold_for_switch_multiple_connection 表示当两个连接上的_rpc_count的差值大于这个数时,将选择负载较轻的连接。
- 所以当gap_threshold_for_switch_multiple_connection为0时,类似于pooled的方式,每次都尽量选择负载为0的连接,不同是,当连接创建满时,复用现有的负载最轻的连接。
- 当gap_threshold_for_switch_multiple_connection设置为无限大时,就是single的方式。
- 类SocketMultiple(类似SocketPooled,会共同继承一个抽象类SocketGroup)用来管理multiple connection。
- SocketMultiple::_lightest_sid:当前负载最轻连接(和gap_threshold_for_switch_multiple_connection值相关),每个连接 GetSocket()和ReturnSocket()时都会和当前的_lightest_sid比较,如果_lightest_sid负载减去当前连接负载的差值超过gap_threshold_for_switch_multiple_connection。则替换_lightest_sid。
- SocketMultiple::_free:一个数组,用来记录当前负载为0的连接。记录的原因是负载为0的不会再更新负载了,需要记录下来。
- SocketMultiple::GetSocket():选择一个连接,方式如下:先选择_lightest_sid,如果其负载_rpc_count <= gap_threshold_for_switch_multiple_connection 则选择成功,否则如果_free非空,pop一个出来使用,否则如果连接未创建满,则创建一个新的连接,否则保持选择当前的_lightest_sid。新选择的连接不是当前_lightest_sid需要更新_lightest_sid。
- SocketMultiple::ReturnSocket():如果负载比当前_lightest_sid轻(差值大于gap_threshold_for_switch_multiple_connection )则更新_lightest_sid,如果负载为0,则push_back到_free中去。
- SocketMultiple::_free的更新需要加锁,其他原子变量更新。
@jamesge 帮忙看下上面的方案描述
伪码实现:
class SocketMultiple : public SocketGroup {
public:
explicit SocketMultiple(const SocketOptions& opt);
virtual ~SocketMultiple();
virtual int GetSocket(SocketUniquePtr* ptr);
virtual void ReturnSocket(Socket* sock);
private:
butil::atomic<uint64_t> _rpc_count; // 总请求数
butil::atomic<size_t> _num_created; //创建的连接数
butil::atomic<int> _num_free; //负载为0的连接数
butil::atomic<SocketId> _lightest_sid; //负载最轻的连接
butil::Mutex _mutex;
std::vector<SocketId> _free; //负载为0的连接
std::vector<SocketId> _multiple; // 所有已创建连接
};
选择连接:
int SocketMultiple::GetSocket(SocketUniquePtr* ptr) {
_rpc_count.fetch_add(1, butil::memory_order_relaxed);
SocketId lsid = _lightest_sid.load(butil::memory_order_acquire);
SocketUniquePtr lptr;
const uint32_t threshold = FLAGS_gap_threshold_for_multiple_connections;
// 先检查_lightest_sid的负载是否小于threshold,如果小于保持在_lightest_sid发送请求
if (Socket::Address(lsid, &lptr) == 0) {
if ( lptr->_rpc_count.fetch_add(1, butil::memory_order_relaxed) < threshold) {
ptr->reset(lptr.release());
return 0;
}
}
// 如果有负载为0的连接,弹出一个使用
if (_num_free.fetch_sub(1, butil::memory_order_relaxed) > 0) {
SocketId sid;
{
BAIDU_SCOPED_LOCK(_mutex);
CHECK(!_free.empty()) << "No free multiple socket found";
sid = _free.back();
_free.pop_back();
}
....
// 更新当前_lightest_sid
_lightest_sid.compare_exchange_strong(lsid, sid, butil::memory_order_relaxed);
return 0;
} else {
_num_free.fetch_add(1, butil::memory_order_relaxed);
}
// 如果连接未建满,创建一个新的连接
if (_num_created.fetch_add(1, butil::memory_order_relaxed) < FLAGS_max_connection_multiple_size) {
Create(opt, &sid);
Socket::Address(sid, ptr);
....
(*ptr)->_rpc_count.fetch_add(1, butil::memory_order_acquire);
//更新_lightest_sid
_lightest_sid.compare_exchange_strong(lsid, sid, butil::memory_order_relaxed);
BAIDU_SCOPED_LOCK(_mutex);
_multiple.emplace(sid);
return 0;
} else {
_num_created.fetch_sub(1, butil::memory_order_relaxed);
}
// 保持在_lightest_sid上发送请求,在burst的情况下,瞬间可能有多个连接在上面发送。
// 可以比较当前的rpc_count均值来避免连接过大。
if (lptr) {
load = lptr->_rpc_count.load(butil::memory_order_relaxed);
// _lightest_sid的负载超过2 * Av_load+threshold(平均负载的2倍加threshold)
// 这边条件应该较严格,防止经常跑进这个逻辑
if ((load - threshold) * _num_created > 2*_rpc_count ) {
// 随机遍历选择一个小于平均负载的sid
SocketUniquePtr rand_ptr;
{
BAIDU_SCOPED_LOCK(_mutex);
for( sid : _multiple) {
Socketid sid;
}
}
// 更新_lightest_sid
do {
if (Socket::Address(lsid, &new_lptr) == 0) {
if (new_lptr->_rpc_count.load(butil::memory_order_relaxed)
< rand_ptr->_rpc_count.load(butil::memory_order_relaxed) ) {
ptr->reset(new_lptr.release());
return 0;
}
}
} while (! _lightest_sid.compare_exchange_strong(lsid, rand_sid, butil::memory_order_relaxed);)
ptr->reset(rand_ptr.release());
return 0;
} else { // 否则保持_lightest_sid
ptr->reset(lptr.release());
return 0;
}
}
return -1;
}
返还连接:
void SocketMultiple::ReturnSocket(Socket* sock) {
// 总rpc_cout减一
_rpc_count.fetch_sub(1, butil::memory_order_relaxed);
// 当前连接的rpc_cout减一
uint32_t load = sock->_rpc_count.fetch_sub(1, butil::memory_order_acquire);
--load;
SocketId min_sid = _lightest_sid.load(butil::memory_order_relaxed);
if (id == min_sid) {
return;
}
// 如果该连接的rpc_cout 与_lightest_sid的差值大于threshold 则更新_lightest_sid。
// 目的防止颠簸,没有必要负载小于就更新
for(;;) {
SocketUniquePtr ptr;
if (Socket::Address(min_sid, &ptr) != 0) {
break;
}
if (id == min_sid) {
return;
}
if (load + threshold>= ptr->_rpc_count.load(butil::memory_order_acquire)) {
break;
}
if (_lightest_sid.compare_exchange_strong(
min_sid, id, butil::memory_order_relaxed)) {
return;
}
}
// 如果没有更新成功,且负载为0,保存到free队列中去。
if (load == 0) {
_num_free.fetch_add(1, butil::memory_order_relaxed);
BAIDU_SCOPED_LOCK(_mutex);
_free.push_back(id);
}
}
https://github.com/brpc/brpc/pull/536
+1