cpp-ipc
cpp-ipc copied to clipboard
channel 类型为<relat::multi, relat::multi, trans::broadcast>,(对receiver管理部分做了修改)运行一段时间后接收端掉线,无法接收数据
你好,我为了接触广播模式下receiver最大通道的限制.使用std::array<std::atomic<cc_t>,64> 替换了std::atomic<cc_t>,对receiver进行管理,但是在进行测试的时候发现,有些receiver客户端出现了掉线的情况.不知道这样的更改是否会对整个程序产生影响. 请问,是否有更好的方法可以解除receiver最大通道的限制? 更改后的代码如下所示:
#pragma once
#include <atomic>
#include <cstddef>
#include <cstdint>
#include <new>
#include <array>
#include "libipc/def.h"
#include "libipc/rw_lock.h"
#include "libipc/platform/detail.h"
namespace ipc {
namespace circ {
#define MAX_CONNECT_CNT 64
using u1_t = ipc::uint_t<8>;
using u2_t = ipc::uint_t<32>;
/** only supports max 32 connections in broadcast mode */
using cc_t = u2_t;
constexpr u1_t index_of(u2_t c) noexcept {
return static_cast<u1_t>(c);
}
class conn_head_base {
protected:
// std::atomic<cc_t> cc_{0}; // connections
// std::array<std::atomic<cc_t>,64> cc_;
ipc::spin_lock lc_;
std::atomic<bool> constructed_{false};
public:
void init() {
/* DCLP */
if (!constructed_.load(std::memory_order_acquire)) {
IPC_UNUSED_ auto guard = ipc::detail::unique_lock(lc_);
if (!constructed_.load(std::memory_order_relaxed)) {
::new (this) conn_head_base;
constructed_.store(true, std::memory_order_release);
}
}
}
conn_head_base() = default;
conn_head_base(conn_head_base const &) = delete;
conn_head_base &operator=(conn_head_base const &) = delete;
// cc_t connections(std::memory_order order = std::memory_order_acquire) const noexcept {
// return this->cc_.load(order);
// }
};
template <typename P, bool = relat_trait<P>::is_broadcast>
class conn_head;
template <typename P>
class conn_head<P, true> : public conn_head_base
{
protected:
std::atomic<cc_t> cnt_{0}; // 计数
std::array<std::atomic<cc_t>,MAX_CONNECT_CNT> ids_{0};
public:
cc_t connect(cc_t id) noexcept {
if(connections() >= ids_.size()) {
return 0;
}
for (auto i = 0; i < ids_.size(); i++)
{
cc_t cur = this->ids_[i].load(std::memory_order_acquire);
if(cur) {
if (cur == id) return 0;
else continue;
}
else {
cc_t except = 0;
this->ids_[i].compare_exchange_weak(except,id,std::memory_order_release);
this->cnt_.fetch_add(1, std::memory_order_relaxed);
return id;
}
}
return 0;
}
cc_t disconnect(cc_t cc_id) noexcept {
if(connections() <= 0) {
return 0;
}
for (auto i = 0; i < ids_.size(); i++)
{
if(this->ids_[i].load(std::memory_order_acquire) == cc_id) {
cc_t desire_val = 0;
this->ids_[i].compare_exchange_weak(cc_id,desire_val,std::memory_order_release);
this->cnt_.fetch_sub(1, std::memory_order_relaxed);
return cc_id;
}
}
return 0;
}
std::size_t conn_count(std::memory_order order = std::memory_order_acquire) const noexcept {
return connections();
}
cc_t connections(std::memory_order order = std::memory_order_acquire) const noexcept {
return cnt_.load(order);
}
};
template <typename P>
class conn_head<P, false> : public conn_head_base
{
protected:
std::atomic<cc_t> cc_{0}; // connections
public:
cc_t connect(cc_t id) noexcept {
(void)id;
return this->cc_.fetch_add(1, std::memory_order_relaxed) + 1;
}
cc_t disconnect(cc_t cc_id) noexcept {
if (cc_id == ~static_cast<circ::cc_t>(0u)) {
// clear all connections
this->cc_.store(0, std::memory_order_relaxed);
return 0u;
}
else {
return this->cc_.fetch_sub(1, std::memory_order_relaxed) - 1;
}
}
std::size_t conn_count(std::memory_order order = std::memory_order_acquire) const noexcept {
return this->connections(order);
}
cc_t connections(std::memory_order order = std::memory_order_acquire) const noexcept {
return this->cc_.load(order);
}
};
} // namespace circ
} // namespace ipc
很不错的想法,但32的限制问题并不是出在用于存储连接的ids_上,而是我们需要原子的标记某个recv的读取状态。
如果连接数超过限制,我们很难用原子操作对状态做查询了。
你给的connect、disconnect有点问题,多线程同时处理的时候有bug。 不过我想了下,思路应该是可行的,只是循环里的并发访问需要写对。 另外,我们在这种情况下就不应该用位来标记recv,而用一个数值(32位),所以还需要改下prod_cons的实现。
谢谢, 我看到这部分的逻辑了. 我先仔细分析一下这部分的代码. 谢谢你的提示.