cpp-ipc icon indicating copy to clipboard operation
cpp-ipc copied to clipboard

关于以下代码的解读没有弄明白

Open liuxw7 opened this issue 4 years ago • 12 comments

你好!mutouyun 我在看代码的时候,以下两个代码没有明白,可否帮忙解析一下,谢谢 allloc.h中 static void** node_p(void* node) { return reinterpret_cast<void**>(node); }

static auto& next(void* node) { return *node_p(node); }

另外内存池中的内存随着数据频率发送的越高,其内存一直在增加,我的程序在引用的时候,每个数据大小概是16w左右/100ms,内存一直持续在增加,达到了1G之上,当暂亭发送的时候,内存池的内存也不见释放。请问这种情况如何处理? 谢谢!

liuxw7 avatar Mar 22 '21 08:03 liuxw7

能不能分享一下,你在做内存池时的设计原理。谢谢!

liuxw7 avatar Mar 22 '21 08:03 liuxw7

嗯。。我这个库里的内存池其实是比较简单的,主要是考虑到在不引入任何第三方内存池时的性能问题。 实现方式类似 boost::pool。如果你有看过 Andrei Alexandrescu 的老书 Modern C++ Design,这里的设计是很容易懂的。 默认是不回收内存给系统的(简单来说就是一直缓存最大内存水位,这样做有好处也有坏处),在一些复杂场景下可能会导致内存占用过高。

如果你有使用 tcmalloc 之类的高性能内存池,可以把 cpp-ipc/src/libipc/memory/resource.h 里的 async_pool_alloc 改成 static_alloc,这样会使用系统默认的内存分配器(tcmalloc 会自动接管默认分配器),不会只分配不回收了。

mutouyun avatar Mar 23 '21 02:03 mutouyun

按照你的建意,改过之后,发现内存还在不断扩大,最后,订阅端,挂了。

另外还有一个问题就是,发布者是如何实现订阅端广播的?

liuxw7 avatar Mar 25 '21 06:03 liuxw7

咦。。你是怎么改的?是改成 static_alloc 了吗? 你的发送频率最高是多少?可以告诉我你的测试方法么?或者简单的 demo,我这边复现一下试试?

广播是通过在数据结构里记录客户端的标记(1个bit),取过的标记就清理掉,全部取完了数据才能覆盖。

mutouyun avatar Mar 25 '21 07:03 mutouyun

resource.h line:27

template <typename T> using allocator = allocator_wrapper<T, async_pool_alloc>; --> using allocator = allocator_wrapper<T, static_alloc>;

频率为100ms, 数据大小为1,600,000

liuxw7 avatar Mar 25 '21 08:03 liuxw7

啊,不能只改这个,你只需要把async_pool_alloc改掉:

using async_pool_alloc = static_alloc;

template <typename T>
using allocator = allocator_wrapper<T, async_pool_alloc>;

mutouyun avatar Mar 25 '21 13:03 mutouyun

明天我再试一下, 木老师,能不能把你实现多订阅者的过程,稍微说详细一点。谢谢您

liuxw7 avatar Mar 25 '21 14:03 liuxw7

关键在于 writer 需要能识别出当前队列中的消息是否已被所有 reader 消费完。 这里有两个问题:

  1. 如何得知有多少 reader?
  2. 如何标记某个 reader 已完成?

再考虑到 reader 可能会 crash,从而导致 reader 的计数不变,因此还有新的问题:

  1. 某消息一直未被消费完,如何做超时等待?
  2. 超时后,如何清除多余的未完成标记?

权衡性能、空间占用等指标后,不同的 prod-cons(生产者-消费者)模型会采用不同的方案。 具体来说,超时默认均为 100ms;在 unicast 模式下,消息本身无记录(因为一条消息只会被消费一次),只在整体上记录了 reader 的个数;在 broadcast 模式下,消息包含一个 32bits 的 flag,每个 reader 占用一个 unique bit,读取一次清理一个 bit,全空表示消费完毕(这也是为什么会有最多支持 32 个 receiver 的限制,实际上这个限制只存在于 smb 和 mmb 模式下)。

mutouyun avatar Mar 26 '21 03:03 mutouyun

如何实现每次broadcast时,接收的客户保证不是已经拿到过消息的客户端呢?

liuxw7 avatar Mar 26 '21 11:03 liuxw7

对 broadcast 来说,每个客户端在自己的内存(而非共享内存)中保存自己的读取位置,写入者只需要保证不覆盖就好了。

mutouyun avatar Mar 27 '21 08:03 mutouyun

一个 32bits 的 flag

能细说下 如何感知消费者上下线吗? 我windows下下了内存断点,counter 竟然竟然没断下来,没找到相关代码, 然后还有个问题 msg_que demo 测试发现 windows这边的FileMapping 一直create 但是没有 unmap 这个不会导致问题吗?

aijyo avatar May 06 '21 15:05 aijyo

不知你找的是哪里的 counter?关键代码在:elem_array.hconnect_*/disconnect_*

实际上这个检测做得很简单,新的 receiver 会增加 connections 计数(见:elem_def.hclass conn_head_baseclass conn_head),sender 在发送消息时会实时探测 connections 计数(见prod_cons.hstruct prod_cons_impl<wr<*, *, trans::broadcast>>系列的push方法)。而 receiver 退出会导致计数减少,若存在消息未收取完成,则将等待超时后强制覆盖。

CreateFileMapping可以用于创建,及打开一个现存对象(见:CreateFileMappingW function)。所有的句柄会在进程 正常 退出时自动释放,你可以在shm_win.cpprelease函数里打断点确认 msg-que 的 CTRL-C。

另外目前的大msg缓存算法其实对 msg-que 这种行为不太友好,它只是为了临时偶尔一两次大msg发送,而对频繁大小不确定的大msg传输,就会导致大量句柄的创建。我已经在一个新分支issue-17上提交了修改,避免句柄创建过多。你可以用这个新分支上的代码测试。

mutouyun avatar May 07 '21 09:05 mutouyun