datakit icon indicating copy to clipboard operation
datakit copied to clipboard

kafka handle

Open songlonqi-java opened this issue 2 years ago • 1 comments

kafkamq 提供一种插件机制:将数据([]byte)通过 HTTP 发送到外部handle,经过处理后再通过response返回行协议的json格式数据。实现定制化数据。

增加如下配置:(以最终配置为准)

  • http url string
  • message_points int 一次发送的消息点数
  • debug bool 值, 当开启debug功能, message_points 则无效,如果开启debug模式,则将原始byte数据发送,不再进行消息合并。
  • threads int 多线程工作
  • is_response_point 是否将行协议数据发送回来
  • pipeline 脚本
  • header_check 特殊的头部检测(bfy定制化,并非通用)
  • 等等

外部插件有一些约束:

  • kafkamq 接收数据但不负责解析
  • 外部插件解析后的数据可以通过 dk api 发送,也可以返回到 kafkamq 再发送到观测云
  • 通过 response 返回到 kafkamq 必须是 行协议格式。
  • 外部插件收到数据,无论解析失败与否 都应该返回200。
  • kafkamq发送数据到外部插件如果出现timeout,端口不存在等。会尝试重连。不再消费kafka中的消息。

songlonqi-java avatar Jul 28 '23 08:07 songlonqi-java