cpp-ipc icon indicating copy to clipboard operation
cpp-ipc copied to clipboard

channel 类型为<relat::multi, relat::multi, trans::broadcast>,(对receiver管理部分做了修改)运行一段时间后接收端掉线,无法接收数据

Open windowcc opened this issue 3 years ago • 2 comments

你好,我为了接触广播模式下receiver最大通道的限制.使用std::array<std::atomic<cc_t>,64> 替换了std::atomic<cc_t>,对receiver进行管理,但是在进行测试的时候发现,有些receiver客户端出现了掉线的情况.不知道这样的更改是否会对整个程序产生影响. 请问,是否有更好的方法可以解除receiver最大通道的限制? 更改后的代码如下所示:

#pragma once

#include <atomic>
#include <cstddef>
#include <cstdint>
#include <new>
#include <array>

#include "libipc/def.h"
#include "libipc/rw_lock.h"

#include "libipc/platform/detail.h"

namespace ipc {
namespace circ {

#define MAX_CONNECT_CNT 64

using u1_t = ipc::uint_t<8>;
using u2_t = ipc::uint_t<32>;

/** only supports max 32 connections in broadcast mode */
using cc_t = u2_t;

constexpr u1_t index_of(u2_t c) noexcept {
    return static_cast<u1_t>(c);
}

class conn_head_base {
protected:
    // std::atomic<cc_t> cc_{0}; // connections
    // std::array<std::atomic<cc_t>,64> cc_;
    ipc::spin_lock lc_;
    std::atomic<bool> constructed_{false};

public:
    void init() {
        /* DCLP */
        if (!constructed_.load(std::memory_order_acquire)) {
            IPC_UNUSED_ auto guard = ipc::detail::unique_lock(lc_);
            if (!constructed_.load(std::memory_order_relaxed)) {
                ::new (this) conn_head_base;
                constructed_.store(true, std::memory_order_release);
            }
        }
    }

    conn_head_base() = default;
    conn_head_base(conn_head_base const &) = delete;
    conn_head_base &operator=(conn_head_base const &) = delete;

    // cc_t connections(std::memory_order order = std::memory_order_acquire) const noexcept {
    //     return this->cc_.load(order);
    // }
};

template <typename P, bool = relat_trait<P>::is_broadcast>
class conn_head;

template <typename P>
class conn_head<P, true> : public conn_head_base 
{
protected:
    std::atomic<cc_t> cnt_{0};          // 计数
    std::array<std::atomic<cc_t>,MAX_CONNECT_CNT> ids_{0};
public:
    cc_t connect(cc_t id) noexcept {
        if(connections() >= ids_.size()) {
            return 0;
        }
        for (auto i = 0; i < ids_.size(); i++)
        {
            cc_t cur = this->ids_[i].load(std::memory_order_acquire);
            if(cur) {
                if (cur == id) return 0;
                else continue;                
            }
            else {
                cc_t except = 0;
                this->ids_[i].compare_exchange_weak(except,id,std::memory_order_release);
                this->cnt_.fetch_add(1, std::memory_order_relaxed);
                return id;
            }
        }
        return 0;
    }

    cc_t disconnect(cc_t cc_id) noexcept {
        if(connections() <= 0) {
            return 0;
        }
        for (auto i = 0; i < ids_.size(); i++)
        {
            if(this->ids_[i].load(std::memory_order_acquire) == cc_id) {
                cc_t desire_val = 0;
                this->ids_[i].compare_exchange_weak(cc_id,desire_val,std::memory_order_release);
                this->cnt_.fetch_sub(1, std::memory_order_relaxed);
                return cc_id;
            }
        }
        return 0;
    }

    std::size_t conn_count(std::memory_order order = std::memory_order_acquire) const noexcept {
        return connections();
    }

    cc_t connections(std::memory_order order = std::memory_order_acquire) const noexcept {
        return cnt_.load(order);
    }
};

template <typename P>
class conn_head<P, false> : public conn_head_base 
{
protected:
    std::atomic<cc_t> cc_{0}; // connections
public:
    cc_t connect(cc_t id) noexcept {
        (void)id;
        return this->cc_.fetch_add(1, std::memory_order_relaxed) + 1;
    }

    cc_t disconnect(cc_t cc_id) noexcept {
        if (cc_id == ~static_cast<circ::cc_t>(0u)) {
            // clear all connections
            this->cc_.store(0, std::memory_order_relaxed);

            return 0u;
        }
        else {
            return this->cc_.fetch_sub(1, std::memory_order_relaxed) - 1;
        }
    }

    std::size_t conn_count(std::memory_order order = std::memory_order_acquire) const noexcept {
        return this->connections(order);
    }
    cc_t connections(std::memory_order order = std::memory_order_acquire) const noexcept {
        return this->cc_.load(order);
    }
};

} // namespace circ
} // namespace ipc

windowcc avatar Oct 20 '22 14:10 windowcc

很不错的想法,但32的限制问题并不是出在用于存储连接的ids_上,而是我们需要原子的标记某个recv的读取状态。 如果连接数超过限制,我们很难用原子操作对状态做查询了。

你给的connect、disconnect有点问题,多线程同时处理的时候有bug。 不过我想了下,思路应该是可行的,只是循环里的并发访问需要写对。 另外,我们在这种情况下就不应该用位来标记recv,而用一个数值(32位),所以还需要改下prod_cons的实现。

mutouyun avatar Oct 23 '22 05:10 mutouyun

谢谢, 我看到这部分的逻辑了. 我先仔细分析一下这部分的代码. 谢谢你的提示.

windowcc avatar Oct 23 '22 15:10 windowcc