cpp-ipc
cpp-ipc copied to clipboard
能否提供更加复杂一点的Sample?
比如想实现类似HTTP请求的功能,需要用到ipc::channel支持多写多读。怎么实现进程1同一时刻多发,进程2同一时刻多收,分发处理,分发响应,不需要Lock?
channel 自身不是线程安全的,每个线程持有自己的 channel,然后 connect 到同一个 name 上就可以了。
同一个 channel 上的所有接收者是抢占式的,你需要自己做分发。 如果你需要 dispatch 的话,使用不同的 name 就可以了(类似 pub-sub),而 channel 的 name,就相当于 topic。
谢谢您的建议~ 我现在使用不同的name创建不同组channel,实现了消息的”并发“dispatcher,消息包采用Jsoncpp来打包解析。这个库使我受益很大,后续要多多研究内部机制与实现。
还有一个问题想请教下,虽然是基于共享内存达到不同进程间的数据通信,但API接口接收都是const char* 或std::string。如果我想传递复杂类型,比如std::vectorEigen::Vector3d之类的三维数据点集,要从Jsoncpp封送为std::string,再调用API分发,这样效率大打折扣,有没其它更优的方式呢?
你需要一个序列化库,比如 protobuf、capnp、flatbuf,你可以参考下这些库。 这个库里的序列化实现也可以参考下:https://github.com/qicosmos/rest_rpc
如果要将数据序列化,这性能就低了啊,我现在就是使用Jsoncpp序列化库。
共享内存在不同进程里的地址不一定相同,因此你无法直接将指针传递出去,你需要一个类似 boost::flat_buffer 的东西将内存内容平坦化。 除非你的数据本身就是可平凡拷贝的(is_trivially_copyable),你可以免去序列化的步骤。
目前的接口在发送时怎样都得拷贝一次数据。如果不想拷贝,就需要直接使用共享内存(而不是new/malloc)来构造自己的对象,之后通过 ipc 将它的“句柄”发出去。 由于共享内存的打开和关闭也是耗时操作,所以你需要自己建立一个共享内存池来做缓存。
Jsoncpp的性能是很低的,因为它本就不是为性能考虑的。需要性能的话,首先就不能考虑json,而应该使用idl。
我在cpp-ipc的基础之上,利用Boost共享内存机制解决了数据的“拷贝”问题。
#define BOOST_USE_WINDOWS_H
#include <boost/interprocess/managed_shared_memory.hpp>
#include <boost/interprocess/allocators/allocator.hpp>
#include <boost/interprocess/containers/map.hpp>
#include <boost/interprocess/containers/vector.hpp>
#include <boost/interprocess/containers/string.hpp>
typedef boost::interprocess::managed_shared_memory::segment_manager segment_manager_t;
typedef boost::interprocess::allocator<void, segment_manager_t> void_allocator;
typedef boost::interprocess::allocator<Eigen::Vector3d, segment_manager_t> shmem_allocator;
typedef boost::interprocess::vector<Eigen::Vector3d, shmem_allocator> vector_vector3d;
static boost::interprocess::managed_shared_memory segment(boost::interprocess::open_or_create, name_shared_memory__, addr_shared_memory__);
static void_allocator alloc_inst(segment.get_segment_manager());
vector_points = segment.construct<vector_vector3d>(name_vector_points__)(alloc_inst);
后来,程序运行过程中。偶尔会出现崩溃现象,在系统的事件管理器中看到了如下日志,像这种问题如何去捕捉ipc引起的异常呢?
用 signal 捕捉一下,然后在触发的时候把调用栈打出来看看
用 signal 捕捉一下,然后在触发的时候把调用栈打出来看看
可以同您微信沟通吗?我在C++方面经验不足,这样说还不太明白怎么去做?
另外,我用了这里的Tutorial,稍微改了下,代码如下:
#include <thread>
#include "libipc/ipc.h"
int main()
{
using namespace std::literals;
std::size_t default_timeout = 3000; // ms
std::vector<char const*> const datas = {
"hello!",
//"foo",
//"bar",
//"ISO/IEC",
//"14882:2011",
//"ISO/IEC 14882:2017 Information technology - Programming languages - C++",
//"ISO/IEC 14882:2020",
//"Modern C++ Design: Generic Programming and Design Patterns Applied"
};
// thread producer
std::thread t1{ [&] {
ipc::channel cc { "my-ipc-channel", ipc::sender | ipc::receiver };
for (std::size_t i = 0; i < datas.size(); ++i) {
std::this_thread::sleep_for(1s);
// try sending data
while (!cc.send(datas[i])) {
// waiting for connection
cc.wait_for_recv(2); // 这里等待2个连接的原因是,thread producer的cc自身也是一个连接(ipc::receiver)
}
std::printf("producer send: %s\n", datas[i]);
// recv ack
auto dd = cc.recv();
auto str = static_cast<char*>(dd.data());
if (str == nullptr) {
std::printf("producer recv ack: error!\n");
}
else {
std::printf("producer recv ack: %c\n", str[0]);
}
}
// quit
cc.send(ipc::buff_t('\0'));
} };
// thread consumer
std::thread t2{ [&] {
ipc::channel cc { "my-ipc-channel", ipc::sender | ipc::receiver };
while (1) {
auto start_time = std::chrono::system_clock::now();
//auto dd = cc.recv();
auto dd = cc.recv(default_timeout);
auto elapsed_time = std::chrono::duration_cast<std::chrono::milliseconds>(std::chrono::system_clock::now() - start_time).count();
if (elapsed_time >= default_timeout) {
std::printf("consumer send timeout, elapsed_time=%d\n", elapsed_time);
continue;
}
std::string str{ dd.get<char const*>(), dd.size() - 1 };
if (str.empty())
{
std::printf("Recv message is empty!\n");
continue;
}
std::printf("consumer recv: %s\n", str);
// try sending ack
//while (!cc.send(ipc::buff_t('a'))) {
// // waiting for connection
// cc.wait_for_recv(2);
//}
bool ret = cc.try_send(ipc::buff_t('a'), default_timeout);
if (!ret) std::printf("consumer send timeout\n");
}
} };
t1.join();
t2.join();
}
控制台输出是这样的,好像不太对
producer send: hello!
consumer recv: hello!
producer recv ack: a
Recv message is empty!
consumer send timeout, elapsed_time=3016
consumer send timeout, elapsed_time=3007
为什么会输出“Recv message is empty!”这条信息呢?
std::vector<char const*> const datas = {
"hello!",
//"foo",
//"bar",
//"ISO/IEC",
//"14882:2011",
//"ISO/IEC 14882:2017 Information technology - Programming languages - C++",
//"ISO/IEC 14882:2020",
//"Modern C++ Design: Generic Programming and Design Patterns Applied"
};
上面这段代码里,你的准备数据只有一个,所以 datas.size()
返回值为1。
因此 producer 线程在发完一个数据之后,即退出了 for 循环,走到了
// quit
cc.send(ipc::buff_t('\0'));
这里发送的一字节数据,被 consumer 接收到之后,通过
std::string str{ dd.get<char const*>(), dd.size() - 1 };
得到了一个空字符串(dd.size() - 1
等于0),因此打印出了“Recv message is empty!”。
我最近比较忙,你可以邮件([email protected])告诉我你的微信号,我来加你。
如您所述,将这里注释掉就不会打印“Recv message is empty!”这条消息了。我没注意到哈~
// cc.send(ipc::buff_t('\0'));