brpc icon indicating copy to clipboard operation
brpc copied to clipboard

multi连接方式

Open jamesge opened this issue 7 years ago • 7 comments

可以理解为“由多个single连接组成”的连接方式。相比single可以更好地应对网络抖动,相比pooled可以节省连接数和读写开销。支持single方式的协议都支持multi方式。

single总与一个对端保持一个连接,一个连接上可以同时做多个rpc,所以回复中一定要有id机制以和请求对上,对协议设计有要求。

  • pros:节省连接数是显而易见的,这对于有几千台机器的分布式集群很重要。一个连接上同时发送的多个请求可以被合并,从而减低读写开销,在小包高qps场景中,对cpu的节省效果很明显。
  • cons :把“很多鸡蛋放在了一个篮子里”,当tcp连接抖动时并触发RTO退避策略时,一大批请求都会受影响,在广域网中的SLA会比较糟糕。

pooled则是与一个对端保持一个连接池,每次发送前从池中取一个连接,一个连接上同时只能做一个rpc,直到回复回来,才能放回对应的连接池。如果rpc超时,对应的连接会被关闭(以防止老回复回来,打破了一个连接上只有一个rpc的假设)。协议不需要传递id字段,几乎所有协议都支持。

  • pros:天然自带负载均衡,是“最小连接数”策略的特殊形式,延时往往是最优的,并且很健壮。因为某个连接抖动也意味着对应的rpc没有结束,那个连接就不会还回连接池,也不会被其他rpc选到,最后一般只影响一个请求,在广域网中的效果很好。
  • cons:连接数很多,基本不能用在大型分布式系统内。一个连接每次最多写一个请求,在小包高qps场景中,sys-read/write的占比会非常显著。

multi方式可视作对single和pooled的折衷,相比single连接数扩大了常数倍(比如2-3倍),但是同时又能像pooled那样在某个连接抖动时通过走其他连接,控制影响面。所以从这点来说,multi方式并不是简单地建立多个single连接,随机分发,那样是没有反馈的,multi方式是让每个连接上进行的rpc数尽量相当,当某一路抖动时,其上的rpc数也不会减少,从而让框架选择其他连接。

jamesge avatar Sep 21 '18 04:09 jamesge

开发方案描述:

  1. 类Socket中增加_rpc_count表示上面请求的rpc个数,用于表示负载。 GetSocket()时_rpc_count加一,ReturnSocket()时_rpc_count减一。

  2. gflag max_connection_multiple_size表示最多可创建的connection

  3. gflag gap_threshold_for_switch_multiple_connection 表示当两个连接上的_rpc_count的差值大于这个数时,将选择负载较轻的连接。

  • 所以当gap_threshold_for_switch_multiple_connection为0时,类似于pooled的方式,每次都尽量选择负载为0的连接,不同是,当连接创建满时,复用现有的负载最轻的连接。
  • 当gap_threshold_for_switch_multiple_connection设置为无限大时,就是single的方式。
  1. 类SocketMultiple(类似SocketPooled,会共同继承一个抽象类SocketGroup)用来管理multiple connection。
  • SocketMultiple::_lightest_sid:当前负载最轻连接(和gap_threshold_for_switch_multiple_connection值相关),每个连接 GetSocket()和ReturnSocket()时都会和当前的_lightest_sid比较,如果_lightest_sid负载减去当前连接负载的差值超过gap_threshold_for_switch_multiple_connection。则替换_lightest_sid。
  • SocketMultiple::_free:一个数组,用来记录当前负载为0的连接。记录的原因是负载为0的不会再更新负载了,需要记录下来。
  • SocketMultiple::GetSocket():选择一个连接,方式如下:先选择_lightest_sid,如果其负载_rpc_count <= gap_threshold_for_switch_multiple_connection 则选择成功,否则如果_free非空,pop一个出来使用,否则如果连接未创建满,则创建一个新的连接,否则保持选择当前的_lightest_sid。新选择的连接不是当前_lightest_sid需要更新_lightest_sid。
  • SocketMultiple::ReturnSocket():如果负载比当前_lightest_sid轻(差值大于gap_threshold_for_switch_multiple_connection )则更新_lightest_sid,如果负载为0,则push_back到_free中去。
  1. SocketMultiple::_free的更新需要加锁,其他原子变量更新。

cdjingit avatar Oct 08 '18 09:10 cdjingit

@jamesge 帮忙看下上面的方案描述

cdjingit avatar Oct 08 '18 09:10 cdjingit

伪码实现:

class  SocketMultiple : public SocketGroup {
public:
    explicit SocketMultiple(const SocketOptions& opt);
    virtual ~SocketMultiple();
    virtual int GetSocket(SocketUniquePtr* ptr);
    virtual void ReturnSocket(Socket* sock);
private:
    butil::atomic<uint64_t>  _rpc_count;     // 总请求数
    butil::atomic<size_t> _num_created;      //创建的连接数
    butil::atomic<int> _num_free;               //负载为0的连接数
    butil::atomic<SocketId> _lightest_sid;   //负载最轻的连接
    butil::Mutex _mutex;
    std::vector<SocketId> _free;                 //负载为0的连接
    std::vector<SocketId> _multiple;          // 所有已创建连接
};

cdjingit avatar Oct 10 '18 04:10 cdjingit

选择连接:

int SocketMultiple::GetSocket(SocketUniquePtr* ptr) {
    _rpc_count.fetch_add(1, butil::memory_order_relaxed);
    SocketId lsid = _lightest_sid.load(butil::memory_order_acquire);
    SocketUniquePtr lptr;
    const uint32_t threshold = FLAGS_gap_threshold_for_multiple_connections;
   // 先检查_lightest_sid的负载是否小于threshold,如果小于保持在_lightest_sid发送请求
    if (Socket::Address(lsid, &lptr) == 0) {
        if ( lptr->_rpc_count.fetch_add(1, butil::memory_order_relaxed) < threshold) {
            ptr->reset(lptr.release());
            return 0;
        }
    }
    // 如果有负载为0的连接,弹出一个使用
    if (_num_free.fetch_sub(1, butil::memory_order_relaxed) > 0) {
        SocketId sid;
        {
            BAIDU_SCOPED_LOCK(_mutex);
            CHECK(!_free.empty()) << "No free multiple socket found";
            sid = _free.back();
            _free.pop_back();
        }
				....
        // 更新当前_lightest_sid
        _lightest_sid.compare_exchange_strong(lsid, sid, butil::memory_order_relaxed);
        return 0;
    } else {
        _num_free.fetch_add(1, butil::memory_order_relaxed);
    }
    // 如果连接未建满,创建一个新的连接
    if (_num_created.fetch_add(1, butil::memory_order_relaxed) < FLAGS_max_connection_multiple_size) {
        Create(opt, &sid);
			  Socket::Address(sid, ptr);
		    ....
        (*ptr)->_rpc_count.fetch_add(1, butil::memory_order_acquire);
        //更新_lightest_sid
        _lightest_sid.compare_exchange_strong(lsid, sid, butil::memory_order_relaxed);
        BAIDU_SCOPED_LOCK(_mutex);
        _multiple.emplace(sid);
        return 0;
    } else {
        _num_created.fetch_sub(1, butil::memory_order_relaxed);
    }
    // 保持在_lightest_sid上发送请求,在burst的情况下,瞬间可能有多个连接在上面发送。
    // 可以比较当前的rpc_count均值来避免连接过大。
    if (lptr) {
        load = lptr->_rpc_count.load(butil::memory_order_relaxed);
        // _lightest_sid的负载超过2 * Av_load+threshold(平均负载的2倍加threshold)
        // 这边条件应该较严格,防止经常跑进这个逻辑
        if ((load - threshold) * _num_created > 2*_rpc_count ) {
             // 随机遍历选择一个小于平均负载的sid
             SocketUniquePtr rand_ptr;
             {
                 BAIDU_SCOPED_LOCK(_mutex);
                 for( sid : _multiple) {
                      Socketid sid;
                 }
             }
            // 更新_lightest_sid
            do {
                if (Socket::Address(lsid, &new_lptr) == 0) {
                    if (new_lptr->_rpc_count.load(butil::memory_order_relaxed) 
                        < rand_ptr->_rpc_count.load(butil::memory_order_relaxed) ) {
                        ptr->reset(new_lptr.release());
                        return 0;
                    }
                }
            } while (! _lightest_sid.compare_exchange_strong(lsid, rand_sid, butil::memory_order_relaxed);)
            ptr->reset(rand_ptr.release());
            return 0;
        } else {  // 否则保持_lightest_sid
            ptr->reset(lptr.release());
            return 0;
        }
    }
    return -1;
}

cdjingit avatar Oct 10 '18 04:10 cdjingit

返还连接:

void SocketMultiple::ReturnSocket(Socket* sock) {
    // 总rpc_cout减一
     _rpc_count.fetch_sub(1, butil::memory_order_relaxed);
    // 当前连接的rpc_cout减一
    uint32_t load = sock->_rpc_count.fetch_sub(1, butil::memory_order_acquire);
    --load;
    SocketId min_sid = _lightest_sid.load(butil::memory_order_relaxed);
    if (id == min_sid) {
        return;
    }
    // 如果该连接的rpc_cout 与_lightest_sid的差值大于threshold 则更新_lightest_sid。
    // 目的防止颠簸,没有必要负载小于就更新
    for(;;) {
        SocketUniquePtr ptr;
        if (Socket::Address(min_sid, &ptr) != 0) {
            break;
        }
        if (id == min_sid) {
            return;
        }
        if (load + threshold>= ptr->_rpc_count.load(butil::memory_order_acquire)) {
            break;
        }
        if (_lightest_sid.compare_exchange_strong(
            min_sid, id, butil::memory_order_relaxed)) {
            return;
        }
    }
    // 如果没有更新成功,且负载为0,保存到free队列中去。 
    if (load == 0) {
        _num_free.fetch_add(1, butil::memory_order_relaxed);
        BAIDU_SCOPED_LOCK(_mutex);
        _free.push_back(id);
    }
}

cdjingit avatar Oct 10 '18 04:10 cdjingit

https://github.com/brpc/brpc/pull/536

gydong avatar Dec 13 '18 02:12 gydong

+1

Aaaaaaron avatar Jan 10 '22 09:01 Aaaaaaron