关于以下代码的解读没有弄明白
你好!mutouyun 我在看代码的时候,以下两个代码没有明白,可否帮忙解析一下,谢谢 allloc.h中 static void** node_p(void* node) { return reinterpret_cast<void**>(node); }
static auto& next(void* node) { return *node_p(node); }
另外内存池中的内存随着数据频率发送的越高,其内存一直在增加,我的程序在引用的时候,每个数据大小概是16w左右/100ms,内存一直持续在增加,达到了1G之上,当暂亭发送的时候,内存池的内存也不见释放。请问这种情况如何处理? 谢谢!
能不能分享一下,你在做内存池时的设计原理。谢谢!
嗯。。我这个库里的内存池其实是比较简单的,主要是考虑到在不引入任何第三方内存池时的性能问题。 实现方式类似 boost::pool。如果你有看过 Andrei Alexandrescu 的老书 Modern C++ Design,这里的设计是很容易懂的。 默认是不回收内存给系统的(简单来说就是一直缓存最大内存水位,这样做有好处也有坏处),在一些复杂场景下可能会导致内存占用过高。
如果你有使用 tcmalloc 之类的高性能内存池,可以把 cpp-ipc/src/libipc/memory/resource.h 里的 async_pool_alloc 改成 static_alloc,这样会使用系统默认的内存分配器(tcmalloc 会自动接管默认分配器),不会只分配不回收了。
按照你的建意,改过之后,发现内存还在不断扩大,最后,订阅端,挂了。
另外还有一个问题就是,发布者是如何实现订阅端广播的?
咦。。你是怎么改的?是改成 static_alloc 了吗? 你的发送频率最高是多少?可以告诉我你的测试方法么?或者简单的 demo,我这边复现一下试试?
广播是通过在数据结构里记录客户端的标记(1个bit),取过的标记就清理掉,全部取完了数据才能覆盖。
resource.h line:27
template <typename T> using allocator = allocator_wrapper<T, async_pool_alloc>; --> using allocator = allocator_wrapper<T, static_alloc>;
频率为100ms, 数据大小为1,600,000
啊,不能只改这个,你只需要把async_pool_alloc改掉:
using async_pool_alloc = static_alloc;
template <typename T>
using allocator = allocator_wrapper<T, async_pool_alloc>;
明天我再试一下, 木老师,能不能把你实现多订阅者的过程,稍微说详细一点。谢谢您
关键在于 writer 需要能识别出当前队列中的消息是否已被所有 reader 消费完。 这里有两个问题:
- 如何得知有多少 reader?
- 如何标记某个 reader 已完成?
再考虑到 reader 可能会 crash,从而导致 reader 的计数不变,因此还有新的问题:
- 某消息一直未被消费完,如何做超时等待?
- 超时后,如何清除多余的未完成标记?
权衡性能、空间占用等指标后,不同的 prod-cons(生产者-消费者)模型会采用不同的方案。 具体来说,超时默认均为 100ms;在 unicast 模式下,消息本身无记录(因为一条消息只会被消费一次),只在整体上记录了 reader 的个数;在 broadcast 模式下,消息包含一个 32bits 的 flag,每个 reader 占用一个 unique bit,读取一次清理一个 bit,全空表示消费完毕(这也是为什么会有最多支持 32 个 receiver 的限制,实际上这个限制只存在于 smb 和 mmb 模式下)。
如何实现每次broadcast时,接收的客户保证不是已经拿到过消息的客户端呢?
对 broadcast 来说,每个客户端在自己的内存(而非共享内存)中保存自己的读取位置,写入者只需要保证不覆盖就好了。
一个 32bits 的 flag
能细说下 如何感知消费者上下线吗? 我windows下下了内存断点,counter 竟然竟然没断下来,没找到相关代码, 然后还有个问题 msg_que demo 测试发现 windows这边的FileMapping 一直create 但是没有 unmap 这个不会导致问题吗?
不知你找的是哪里的 counter?关键代码在:elem_array.h 的 connect_*/disconnect_* 。
实际上这个检测做得很简单,新的 receiver 会增加 connections 计数(见:elem_def.h,class conn_head_base、class conn_head),sender 在发送消息时会实时探测 connections 计数(见prod_cons.h,struct prod_cons_impl<wr<*, *, trans::broadcast>>系列的push方法)。而 receiver 退出会导致计数减少,若存在消息未收取完成,则将等待超时后强制覆盖。
CreateFileMapping可以用于创建,及打开一个现存对象(见:CreateFileMappingW function)。所有的句柄会在进程 正常 退出时自动释放,你可以在shm_win.cpp的release函数里打断点确认 msg-que 的 CTRL-C。
另外目前的大msg缓存算法其实对 msg-que 这种行为不太友好,它只是为了临时偶尔一两次大msg发送,而对频繁大小不确定的大msg传输,就会导致大量句柄的创建。我已经在一个新分支issue-17上提交了修改,避免句柄创建过多。你可以用这个新分支上的代码测试。