cpp-ipc
cpp-ipc copied to clipboard
ipc::chan对象执行析构或者disconnect时发生段错误
#include <chrono>
#include <cstddef>
#include <iostream>
#include <map>
#include <memory>
#include <string>
#include <thread>
#include <vector>
#include "libipc/ipc.h"
using msg_line = ipc::chan<ipc::relat::single, ipc::relat::single, ipc::trans::unicast>;
class shared_memory_register{
public:
shared_memory_register(){
};
~shared_memory_register(){
for(auto iter=shared_memory_dict.begin(); iter!=shared_memory_dict.end(); iter++){
iter->second->send(ipc::buff_t('\0'));
iter->second->disconnect();
}
};
bool add_shared_memory(const std::string& name){
if(shared_memory_dict.find(name)==shared_memory_dict.end()){
shared_memory_dict[name] = std::make_unique<msg_line>(name.c_str(), ipc::sender);
return true;
}else{
return false;
}
};
msg_line* get_sender(const std::string& name){
if(shared_memory_dict.find(name)!=shared_memory_dict.end()){
return shared_memory_dict[name].get();
}else{
return nullptr;
}
}
private:
std::map<std::string, std::unique_ptr<msg_line>> shared_memory_dict;
};
/// Process-wide registry of sender channels used by the free functions below.
shared_memory_register _register;

/// Registers a sender channel called `memory_name` in the global registry.
/// @param memory_name name of the channel to create.
/// @return the result of the registration: true when the channel was newly
///         created, false when the name was already registered. Previously
///         the result was discarded and true was returned unconditionally.
bool create_shared_memory(const std::string& memory_name) {
    return _register.add_shared_memory(memory_name);
}
bool send_data(const std::string& memory_name, std::vector<float>& data){
msg_line* sender = _register.get_sender(memory_name);
if(sender != nullptr){
sender->send(data.data(), data.size());
}
return true;
}
/// Receives float payloads from the channel named `memory_name`, printing each
/// value on its own line, until a message arrives that holds no complete float.
///
/// @param memory_name name of the channel to listen on.
/// @param data        output: overwritten with the most recent payload.
/// @return true when at least one payload was received, false otherwise.
bool receive_data(const std::string& memory_name, std::vector<float>& data) {
    msg_line rec{memory_name.c_str(), ipc::receiver};
    bool received_any = false;
    while (true) {
        auto buffer = rec.recv();
        // buffer.size() counts bytes; using it directly as a float count read
        // four times past the end of the received message.
        const std::size_t count = buffer.size() / sizeof(float);
        if (count == 0) {
            // Empty message, or one shorter than a single float (for example a
            // one-byte terminator sent by the peer): stop receiving.
            break;
        }
        // NOTE(review): assumes the buffer storage is suitably aligned for float.
        const float* const first = reinterpret_cast<const float*>(buffer.data());
        data.assign(first, first + count);
        for (const float value : data) {
            std::cout << value << std::endl;
        }
        received_any = true;
    }
    return received_any;
}
// bool
// Command-line mode selectors: "s" selects sender mode, "r" receiver mode.
constexpr char mode_s__[] = "s";
constexpr char mode_r__[] = "r";
/// Entry point. `argv[1]` selects the mode:
///   "s" - register channel "abcd" and send ten floats once per second, ten times;
///   "r" - receive from channel "abcd" and print what arrives.
/// Any other invocation exits immediately with status 0.
int main(int argc, char *argv[]) {
    if (argc < 2) return 0;

    const std::string mode{argv[1]};
    const std::string channel_name{"abcd"};

    if (mode == mode_s__) {
        // Payload 0.0f .. 9.0f; only the sender needs it.
        std::vector<float> data;
        for (size_t i{0}; i < 10; ++i) {
            data.push_back(float(i));
        }

        create_shared_memory(channel_name);
        for (size_t count{0}; count < 10; ++count) {
            std::this_thread::sleep_for(std::chrono::milliseconds(1000));
            // The former `std::vector<float> float_data();` declared a function
            // (most vexing parse) rather than a vector and was never used.
            send_data(channel_name, data);
            std::cout << "**********" << count << "**********" << std::endl;
        }
    } else if (mode == mode_r__) {
        std::vector<float> data_received;
        receive_data(channel_name, data_received);
        std::this_thread::sleep_for(std::chrono::milliseconds(1000));
        for (size_t i{0}; i < data_received.size(); ++i) {
            std::cout << data_received[i] << " ";
        }
        std::cout << std::endl;
    }
    return 0;
}
平台:Ubuntu 18.04,gcc version 9.4.0 (Ubuntu 9.4.0-1ubuntu1~18.04);代码分支版本:v1.3.0。代码编译后单独执行发送模式(不在另一个命令行中执行接收模式),在代码执行 disconnect 时发生段错误。
#include <signal.h>
#include <map>
#include <iostream>
#include <string>
#include <atomic>
#include <thread>
#include <chrono>
#include <cstddef>
#include <type_traits>
#include "libipc/ipc.h"
// #include "capo/random.hpp"
#include <random> // std::default_random_engine, std::uniform_int_distribution
#include <utility> // std::forward
namespace capo {

////////////////////////////////////////////////////////////////
/// A distribution bundled with its own engine, so that a value
/// can be drawn with a plain call: `random<> r{lo, hi}; r();`
////////////////////////////////////////////////////////////////

template <typename Engine       = std::default_random_engine,
          typename Distribution = std::uniform_int_distribution<>>
class random : public Distribution
{
public:
    using engine_type       = Engine;
    using distribution_type = Distribution;
    using result_type       = typename Distribution::result_type;
    using param_type        = typename Distribution::param_type;

    /// Forwards every argument to the distribution's constructor and seeds
    /// the engine with one value drawn from std::random_device.
    template <typename... T>
    random(T&&... args)
        : Distribution(std::forward<T>(args)...)
        , rng_(std::random_device{}())
    {}

    /// Draws a value using the parameters stored in the distribution.
    result_type operator()()
    {
        Distribution& dist = *this;
        return dist(rng_);
    }

    /// Draws a value using `parm` instead of the stored parameters.
    result_type operator()(const param_type& parm)
    {
        Distribution& dist = *this;
        return dist(rng_, parm);
    }

private:
    engine_type rng_;
};

} // namespace capo
namespace {
// Channel name used by both the sending and the receiving side.
constexpr char name__[]   = "ipc-msg-que";
// Command-line mode selectors compared against argv[1] in main.
constexpr char mode_s__[] = "s";
constexpr char mode_r__[] = "r";

// Bounds of the random message size; max_sz is also the size of buff__.
constexpr std::size_t min_sz = 128;
constexpr std::size_t max_sz = 1024 * 16;

// Set to true to make the send/receive/counting loops stop.
std::atomic<bool>        is_quit__{false};
// Bytes transferred since the counting thread last reset it.
std::atomic<std::size_t> size_counter__{0};

using msg_que_t = ipc::chan<ipc::relat::single, ipc::relat::single, ipc::trans::unicast>;
// Channel currently in use; also reached from the signal handler in main.
msg_que_t* que__ = nullptr;

// Payload storage for outgoing messages.
ipc::byte_t buff__[max_sz];

// Generator of message sizes in [min_sz, max_sz].
capo::random<> rand__{static_cast<int>(min_sz), static_cast<int>(max_sz)};
class shared_memory_register{
public:
shared_memory_register(){
};
~shared_memory_register(){
for(auto iter=shared_memory_dict.begin(); iter!=shared_memory_dict.end(); iter++){
iter->second->send(ipc::buff_t('\0'));
iter->second->disconnect();
}
};
template<typename T>
bool add_shared_memory(const T& name){
if constexpr(std::is_same_v<T, std::string>){
if(shared_memory_dict.find(name)==shared_memory_dict.end()){
shared_memory_dict[name] = std::make_unique<msg_que_t>(name.c_str(), ipc::sender);
return true;
}else{
return false;
}
}
std::string converted_str;
if constexpr(std::is_pointer_v<T> && (std::is_same_v<std::remove_pointer_t<T>, char>
|| std::is_same_v<T, const char*>)){
converted_str = std::string(name);
}
if constexpr(std::is_same_v<std::remove_extent_t<T>, char> && std::is_array_v<T>){
converted_str = std::string(name);
}
if(!converted_str.empty()){
if(shared_memory_dict.find(converted_str)==shared_memory_dict.end()){
shared_memory_dict[converted_str] = std::make_unique<msg_que_t>(converted_str.c_str(), ipc::sender);
return true;
}else{
return false;
}
}
};
msg_que_t* get_sender(const std::string& name){
if(shared_memory_dict.find(name)!=shared_memory_dict.end()){
return shared_memory_dict[name].get();
}else{
return nullptr;
}
}
private:
std::map<std::string, std::unique_ptr<msg_que_t>> shared_memory_dict;
};
/// Renders a byte count as text: whole megabytes when it exceeds 1 MiB,
/// whole kilobytes when it exceeds 1 KiB, plain bytes otherwise. Division
/// truncates, and a value exactly on a boundary stays in the smaller unit.
inline std::string str_of_size(std::size_t sz) noexcept {
    constexpr std::size_t kib = 1024;
    constexpr std::size_t mib = kib * kib;

    std::size_t amount = sz;
    const char* unit   = " bytes";
    if (sz > mib) {
        amount = sz / mib;
        unit   = " MB";
    } else if (sz > kib) {
        amount = sz / kib;
        unit   = " KB";
    }
    return std::to_string(amount) + unit;
}
/// Renders a per-second byte count: the text from str_of_size followed by "/s".
inline std::string speed_of(std::size_t sz) noexcept {
    std::string rate = str_of_size(sz);
    rate += "/s";
    return rate;
}
/// Throughput reporter, run on its own thread: sleeps in 100 ms steps and,
/// after every tenth step, prints the bytes accumulated in size_counter__
/// and resets the counter. Returns once is_quit__ is observed to be true.
void do_counting() {
    constexpr int steps_per_report = 10;
    int steps = 0;
    while (!is_quit__.load(std::memory_order_acquire)) {
        std::this_thread::sleep_for(std::chrono::milliseconds(100)); // 100 ms
        ++steps;
        if (steps < steps_per_report) {
            continue;
        }
        steps = 0;
        const std::size_t transferred = size_counter__.exchange(0, std::memory_order_relaxed);
        std::cout << speed_of(transferred) << std::endl;
    }
}
// Registry that owns the sender channel used by do_send.
shared_memory_register _register;

/// Sender loop: registers the channel name__, then keeps sending randomly
/// sized messages taken from buff__ until is_quit__ becomes true or waiting
/// for a receiver fails. A counting thread reports the throughput meanwhile.
void do_send() {
    ipc::byte_t count{0};
    std::cout
        << __func__ << ": start ["
        << str_of_size(min_sz) << " - " << str_of_size(max_sz)
        << "]...\n";
    _register.add_shared_memory(name__);
    que__ = _register.get_sender(name__);
    // get_sender returns nullptr when the lookup fails; treat that like a
    // failed connection instead of dereferencing a null pointer.
    if (que__ == nullptr || !que__->reconnect(ipc::sender)) {
        std::cerr << __func__ << ": connect failed.\n";
    }
    else {
        std::thread counting{ do_counting };
        while (!is_quit__.load(std::memory_order_acquire)) {
            std::size_t sz = static_cast<std::size_t>(rand__());
            // Stamp one byte per message with the running message counter.
            buff__[count] = count;
            count++;
            if (!que__->send(ipc::buff_t(buff__, sz))) {
                std::cerr << __func__ << ": send failed.\n";
                std::cout << __func__ << ": waiting for receiver...\n";
                if (!que__->wait_for_recv(1)) {
                    std::cerr << __func__ << ": wait receiver failed.\n";
                    // The counting thread only stops once is_quit__ is set.
                    is_quit__.store(true, std::memory_order_release);
                    break;
                }
            }
            size_counter__.fetch_add(sz, std::memory_order_relaxed);
            std::this_thread::yield();
        }
        counting.join();
    }
    std::cout << __func__ << ": quit...\n";
}
/// Receiver loop: opens the channel name__ as a receiver and adds the size of
/// every incoming message to size_counter__ until is_quit__ becomes true or
/// an empty message is received. A counting thread reports the throughput
/// meanwhile.
void do_recv() {
    std::cout
        << __func__ << ": start ["
        << str_of_size(min_sz) << " - " << str_of_size(max_sz)
        << "]...\n";
    // Deliberately never deleted: the signal handler installed in main
    // dereferences que__ and may run at any point until the process exits.
    que__ = new msg_que_t(name__, ipc::receiver);
    if (!que__->reconnect(ipc::receiver)) {
        std::cerr << __func__ << ": connect failed.\n";
    }
    else {
        std::thread counting{ do_counting };
        while (!is_quit__.load(std::memory_order_acquire)) {
            auto msg = que__->recv();
            if (msg.empty()) break;
            size_counter__.fetch_add(msg.size(), std::memory_order_relaxed);
        }
        // The loop can also end on an empty message while is_quit__ is still
        // false; without this store the counting thread would never finish
        // and join() would block forever.
        is_quit__.store(true, std::memory_order_release);
        counting.join();
    }
    std::cout << __func__ << ": quit...\n";
}
} // namespace
/// Entry point. Installs a handler for the termination signals and then runs
/// the sender ("s") or the receiver ("r") according to argv[1]. Any other
/// invocation exits with status 0 without doing anything.
int main(int argc, char ** argv) {
    if (argc < 2) return 0;

    // Requests shutdown: raises the quit flag and disconnects the channel.
    // que__ stays nullptr until do_send/do_recv has assigned it, so a signal
    // that arrives earlier must not dereference it.
    // NOTE(review): disconnect() is unlikely to be async-signal-safe, and
    // returning normally from a SIGSEGV/SIGABRT handler is questionable -
    // confirm whether those two signals should be handled here at all.
    auto on_signal = [](int) {
        is_quit__.store(true, std::memory_order_release);
        if (que__ != nullptr) {
            que__->disconnect();
        }
    };
    ::signal(SIGINT  , on_signal);
    ::signal(SIGABRT , on_signal);
    ::signal(SIGSEGV , on_signal);
    ::signal(SIGTERM , on_signal);
#if defined(WIN64) || defined(_WIN64) || defined(__WIN64__) || \
    defined(WIN32) || defined(_WIN32) || defined(__WIN32__) || defined(__NT__) || \
    defined(WINCE) || defined(_WIN32_WCE)
    ::signal(SIGBREAK, on_signal);
#else
    ::signal(SIGHUP  , on_signal);
#endif

    std::string mode {argv[1]};
    if (mode == mode_s__) {
        do_send();
    } else if (mode == mode_r__) {
        do_recv();
    }
    return 0;
}
模仿 demo 中的 msg_que 写的样例:如果通过 _register 的方式去创建发送队列,按下 Ctrl+C 后也会出现段错误。终端输出如下: 0 bytes/s ^Cdo_send: quit... fail: send, there is no receiver on this connection. 段错误 (核心已转储)
#include <signal.h>
#include <map>
#include <iostream>
#include <string>
#include <atomic>
#include <thread>
#include <chrono>
#include <cstddef>
#include <type_traits>
#include "libipc/ipc.h"
// #include "capo/random.hpp"
using msg_que_t = ipc::chan<ipc::relat::single, ipc::relat::single, ipc::trans::unicast>;
class shared_memory_register{
public:
shared_memory_register(){};
~shared_memory_register(){
for(auto iter=shared_memory_dict.begin(); iter!=shared_memory_dict.end(); iter++){
iter->second->disconnect();
}
};
bool add_shared_memory(const std::string& name){
if(shared_memory_dict.find(name)==shared_memory_dict.end()){
shared_memory_dict[name] = std::make_unique<msg_que_t>(name.c_str(), ipc::sender);
return true;
}else{
return false;
}
return false;
};
private:
std::map<std::string, std::unique_ptr<msg_que_t>> shared_memory_dict;
};
// Global registry; destroyed during static destruction, after main returns.
shared_memory_register _register;

// Registers a single sender channel and exits straight away.
int main(int argc, char ** argv) {
    const std::string channel_name{"abcde"};
    _register.add_shared_memory(channel_name);
    return 0;
}
这是最小复现例子。使用 gdb 跟踪的结果如下:
127 in pthread_once.c
(gdb)
futex_wake (private=0, processes_to_wake=2147483647, futex_word=0x555555790204 <a0_tid_reset_atfork_once>) at ../sysdeps/unix/sysv/linux/futex-internal.h:231
231 ../sysdeps/unix/sysv/linux/futex-internal.h: 没有那个文件或目录.
(gdb)
__pthread_once_slow (once_control=0x555555790204 <a0_tid_reset_atfork_once>, init_routine=0x55555557a58d <a0_tid_reset_atfork>) at pthread_once.c:132
132 pthread_once.c: 没有那个文件或目录.
(gdb)
a0_tid () at /data4/dyj/project/shared_memory/cpp-ipc/src/libipc/platform/linux/a0/tid.c:29
29 return a0_tid_cache;
(gdb)
30 }
(gdb)
a0_mtx_timedlock_robust (timeout=0x0, mtx=0x7ffff7fec000) at /data4/dyj/project/shared_memory/cpp-ipc/src/libipc/platform/linux/a0/mtx.c:183
183 int syserr = EINTR;
(gdb)
184 while (syserr == EINTR) {
(gdb)
186 if (ftx_notrecoverable(a0_atomic_load(&mtx->ftx))) {
(gdb)
Program received signal SIGSEGV, Segmentation fault.
a0_mtx_timedlock_robust (timeout=0x0, mtx=0x7ffff7fec000) at /data4/dyj/project/shared_memory/cpp-ipc/src/libipc/platform/linux/a0/mtx.c:186
186 if (ftx_notrecoverable(a0_atomic_load(&mtx->ftx))) {
(gdb)
Program terminated with signal SIGSEGV, Segmentation fault.
最后出错时在
// Tries to lock `mtx` for the calling thread, repeating the attempt for as
// long as the kernel lock call reports EINTR.
//
// Returns:
//   A0_OK            - the lock was taken (by the compare-and-swap or by the
//                      kernel) and the owner-died check did not trigger;
//   ENOTRECOVERABLE  - the futex word is flagged as not recoverable;
//   EOWNERDEAD       - the kernel call succeeded but the futex word reports
//                      that the previous owner died;
//   any other error  - the error produced by a0_ftx_lock_pi.
//
// NOTE(review): `timeout` is passed to a0_ftx_lock_pi unchanged; presumably
// NULL means "no deadline" - confirm.
A0_STATIC_INLINE
a0_err_t a0_mtx_timedlock_robust(a0_mtx_t* mtx, const a0_time_mono_t* timeout) {
const uint32_t tid = a0_tid();
// Start from EINTR so that the loop body runs at least once.
int syserr = EINTR;
while (syserr == EINTR) {
// Can't lock if borked.
// NOTE(review): this is the first read through `mtx`, so an `mtx` that
// points at memory which is no longer mapped would fault here - confirm.
if (ftx_notrecoverable(a0_atomic_load(&mtx->ftx))) {
return A0_MAKE_SYSERR(ENOTRECOVERABLE);
}
// Try to lock without kernel involvement.
// Presumably this installs `tid` when the word is 0 (unowned) - confirm.
if (a0_cas(&mtx->ftx, 0, tid)) {
return A0_OK;
}
// Ask the kernel to lock.
syserr = A0_SYSERR(a0_ftx_lock_pi(&mtx->ftx, timeout));
}
// A zero syserr means the kernel call succeeded.
if (!syserr) {
if (ftx_owner_died(a0_atomic_load(&mtx->ftx))) {
return A0_MAKE_SYSERR(EOWNERDEAD);
}
return A0_OK;
}
return A0_MAKE_SYSERR(syserr);
}
中的 if (ftx_notrecoverable(a0_atomic_load(&mtx->ftx))) {
切换到develop分支也没有解决我的问题,仍然发生了段错误。段错误发生在同一个地方。
看起来是析构时序的问题。你的_register对象是一个全局变量,它的析构发生在库内部全局对象之后,导致内存访问异常。比如改成这样就不会有问题了:
#include <signal.h>
#include <map>
#include <iostream>
#include <string>
#include <atomic>
#include <thread>
#include <chrono>
#include <cstddef>
#include <type_traits>
#include "libipc/ipc.h"
// #include "capo/random.hpp"
using msg_que_t = ipc::chan<ipc::relat::single, ipc::relat::single, ipc::trans::unicast>;
class shared_memory_register {
public:
shared_memory_register() {}
~shared_memory_register() {}
bool add_shared_memory(std::string const &name) {
if (shared_memory_dict.find(name) == shared_memory_dict.end()){
shared_memory_dict[name] = std::make_unique<msg_que_t>(name.c_str(), ipc::sender);
return true;
} else {
return false;
}
}
void clear() {
for (auto iter = shared_memory_dict.begin(); iter != shared_memory_dict.end(); ++iter) {
iter->second->disconnect();
}
shared_memory_dict.clear();
}
private:
std::map<std::string, std::unique_ptr<msg_que_t>> shared_memory_dict;
};
// Global registry; emptied explicitly inside main before the process exits.
shared_memory_register _register;

// Registers one sender channel, then releases it while main is still running.
int main(int argc, char ** argv) {
    const std::string channel_name{"abcde"};
    _register.add_shared_memory(channel_name);
    _register.clear();
    return 0;
}
建议你手动控制一下清理的时机。