cpp-ipc icon indicating copy to clipboard operation
cpp-ipc copied to clipboard

ipc::chan对象执行析构或者disconnect时发生段错误

Open saladjay opened this issue 7 months ago • 4 comments

#include <chrono>
#include <cstddef>
#include <iostream>
#include <map>
#include <memory>
#include <string>
#include <thread>
#include <vector>

#include "libipc/ipc.h"

using msg_line = ipc::chan<ipc::relat::single, ipc::relat::single, ipc::trans::unicast>;

class shared_memory_register{
    public:
        shared_memory_register(){
            
        };  
        ~shared_memory_register(){
            
            for(auto iter=shared_memory_dict.begin(); iter!=shared_memory_dict.end(); iter++){
                
                iter->second->send(ipc::buff_t('\0'));
                iter->second->disconnect();
            }
            
        };

        bool add_shared_memory(const std::string& name){
            if(shared_memory_dict.find(name)==shared_memory_dict.end()){
                shared_memory_dict[name] = std::make_unique<msg_line>(name.c_str(), ipc::sender);
                return true;
            }else{
                return false;
            }
        };

        msg_line* get_sender(const std::string& name){
            if(shared_memory_dict.find(name)!=shared_memory_dict.end()){
                return shared_memory_dict[name].get();
            }else{
                return nullptr;
            }
        }

    private:
        std::map<std::string, std::unique_ptr<msg_line>> shared_memory_dict;
};

// Process-wide registry. As a namespace-scope object it is destroyed only
// after main() returns; the maintainer's reply in this thread attributes the
// reported segfault to that teardown running after the library's own globals
// have already been destroyed.
shared_memory_register _register;

/// Registers a sender channel called `memory_name` in the global registry.
/// @return true when the channel was newly created, false when that name was
///         already registered (the result used to be discarded).
bool create_shared_memory(const std::string& memory_name){
    return _register.add_shared_memory(memory_name);
}

bool send_data(const std::string& memory_name, std::vector<float>& data){
    
    msg_line* sender = _register.get_sender(memory_name);
    
    if(sender != nullptr){
        
        sender->send(data.data(), data.size());
        
    }
    
    return true;
}

/// Receives float payloads from channel `memory_name`, printing every value
/// on its own line and appending it to `data`.
///
/// The loop ends when a message too short to hold a single float arrives.
/// The sender-side registry in this program emits such a message (the '\0'
/// buffer) right before it disconnects, so it is treated as end-of-stream.
/// NOTE(review): confirm this is the intended meaning of that message.
///
/// @return true after an end-of-stream message, false when recv() hands back
///         an empty buffer.
bool receive_data(const std::string& memory_name, std::vector<float>& data){
    msg_line rec{memory_name.c_str(), ipc::receiver};
    while(true){
        auto buffer = rec.recv();
        if(buffer.empty()){
            // Nothing was received; retrying here would only spin.
            return false;
        }

        // buffer.size() is a byte count; using it directly as the number of
        // floats read far past the end of the message.
        const std::size_t count = buffer.size() / sizeof(float);
        if(count == 0){
            break;
        }

        const float* const first = static_cast<const float*>(buffer.data());
        data.insert(data.end(), first, first + count);

        for(std::size_t i{0};i<count;++i){
            std::cout<<first[i]<<std::endl;
        }
    }
    return true;
}

// bool 

// Command-line mode selectors: "s" runs the sender, "r" runs the receiver.
constexpr char const mode_s__[] = "s";
constexpr char const mode_r__[] = "r";

/// Entry point. `argv[1]` selects the role:
///  - "s": creates channel "abcd" and sends the floats 0..9 ten times, one
///         second apart, printing a banner after each send;
///  - "r": receives from channel "abcd" and prints what was collected.
/// Any other invocation exits immediately with status 0.
int main(int argc, char *argv[]){
    if (argc < 2) return 0;

    const std::string mode {argv[1]};
    const std::string channel_name {"abcd"};

    if (mode == mode_s__) {
        std::vector<float> data;
        for(std::size_t i{0};i<10;++i){
            data.push_back(static_cast<float>(i));
        }

        create_shared_memory(channel_name);

        // The former `std::vector<float> float_data();` inside this loop
        // declared a function rather than a vector and was never used, so it
        // has been removed.
        for(std::size_t count{0};count<10;++count){
            std::this_thread::sleep_for(std::chrono::milliseconds(1000));
            send_data(channel_name, data);
            std::cout<<"**********"<<count<<"**********"<<std::endl;
        }
    } else if (mode == mode_r__) {
        std::vector<float> data_received;
        receive_data(channel_name, data_received);
        std::this_thread::sleep_for(std::chrono::milliseconds(1000));
        for(const float value : data_received){
            std::cout<<value<<" ";
        }
        std::cout<<std::endl;
    }

    return 0;
}

平台:Ubuntu 18.04,gcc version 9.4.0 (Ubuntu 9.4.0-1ubuntu1~18.04);代码分支版本:v1.3.0。代码编译后单独执行发送模式,不在另一个命令行中执行接收模式,在代码执行 disconnect 时发生段错误。

saladjay avatar May 21 '25 03:05 saladjay


#include <signal.h>
#include <map>
#include <iostream>
#include <string>
#include <atomic>
#include <thread>
#include <chrono>
#include <cstddef>
#include <type_traits>
#include "libipc/ipc.h"
// #include "capo/random.hpp"


#include <random>   // std::default_random_engine, std::uniform_int_distribution
#include <utility>  // std::forward

namespace capo {

////////////////////////////////////////////////////////////////
/// Random-number helper: a distribution bundled with its own engine
////////////////////////////////////////////////////////////////

template <class Engine       = std::default_random_engine,
          class Distribution = std::uniform_int_distribution<>>
class random : public Distribution
{
public:
    using engine_type       = Engine;
    using distribution_type = Distribution;
    using result_type       = typename distribution_type::result_type;
    using param_type        = typename distribution_type::param_type;

    /// Forwards every argument to the distribution's constructor and seeds
    /// the engine with a single draw from std::random_device.
    template <typename... Args>
    random(Args&&... args)
        : distribution_type(std::forward<Args>(args)...)
        , generator_(std::random_device{}())
    {}

    /// Draws one value using the parameters stored in the distribution.
    result_type operator()()
    {
        return distribution_type::operator()(generator_);
    }

    /// Draws one value using the supplied parameter set instead.
    result_type operator()(const param_type& parm)
    {
        return distribution_type::operator()(generator_, parm);
    }

private:
    engine_type generator_;
};

} // namespace capo


namespace {

// Channel name handed to both the sender registry (do_send) and the receiver
// (do_recv).
constexpr char const name__  [] = "ipc-msg-que";
// Command-line mode selectors compared against argv[1] in main().
constexpr char const mode_s__[] = "s";
constexpr char const mode_r__[] = "r";

// Bounds given to rand__ for the per-message size; max_sz also sizes buff__.
constexpr std::size_t const min_sz = 128;
constexpr std::size_t const max_sz = 1024 * 16;

// Stop flag: set by the signal handler in main() and by do_send() when waiting
// for a receiver fails; polled by the send, receive and counting loops.
std::atomic<bool> is_quit__ {false};
// Bytes accumulated since the last report; do_counting() swaps it back to 0
// each time it prints.
std::atomic<std::size_t> size_counter__ {0};

using msg_que_t = ipc::chan<ipc::relat::single, ipc::relat::single, ipc::trans::unicast>;

// Channel currently in use. Assigned by do_send()/do_recv() and also
// dereferenced by the signal handler installed in main(); null until then.
msg_que_t* que__{ nullptr };
// Payload source for do_send(); only a leading slice of it is sent each time.
ipc::byte_t buff__[max_sz];
// Draws the size of each message; with the default distribution
// (std::uniform_int_distribution<>) the two arguments are its bounds.
capo::random<> rand__{ 
    static_cast<int>(min_sz), 
    static_cast<int>(max_sz)
};

class shared_memory_register{
    public:
        shared_memory_register(){
            
        };  
        ~shared_memory_register(){
            for(auto iter=shared_memory_dict.begin(); iter!=shared_memory_dict.end(); iter++){
                iter->second->send(ipc::buff_t('\0'));
                iter->second->disconnect();
            }
        };

        template<typename T>
        bool add_shared_memory(const T& name){
            if constexpr(std::is_same_v<T, std::string>){
                if(shared_memory_dict.find(name)==shared_memory_dict.end()){
                    shared_memory_dict[name] = std::make_unique<msg_que_t>(name.c_str(), ipc::sender);
                    return true;
                }else{
                    return false;
                }
            }
            std::string converted_str;
            if constexpr(std::is_pointer_v<T> && (std::is_same_v<std::remove_pointer_t<T>, char> 
                || std::is_same_v<T, const char*>)){
                converted_str = std::string(name);
            }
            if constexpr(std::is_same_v<std::remove_extent_t<T>, char> && std::is_array_v<T>){
                converted_str = std::string(name);
            }

            if(!converted_str.empty()){
                if(shared_memory_dict.find(converted_str)==shared_memory_dict.end()){
                    shared_memory_dict[converted_str] = std::make_unique<msg_que_t>(converted_str.c_str(), ipc::sender);
                    return true;
                }else{
                    return false;
                }
            }
        };

        msg_que_t* get_sender(const std::string& name){
            if(shared_memory_dict.find(name)!=shared_memory_dict.end()){
                return shared_memory_dict[name].get();
            }else{
                return nullptr;
            }
        }

    private:
        std::map<std::string, std::unique_ptr<msg_que_t>> shared_memory_dict;
};

/// Renders a byte count for display: strictly more than 1 MiB is shown in
/// whole "MB", strictly more than 1 KiB in whole "KB" (both rounded down),
/// anything else in "bytes".
inline std::string str_of_size(std::size_t sz) noexcept {
    constexpr std::size_t kib = 1024;
    constexpr std::size_t mib = kib * kib;

    std::string text;
    if (sz > mib) {
        text = std::to_string(sz / mib) + " MB";
    } else if (sz > kib) {
        text = std::to_string(sz / kib) + " KB";
    } else {
        text = std::to_string(sz) + " bytes";
    }
    return text;
}

/// Renders a per-second throughput: the size text followed by "/s".
inline std::string speed_of(std::size_t sz) noexcept {
    std::string text = str_of_size(sz);
    text += "/s";
    return text;
}

/// Reporter loop: until the stop flag is raised, wakes every 100 ms and, on
/// every tenth wake-up, prints the bytes accumulated in size_counter__ since
/// the previous report and resets that counter.
void do_counting() {
    int ticks = 0;
    while (!is_quit__.load(std::memory_order_acquire)) {
        std::this_thread::sleep_for(std::chrono::milliseconds(100)); // 100 ms
        ++ticks;
        if (ticks < 10) {
            continue;
        }
        ticks = 0;
        const std::size_t transferred = size_counter__.exchange(0, std::memory_order_relaxed);
        std::cout << speed_of(transferred) << std::endl;
    }
}
shared_memory_register _register;
void do_send() {
    ipc::byte_t count{0};
    std::cout 
        << __func__ << ": start [" 
        << str_of_size(min_sz) << " - " << str_of_size(max_sz) 
        << "]...\n";
    _register.add_shared_memory(name__);
    que__ = _register.get_sender(name__);
    if (!que__->reconnect(ipc::sender)) {
        std::cerr << __func__ << ": connect failed.\n";
    }
    else {
        std::thread counting{ do_counting };
        while (!is_quit__.load(std::memory_order_acquire)) {
            std::size_t sz = static_cast<std::size_t>(rand__());
            buff__[count] = count;
            count++;
            if (!que__->send(ipc::buff_t(buff__, sz))) {
                std::cerr << __func__ << ": send failed.\n";
                std::cout << __func__ << ": waiting for receiver...\n";
                if (!que__->wait_for_recv(1)) {
                    std::cerr << __func__ << ": wait receiver failed.\n";
                    is_quit__.store(true, std::memory_order_release);
                    break;
                }
            }
            size_counter__.fetch_add(sz, std::memory_order_relaxed);
            std::this_thread::yield();
        }
        counting.join();
    }
    std::cout << __func__ << ": quit...\n";
}

void do_recv() {
    ipc::byte_t count{0};
    std::cout
        << __func__ << ": start ["
        << str_of_size(min_sz) << " - " << str_of_size(max_sz)
        << "]...\n";
    que__ = new msg_que_t(name__, ipc::receiver);
    if (!que__->reconnect(ipc::receiver)) {
        std::cerr << __func__ << ": connect failed.\n";
    }
    else {
        std::thread counting{ do_counting };
        while (!is_quit__.load(std::memory_order_acquire)) {
            auto msg = que__->recv();
            if (msg.empty()) break;
            ipc::byte_t* buffer = reinterpret_cast<ipc::byte_t*>(msg.data());
            // std::cout<<"addr:"<<static_cast<int>(count)<<" value:"<<static_cast<int>(buffer[count])<<std::endl;
            count++;
            size_counter__.fetch_add(msg.size(), std::memory_order_relaxed);
        }
        counting.join();
    }
    std::cout << __func__ << ": quit...\n";
}

} // namespace

/// Entry point. `argv[1]` selects the role: "s" runs do_send(), "r" runs
/// do_recv(); any other invocation exits with status 0.
int main(int argc, char ** argv) {
    if (argc < 2) return 0;

    // Raises the stop flag and disconnects the active queue, if there is one.
    // que__ stays null until do_send()/do_recv() assigns it, so it must be
    // checked before use. (Renamed from `exit` to stop shadowing ::exit.)
    // NOTE(review): disconnect() is presumably not async-signal-safe; it is
    // kept because it looks like the only way to wake a blocked recv() --
    // confirm against the library.
    auto on_signal = [](int) {
        is_quit__.store(true, std::memory_order_release);
        if (que__ != nullptr) {
            que__->disconnect();
        }
    };
    ::signal(SIGINT  , on_signal);
    ::signal(SIGABRT , on_signal);
    ::signal(SIGSEGV , on_signal);
    ::signal(SIGTERM , on_signal);
#if defined(WIN64) || defined(_WIN64) || defined(__WIN64__) || \
    defined(WIN32) || defined(_WIN32) || defined(__WIN32__) || defined(__NT__) || \
    defined(WINCE) || defined(_WIN32_WCE)
    ::signal(SIGBREAK, on_signal);
#else
    ::signal(SIGHUP  , on_signal);
#endif

    const std::string mode {argv[1]};
    if (mode == mode_s__) {
        do_send();
    } else if (mode == mode_r__) {
        do_recv();
    }
    return 0;
}

模仿 demo 中的 msg_que 写的样例:如果通过创建 _register 的方式去创建发送队列,按下 Ctrl+C 后也会出现段错误。终端输出如下:

0 bytes/s
^Cdo_send: quit...
fail: send, there is no receiver on this connection.
段错误 (核心已转储)

saladjay avatar May 21 '25 08:05 saladjay


#include <signal.h>
#include <map>
#include <iostream>
#include <string>
#include <atomic>
#include <thread>
#include <chrono>
#include <cstddef>
#include <type_traits>
#include "libipc/ipc.h"
// #include "capo/random.hpp"

using msg_que_t = ipc::chan<ipc::relat::single, ipc::relat::single, ipc::trans::unicast>;



class shared_memory_register{
    public:
        shared_memory_register(){};  
        ~shared_memory_register(){
            for(auto iter=shared_memory_dict.begin(); iter!=shared_memory_dict.end(); iter++){
                iter->second->disconnect();
            }
        };

        bool add_shared_memory(const std::string& name){
                if(shared_memory_dict.find(name)==shared_memory_dict.end()){
                    shared_memory_dict[name] = std::make_unique<msg_que_t>(name.c_str(), ipc::sender);
                    return true;
                }else{
                    return false;
                }
            return false;
        };
    private:
        std::map<std::string, std::unique_ptr<msg_que_t>> shared_memory_dict;
};

// Namespace-scope registry: its destructor runs only after main() returns.
shared_memory_register _register;

// Minimal reproduction from the report: register a single sender queue and
// leave all teardown to the global's destructor.
int main(int argc, char ** argv) {
    const std::string queue_name{"abcde"};
    _register.add_shared_memory(queue_name);
    return 0;
}

这个是最小复现例子。使用 gdb 跟踪:

127     in pthread_once.c
(gdb) 
futex_wake (private=0, processes_to_wake=2147483647, futex_word=0x555555790204 <a0_tid_reset_atfork_once>) at ../sysdeps/unix/sysv/linux/futex-internal.h:231
231     ../sysdeps/unix/sysv/linux/futex-internal.h: 没有那个文件或目录.
(gdb) 
__pthread_once_slow (once_control=0x555555790204 <a0_tid_reset_atfork_once>, init_routine=0x55555557a58d <a0_tid_reset_atfork>) at pthread_once.c:132
132     pthread_once.c: 没有那个文件或目录.
(gdb) 
a0_tid () at /data4/dyj/project/shared_memory/cpp-ipc/src/libipc/platform/linux/a0/tid.c:29
29        return a0_tid_cache;
(gdb) 
30      }
(gdb) 
a0_mtx_timedlock_robust (timeout=0x0, mtx=0x7ffff7fec000) at /data4/dyj/project/shared_memory/cpp-ipc/src/libipc/platform/linux/a0/mtx.c:183
183       int syserr = EINTR;
(gdb) 
184       while (syserr == EINTR) {
(gdb) 
186         if (ftx_notrecoverable(a0_atomic_load(&mtx->ftx))) {
(gdb) 


Program received signal SIGSEGV, Segmentation fault.
a0_mtx_timedlock_robust (timeout=0x0, mtx=0x7ffff7fec000) at /data4/dyj/project/shared_memory/cpp-ipc/src/libipc/platform/linux/a0/mtx.c:186
186         if (ftx_notrecoverable(a0_atomic_load(&mtx->ftx))) {
(gdb) 

Program terminated with signal SIGSEGV, Segmentation fault.

最后出错时在

// Attempts to acquire `mtx` for the calling thread, identified by a0_tid(),
// retrying for as long as the kernel lock call reports EINTR.
//
// Returns:
//   - A0_OK when the compare-and-swap on the lock word succeeds, or when the
//     kernel call succeeds and the owner-died check is negative;
//   - A0_MAKE_SYSERR(ENOTRECOVERABLE) when the lock word is flagged as not
//     recoverable;
//   - A0_MAKE_SYSERR(EOWNERDEAD) when the kernel call succeeds but the lock
//     word is flagged as owner-died;
//   - A0_MAKE_SYSERR(<error>) for any other error from a0_ftx_lock_pi.
//
// `timeout` is passed straight through to a0_ftx_lock_pi; the gdb trace in
// this report shows the function being entered with timeout=0x0.
A0_STATIC_INLINE
a0_err_t a0_mtx_timedlock_robust(a0_mtx_t* mtx, const a0_time_mono_t* timeout) {
  const uint32_t tid = a0_tid();

  // Seeded with EINTR so the loop body runs at least once.
  int syserr = EINTR;
  while (syserr == EINTR) {
    // Refuse to lock a mutex whose lock word is flagged as not recoverable.
    // This load is the first access to *mtx and is the statement on which the
    // SIGSEGV in this report was raised (mtx.c:186 in the gdb trace).
    if (ftx_notrecoverable(a0_atomic_load(&mtx->ftx))) {
      return A0_MAKE_SYSERR(ENOTRECOVERABLE);
    }

    // Try to lock without kernel involvement: swap 0 for our thread id.
    if (a0_cas(&mtx->ftx, 0, tid)) {
      return A0_OK;
    }

    // Ask the kernel to lock; an EINTR result sends us around the loop again.
    syserr = A0_SYSERR(a0_ftx_lock_pi(&mtx->ftx, timeout));
  }

  // The kernel call succeeded: report whether the lock word is flagged as
  // owner-died.
  if (!syserr) {
    if (ftx_owner_died(a0_atomic_load(&mtx->ftx))) {
      return A0_MAKE_SYSERR(EOWNERDEAD);
    }
    return A0_OK;
  }

  // Any other kernel error is handed back to the caller.
  return A0_MAKE_SYSERR(syserr);
}

中的 if (ftx_notrecoverable(a0_atomic_load(&mtx->ftx))) {

saladjay avatar May 21 '25 08:05 saladjay

切换到develop分支也没有解决我的问题,仍然发生了段错误。段错误发生在同一个地方。

saladjay avatar May 21 '25 08:05 saladjay

看起来是析构时序的问题。你的_register对象是一个全局变量,它的析构发生在库内部全局对象之后,导致内存访问异常。比如改成这样就不会有问题了:

#include <signal.h>
#include <map>
#include <iostream>
#include <string>
#include <atomic>
#include <thread>
#include <chrono>
#include <cstddef>
#include <type_traits>
#include "libipc/ipc.h"
// #include "capo/random.hpp"

using msg_que_t = ipc::chan<ipc::relat::single, ipc::relat::single, ipc::trans::unicast>;

class shared_memory_register {
public:
    shared_memory_register() {}
    ~shared_memory_register() {}

    bool add_shared_memory(std::string const &name) {
        if (shared_memory_dict.find(name) == shared_memory_dict.end()){
            shared_memory_dict[name] = std::make_unique<msg_que_t>(name.c_str(), ipc::sender);
            return true;
        } else {
            return false;
        }
    }

    void clear() {
        for (auto iter = shared_memory_dict.begin(); iter != shared_memory_dict.end(); ++iter) {
            iter->second->disconnect();
        }
        shared_memory_dict.clear();
    }

private:
    std::map<std::string, std::unique_ptr<msg_que_t>> shared_memory_dict;
};

// Namespace-scope registry; it is emptied explicitly inside main() below.
shared_memory_register _register;

// Registers one sender queue and releases it again before main() returns, so
// nothing is left for the global's destructor to tear down.
int main(int argc, char ** argv) {
    const std::string queue_name{"abcde"};
    _register.add_shared_memory(queue_name);
    _register.clear();
    return 0;
}

建议你手动控制下清理的时间。

mutouyun avatar May 24 '25 07:05 mutouyun