deepflow
deepflow copied to clipboard
feat(agent): support multiple flow log senders
This PR is for:
- [x] Agent
Feature:
- 支持多线程并发发送日志,提升高吞吐量场景稳定性
- 引入了基于 crossbeam-channel 的 全新 MPMC(多生产者多消费者)队列实现,用于替代或补充现有的 ring-buffer 实现。
背景问题
在专属采集器或高速数据面(如 2 Mpps、L4 Flow 限速 4 万条/s)下,现有链路中仅包含单个发送队列和单个 UniformSenderThread,很容易出现写入过快、队列被写满、日志覆盖丢弃的问题,最终影响日志完整性与可观测性。
本次改动实现
✅ 主要改动
在 agent/crates/public/src/queue/ 下新增:mpmc_queue.rs:实现基于 crossbeam_channel 的 Sender、Receiver 和 StatsHandle,支持 MPMC 模型;
修改 queue/mod.rs: 引入并导出 mpmc_queue,增加 bounded_mpmc() 构造函数
✅ 新增功能:
-
引入可配置的 日志发送并发度参数,包括:
l4_flow_sendersl7_flow_sendersmetric_senderspcap_senders等
-
每类日志类型按配置值创建多个:
- 有界
DebugSender队列(queue::bounded_with_debug) - 对应的
UniformSenderThread实例
- 有界
-
所有 sender 命名后缀追加编号(如
3-flowlog-to-collector-sender-0~-3)
✅ 日志发送改为并发分发:
-
负载均衡策略:目前支持:
- Round-Robin(默认)
- 哈希分发(基于五元组)可扩展支持
-
发送线程存入
Vec<UniformSenderThread<_>>,在Trident::start()中统一启动,优雅退出时统一停止
✅ 配置示例(YAML):
log:
l4_flow_senders: 4
l7_flow_senders: 2
metric_senders: 2
## ChatGPT/CodeX 设计思路
1. 配置并发度
为每种日志类型新增一个配置项,如 l4_flow_senders、metrics_senders 等,用于指定要创建的 sender 数量。
2. 创建多组队列及线程
根据上述配置,循环调用 queue::bounded_with_debug 和 UniformSenderThread::new 创建若干队列及对应的 UniformSenderThread。可以在队列名称后追加索引区分,例如 "3-flowlog-to-collector-sender-1"、"3-flowlog-to-collector-sender-2" 等。所有生成的线程存入 Vec<UniformSenderThread<_>> 统一管理。
3. 在聚合/生成阶段分发数据
原先的发送链路(如 FlowAggrThread)只向一个 DebugSender 写数据。扩展后,可在 CollectorThread 或 FlowAggrThread 中实现简单的负载均衡策略:
- 轮询:对每个输出条目按顺序选择不同的 DebugSender。
- 哈希:根据流的五元组或其他关键字段计算哈希,选择固定的 sender,从而保持同一流的数据顺序。
分发后即可并行写入多个发送队列。
4. 启动与管理多个线程
在 Trident::start() 中遍历 Vec<UniformSenderThread<_>>,逐一调用 start() 启动。停止时同样遍历 notify_stop 或 stop()。
统计信息和异常处理可沿用现有逻辑,只需将每个线程的计数器注册到 stats_collector。
5. 多消费者/MPMC支持
多消费者(MPMC, Multi-Producer Multi-Consumer)支持是对当前队列模型(MPSC: 多生产者单消费者)的架构级增强,其核心目标是提升消费者处理吞吐能力,降低单消费者瓶颈对系统稳定性的影响