mqttx
mqttx copied to clipboard
MQTTX Project 完整实现 mqttv3.1.1 协议,旨在提供易于使用且性能优异的 mqtt broker
MQTTX Project
中文 | English
- 1 介绍
- 1.1 快速开始
- 1.2 项目依赖
- ~~1.3 线上实例~~
- 2 架构
- 2.1 目录结构
- 3 docker 启动
- 4 功能说明
- 4.1 qos 支持
- 4.2 topicFilter 支持
- 4.3 集群支持
- 4.4 ssl 支持
- 4.5 topic 安全支持
- 4.6 共享主题支持
- 4.7 websocket 支持
- 4.8 系统主题
- 4.8.1 状态主题
- 4.8.2 功能主题
- 4.9 消息桥接支持
- 4.10 主题限流支持
- 4.11 消息持久化支持
- 4.12 基础认证支持
- 5 开发者说
- 6 附表
- 6.1 配置项
- 6.2 版本说明
- 6.2.1 v1.0
- 6.2.2 v1.1
- 6.2.3 v2.0
- 6.2.4 v1.2
- 6.3 Benchmark
- 6.3.1 CleanSessionTrue
- 6.3.2 CleanSessionFalse
- 6.4 代码质量分析
1 介绍
Mqttx 基于 MQTT v3.1.1 协议开发,旨在提供 易于使用 且 性能优越 的 mqtt broker。
注意:分支 v1.2 要求 JDK17, 其它分支要求 JDK8
关联项目: Mqttx-Client 实现&使用极为简单的 mqttv3.1.1 客户端.
1.1 快速开始
想通过 docker 快速体验?见 docker 启动
-
打包
- 开发模式:
- 启动
redis实例 - 运行
mvnw -P dev -DskipTests=true clean package
- 启动
- 开发模式:
-
运行
- 运行命令:
java -jar mqttx-1.0.5.BETA.jar
- 运行命令:
快速开始-测试模式 图例:
-
测试模式
- 集群功能被强制关闭
- 消息保存在内存而不是
redis
-
开发模式
- 消息会持久化到
redis, 默认连接localhost:6376无密码
- 消息会持久化到
所谓开发模式只是方便同学们快速启动项目,方便测试功能测试。熟悉项目后,同学们可通过修改 6.1 配置项 开启或关闭 mqttx 提供的各项功能。
mqttx默认依赖redis实现消息持久化、集群等功能,使用其它中间件(mysql,mongodb,kafka等)同样能够实现,而springboot具备spring-boot-starter-***等各种可插拔组件,方便大家修改默认的实现
1.2 项目依赖
- [x] Redis: 集群消息、消息持久化
- [x] Kafka:桥接消息支持,集群消息(可选功能)
其它说明:
- 项目使用了 lombok,使用 ide 请安装对应的插件
开发工具建议使用 Intellij IDEA :blush:
举例:
idea需要安装插件Lombok,settings > Build,Execution,Deployment > Compiler > Annotation Processor开启Enable annotation processing
~~1.3 线上实例~~
云服务到期,实例已经无法访问,有朋友赞助吗/(ㄒoㄒ)/~~
云端部署了一个
mqttx单例服务,可供功能测试:
- 不支持
ssl- 开启了
websocket, 可通过 http://ws.tool.tusk.link/ 测试,仅需将域名修改为:119.45.158.51(端口、地址不变)- 支持共享订阅功能
- 部署版本
v1.0.6.RELEASE
2 架构
mqttx支持客户端认证、topic 发布/订阅鉴权功能,如果需要配套使用,建议的架构如下图:

客户认证服务由使用者自行实现
内部实现框架关系(仅列出关键项):

2.1 目录结构
├─java
│ └─com
│ └─jun
│ └─mqttx
│ ├─broker # mqtt 协议实现及处理包
│ │ ├─codec # 编解码
│ │ └─handler # 消息处理器(pub, sub, connn, etc)
│ ├─config # 配置,主要是 bean 声明
│ ├─constants # 常量
│ ├─consumer # 集群消息消费者
│ ├─entity # 实体类
│ ├─exception # 异常类
│ ├─service # 业务服务(用户认证, 消息存储等)接口
│ │ └─impl # 默认实现
│ └─utils # 工具类
└─resources # 资源文件(application.yml 在此文件夹)
├─META-INF # spring-configuration 辅助配置说明
└─tls # ca 存放地址
3 docker 启动
镜像已上传至 docker-hub , 访问:fantasywujun/mqttx - Docker Hub 全部镜像
docker 环境安装好后,执行 docker-compose -f ./docker-compose.yml up 启动, 效果见下图:

| Docker Pull Command | 说明 |
|---|---|
docker pull fantasywujun/mqttx:1.2.0 |
基于 jdk17.0.1 的 mqttx:1.2.0 版本 |
docker-compose 文件内容:
version: "2"
services:
redis:
container_name: redis-for-mqttx
image: redis
mqttx:
container_name: mqttx
image: fantasywujun/mqttx:1.2.0
ports:
- 1883:1883
- 8083:8083
4 功能说明
4.1 qos 支持
| qos0 | qos1 | qos2 |
|---|---|---|
| 支持 | 支持 | 支持 |
为支持 qos1、qos2,引入 redis 作为持久层,这部分已经封装成接口,可自行替换实现(比如采用 mysql)。
4.2 topicFilter 支持
- 支持多级通配符
#与单级通配符+ - 不支持以
/结尾的topic,比如a/b/,请改为a/b。 - 其它规则见 mqtt v3.1.1 4.7 Topic Names and Topic Filters
mqttx 仅对订阅 topicFilter 进行校验,publish 的 topic 是没有做合法性检查的,可通过开启 4.5 topic 安全支持 限制客户端可发布的 topic。
举例:
| topicFilter | match topics |
|---|---|
/a/b/+ |
/a/b/abc,/a/b/test |
a/b/# |
a/b, a/b/abc, a/b/c/def |
a/+/b/# |
a/nani/b/abc |
/+/a/b/+/c |
/aaa/a/b/test/c |
校验工具类为:com.jun.mqttx.utils.TopicUtils
4.3 集群支持
mqttx 依赖消息中间件分发消息实现集群功能,目前支持的中间件:
- [x]
Kafka:可选配置 - [x]
Redis:默认配置
实现原理如下图:

mqttx.cluster.enable:功能开关,默认falsemqttx.cluster.type: 消息中间件类型,默认redis
注意事项:
-
v1.0.5.RELEASE之前的版本集群功能存在 bug,无法使用。 -
如需使用
kafka实现集群消息,需要手动修改配置application-*.yml, 可参考application-dev.yml中的配置示例 3. kafka 集群。
4.4 ssl 支持
开启 ssl 你首先应该有了 ca(自签名或购买),然后修改 application.yml 文件中几个配置:
mqttx.ssl.enable:功能开关,默认false,同时控制websocket与socketmqttx.ssl.key-store-location:keystore 地址,基于classpathmqttx.ssl.key-store-password:keystore 密码mqttx.ssl.key-store-type:keystore 类别,如PKCS12mqttx.ssl.client-auth:服务端是否需要校验客户端证书,默认NONE
resources/tls目录中的mqttx.keystore仅供测试使用, 密码:123456证书加载工具类:
com/jun/mqttx/utils/SslUtils.java
4.5 topic 安全支持
为了对 client 订阅 topic 进行限制,加入topic 订阅&发布鉴权机制:
-
mqttx.enable-topic-sub-pub-secure: 功能开关,默认false -
broker 收到 conn 报文后,会抓取
{clientId, username, password}发起请求给mqttx.auth.url, 该接口返回对象中含有authorizedSub,authorizedPub存储 client 被授权订阅及发布的topic列表。详见 4.12 基础认证支持
-
broker 在消息订阅及发布都会校验客户端权限
支持的主题类型:
- [x] 普通主题
- [x] 共享主题
- [x] 系统主题
4.6 共享主题支持
共享订阅是 mqtt5 协议规定的内容,很多 mq(例如 kafka) 都有实现。
-
mqttx.share-topic.enable: 功能开关,默认true -
格式:
$share/{ShareName}/{filter},$share为前缀,ShareName为共享订阅名,filter就是非共享订阅主题过滤器。 -
目前支持
hash,random,round三种规则hash选出的 client 会随着订阅客户端数量及发送消息客户端clientId变化而变化
下图展示了共享主题与常规主题之间的差异:

msg-a 消息分发策略取决于配置项 mqttx.share-topic.share-sub-strategy
可以配合 cleanSession = 1 的会话,共享主题的客户端断开连接后会被服务端移除订阅,这样共享主题的消息只会分发给在线的客户端。
CleanSession 介绍:mqtt3.1.1 协议规定当 cleanSession = 1 时,连接断开后与会话相关联的所有状态(不含 retained 消息)都会被删除(mqtt5
增加了会话超时设置,感兴趣的同学可以了解一下)。
mqttx v1.0.5.BETA 版本后(含),cleanSession = 1 的会话消息保存在内存中,具备极高的性能.
If CleanSession is set to 1, the Client and Server MUST discard any previous Session and start a new one. This Session lasts as long as the Network Connection. State data associated with this Session MUST NOT be reused in any subsequent Session [MQTT-3.1.2-6].
The Session state in the Client consists of:
- QoS 1 and QoS 2 messages which have been sent to the Server, but have not been completely acknowledged.
- QoS 2 messages which have been received from the Server, but have not been completely acknowledged.
The Session state in the Server consists of:
- The existence of a Session, even if the rest of the Session state is empty.
- The Client’s subscriptions.
- QoS 1 and QoS 2 messages which have been sent to the Client, but have not been completely acknowledged.
- QoS 1 and QoS 2 messages pending transmission to the Client.
- QoS 2 messages which have been received from the Client, but have not been completely acknowledged.
- Optionally, QoS 0 messages pending transmission to the Client.
4.7 websocket 支持
支持
4.8 系统主题
mqttx broker 内置部分系统主题,用户可酌情使用。
系统主题不支持如下特性:
- 集群:系统主题不支持集群,包括消息及订阅
- 持久化:系统主题消息不支持持久化,包括订阅关系
- QoS: 不支持 QoS 1,2 仅支持 QoS 0
注意:topic 安全机制 同样会影响客户端订阅系统主题, 未授权客户端将无法订阅系统主题
系统主题可分两种:
- 状态主题:反应 broker 自身状态的主题
- 功能主题:对外提供功能性支持的主题
4.8.1 状态主题
客户端可通过订阅系统主题获取 broker 状态,目前系统支持如下状态主题:
| 主题 | 描述 |
|---|---|
$SYS/broker/{brokerId}/status |
触发方式:订阅此主题的客户端会定期(mqttx.sys-topic.interval)收到 broker 的状态,该状态涵盖下面所有主题的状态值. 注意:客户端连接断开后,订阅取消 |
$SYS/broker/activeConnectCount |
立即返回当前的活动连接数量 触发:订阅一次触发一次 |
$SYS/broker/time |
立即返回当前时间戳 触发:订阅一次触发一次 |
$SYS/broker/version |
立即返回 broker 版本触发:订阅一次触发一次 |
$SYS/broker/receivedMsg |
立即返回 broker 启动到现在收到的 MqttMessage, 不含 ping触发:订阅一次触发一次 |
$SYS/broker/sendMsg |
立即返回 broker 启动到现在发送的 MqttMessage, 不含 pingAck触发:订阅一次触发一次 |
$SYS/broker/uptime |
立即返回 broker 运行时长,单位秒触发:订阅一次触发一次 |
$SYS/broker/maxActiveConnectCount |
立即返回 broker 运行至今的最大 tcp 连接数触发:订阅一次触发一次 |
系统主题 $SYS/broker/{brokerId}/status 中的 brokerId 为配置项参数(见 6.1 配置项),可通过携带通配符的主题 $SYS/broker/+/status 订阅。
响应对象格式为 json 字符串:
{
"activeConnectCount": 1,
"maxActiveConnectCount": 2,
"receivedMsg": 6,
"sendMsg": 77,
"timestamp": "2021-03-23T23:05:37.035",
"uptime": 149,
"version": "1.0.7.RELEASE"
}
| field | 说明 |
|---|---|
activeConnectCount |
当前活跃连接数量 |
maxActiveConnectCount |
最大活跃连接数量 |
receiveMsg |
收到消息数量,不含 ping |
sendMsg |
发送消息数量,不含 pingAck |
timestamp |
时间戳;(yyyy-MM-dd HH:mm:ss) |
uptime |
broker 上线时长,单位秒 |
version |
mqttx 版本 |
4.8.2 功能主题
此功能需求源自 issue: 监听MQTT客户端状态(在线、离线) · Issue #8 · Amazingwujun/mqttx (github.com)
| 主题 | 描述 |
|---|---|
$SYS/broker/{borkerId}/clients/{clientId}/connected |
客户端上线通知主题 触发:当某个客户端上线后,broker 会发送消息给该主题 |
$SYS/broker/{borkerId}/clients/{clientId}/disconnected |
客户端下线通知主题 触发:当某个客户端掉线后,broker 会发送消息给该主题 |
这两个系统主题支持通配符,举例:
$SYS/broker/+/clients/#: 匹配客户端上下线通知主题$SYS/broker/+/clients/+/connected: 匹配客户端上线通知主题$SYS/broker/+/clients/+/disconnected: 匹配客户端下线通知主题
4.9 消息桥接支持
支持消息中间件:
- [x] kafka
消息桥接功能可方便的对接消息队列中间。
mqttx.message-bridge.enable:开启消息桥接功能mqttx.bridge-topics:需要桥接消息的主题,主题必须符合 kafka 对 topic 的要求
mqttx 收到客户端 发布 的消息后,先判断桥接功能是否开启,然后再判断主题是否是需要桥接消息的主题,最后发布消息到 MQ。
仅支持单向桥接:device(client) => mqttx => MQ
4.10 主题限流支持
使用基于令牌桶算法的 com.jun.mqttx.utils.RateLimiter 对指定主题进行流量限制。
令牌桶算法参见:https://stripe.com/blog/rate-limiters
简单解释一下令牌桶概念:有一个最大容量为
capacity的令牌桶,该桶以一定的速率补充令牌(replenish-rate),每次调用接口时消耗一定量(token-consumed-per-acquire)的令牌,令牌数目足够则请求通过。
主题限流仅适用于 qos 等于 0 的消息。
配置举例:
mqttx:
rate-limiter:
enable: true
topic-rate-limits:
# 例一
- topic: "/test/a"
capacity: 9
replenish-rate: 4
token-consumed-per-acquire: 3
# 例二
- topic: "/test/b"
capacity: 5
replenish-rate: 5
token-consumed-per-acquire: 2
capacity: 桶容量replenish-rate: 令牌填充速率token-consumed-per-acquire: 每次请求消耗令牌数量
QPS 计算公式:
- 最大并发数:公式为
QPS = capacity ÷ token-consumed-per-acquire- 示例一:
9 ÷ 3 = 3 - 示例二:
5 ÷ 2 = 2.5
- 示例一:
- 最大持续并发数:公式
QPS = replenish-rate ÷ token-consumed-per-acquire- 示例一:
4 ÷ 3 ≈ 1.3 - 示例二:
5 ÷ 2 = 2.5
- 示例一:
4.11 消息持久化支持
mqttx 的持久化依赖 redis , mqttx 会持久化 cleanSession = false & qos > 0 的消息, 消息被 Serializer 序列化为字节数组后存储在 redis。
目前 mqttx 提供了两种序列化实现:
JsonSerializerKryoSerializer
默认使用 JsonSerializer, 这是为了和之前的项目兼容;v1.0.6.release 版本后 KryoSerializer 将成为默认序列化实现。
可通过配置 mqttx.serialize-strategy 修改序列化实现。
4.12 基础认证支持
mqttx 提供基础客户端认证服务。
配置项:
mqttx.auth.url: 提供认证服务的接口地址。mqttx.auth.timeout:HttpClient请求超时
用户在配置文件中声明 mqtt.auth.url 后,对象 com.jun.mqttx.service.impl.DefaultAuthenticationServiceImpl 使用 HttpClient 发出 POST 请求给 mqttx.auth.url。
请求内容为 mqtt conn 报文中的 username, password.
POST / HTTP/1.1
Host: mqttx.auth.url
Content-Type: application/json
Content-Length: 91
{
"clientId": "device_id_test",
"username": "mqttx",
"password": "123456"
}
认证成功后响应对象为 json 格式字符串:
{
"authorizedSub": [
"subTopic1",
"subTopic2"
],
"authorizedPub": [
"pubTopic1",
"pubTopic2"
]
}
认证成功返回响应可配合 4.5 topic 安全支持 使用。
注意:
- 接口返回
http status = 200即表明认证成功, 其它状态值一律为认证失败
5 开发者说
-
感谢 Jetbrains 为开源项目提供的 License
-
长期更新维护的分支
v1.0: 基于jdk8且 redis io 为 blocking 模式.v1.2: 基于jdk17且 redis io 为 non-blocking 模式.
-
为使 mqttx 项目变得更好,请使用及学习该项目的同学主动反馈使用情况给我(提 issue 或加群反馈)
-
后续工作
- [ ]
v1.0.8.RELEASE版本开发 - [ ]
v1.1.0.RELEASE版本开发 - [x]
v1.2版本开发 - [ ]
v2.0版本开发 - [x] bug 修复
- [ ]
-
v1.2版本由 JDK8 升级至 JDK17 -
v2.0版本分支将作为 mqttv5 协议版本开始迭代 -
这段时间工作任务繁重,功能迭代暂时停止,当然 bug 我还是会优先处理🙂
-
请作者喝杯 丝绒拿铁 😊
-
交流群
6 附表
6.1 配置项
src/main/resources 目录下有三个配置文件:
application.ymlapplication-dev.ymlapplication-prod.yml
后两个配置文件目的是区分不同环境下的配置,便于管理。
配置项说明:
| 配置 | 默认值 | 说明 |
|---|---|---|
mqttx.version |
取自 pom.xml |
版本 |
mqttx.broker-id |
取自 pom.xml |
应用标志, 唯一 |
mqttx.heartbeat |
60s |
初始心跳,会被 conn 消息中的 keepalive 重置 |
mqttx.host |
0.0.0.0 |
监听地址 |
mqttx.so-backlog |
512 |
tcp 连接处理队列 |
mqttx.enable-topic-sub-pub-secure |
false |
客户订阅/发布主题安全功能,开启后将限制客户端发布/订阅的主题 |
mqttx.enable-inner-cache |
true |
发布消息每次都需要查询 redis 来获取订阅的客户端列表。开启此功能后,将在内存中建立一个主题-客户端关系映射, 应用直接访问内存中的数据即可 |
mqttx.ignore-client-self-pub |
true |
忽略 client 发送给自己的消息(当 client 发送消息给自己订阅的主题) |
mqttx.serialize-strategy |
json |
broker 采用的序列化策略,集群策略必须一致。 |
mqttx.redis.cluster-session-hash-key |
mqttx.session.key |
redis map key;用于集群的会话存储 |
mqttx.redis.topic-prefix |
mqttx:topic: |
主题前缀; topic <==> client 映射关系保存 |
mqttx.redis.retain-message-prefix |
mqttx:retain: |
保留消息前缀, 保存 retain 消息 |
mqttx.redis.pub-msg-set-prefix |
mqttx:client:pubmsg: |
client pub消息 redis set 前缀; 保存 pubmsg,当收到 puback 获取 pubrec 后删除 |
mqttx.redis.pub-rel-msg-set-prefix |
mqttx:client:pubrelmsg: |
client pubRel 消息 redis set 前缀;保存 pubrel 消息 flag,收到 pubcom 消息删除 |
mqttx.redis.topic-set-key |
mqttx:alltopic |
topic 集合,redis set key 值;保存全部主题 |
mqttx.redis.message-id-prefix |
mqttx:messageId: |
非 cleanSession client 的 messageId, 使用 redis INCR 指令 |
mqttx.redis.client-topic-set-prefix |
mqttx:client:topicset: |
client 订阅的主题 redis set 前缀; 保存 client 订阅的全部主题 |
mqttx.cluster.enable |
false |
集群开关 |
mqttx.cluster.inner-cache-consistancy-key |
mqttx:cache_consistence |
应用启动后,先查询 redis 中无此 key 值,然后在检查一致性 |
mqttx.cluster.type |
redis |
集群消息中间件类型 |
mqttx.ssl.enable |
false |
ssl 开关 |
mqttx.ssl.client-auth |
NONE |
客户端证书校验 |
mqttx.ssl.key-store-location |
classpath: tls/mqttx.keystore |
keyStore 位置 |
mqttx.ssl.key-store-password |
123456 |
keyStore 密码 |
mqttx.ssl.key-store-type |
pkcs12 |
keyStore 类别 |
mqttx.socket.enable |
true |
socket 开关 |
mqttx.socket.port |
1883 |
socket 监听端口 |
mqttx.websocket.enable |
false |
websocket 开关 |
mqttx.websocket.port |
8083 |
websocket 监听端口 |
mqttx.websocket.path |
/mqtt |
websocket path |
mqttx.share-topic.enable |
true |
共享主题功能开关 |
mqttx.share-topic.share-sub-strategy |
round |
负载均衡策略, 目前支持随机、轮询、哈希 |
mqttx.sys-topic.enable |
false |
系统主题功能开关 |
mqttx.sys-topic.interval |
60s |
定时发布间隔 |
mqttx.message-bridge.enable |
false |
消息桥接功能开关 |
mqttx.message-bridge.topics |
null |
需要桥接消息的主题列表 |
mqttx.rate-limiter.enable |
false |
主题限流开关 |
mqttx.rate-limiter.token-rate-limit |
参见 主题限流支持 配置举例说明 | |
mqttx.auth.url |
null |
mqtt conn username/password 认证服务接口地址 |
mqttx.auth.timeout |
3s |
readTimeout |
6.2 版本说明
prometheus 分支为 MQTTX 整合监控系统 Prometheus 的代码,有需要的用户可参考该分支代码.
6.2.1 v1.0
- v1.0.8.RELEASE
- [ ] 消息集中持久化到
redis hmap数据结构中,PubMsg仅保存hmap中的payloadId, 该优化目的在于防止消息膨胀导致的 redis 内存耗用过大。(之前版本消息都是持久化到客户端各自的PubMsg)
- [ ] 消息集中持久化到
- v1.0.7.RELEASE
- [x] 增加序列化框架 Kryo 的支持
- [x] 系统主题新增客户端上下线通知主题
- [x] 修复新增订阅触发
retain消息后,消息分发给全部订阅者的 bug - [x] 修复遗嘱消息
isWillRetain:true持久化的bug - [x] bug 修复及优化
- v1.0.6.RELEASE
- [x]
netty 4.1.52.Final这个版本的 MqttEncoder.java 处理 UnsubAck 响应消息会导致 NPE,直接影响功能,不得不提前结束此版本的开发 - [x] bug 修复
- [x]
- v1.0.5.RELEASE
- [x] 测试模式支持
- [x]
epoll支持,见 https://netty.io/wiki/native-transports.html - [x] 优化
cleanSession消息处理机制 - [x] 消息桥接
- [x] bug 修复及优化
- v1.0.4.RELEASE
- [x] websocket 支持
- [x] 集群状态自检
- [x] bug 修复及优化
- v1.0.3.RELEASE
- [x] bug 修复
- v1.0.2.RELEASE
- [x] 共享主题加入轮询策略
- [x] bug 修复及优化
- v1.0.1.RELEASE
- [x] 基于
redis的集群功能支持 - [x] 共享主题支持
- [x] 主题权限功能
- [x] bug 修复及优化
- [x] 基于
- v1.0.0.RELEASE
- [x]
mqttv3.1.1完整协议实现
- [x]
6.2.2 v1.1
- v1.1.0.RELEASE(开发中)
- [ ]
redis同步转异步实现,提升性能
- [ ]
6.2.3 v2.0
- v2.0.0.RELEASE
- [ ] mqtt5 支持
6.2.4 v1.2
-
v1.2.0.RELEASE
- [x] 项目依赖 JDK 升级,当前版本:JDK8 目标版本:JDK17
-
v1.2.1.ALPHA
- [X] reactor 改造。
6.3 Benchmark
版本过低,此 Benchmark 已不可靠
测试条件简陋,结果仅供参考。
版本: MQTTX v1.0.5.BETA
工具: mqtt-bench
机器:
| 系统 | cpu | 内存 |
|---|---|---|
win10 |
i5-4460 |
16G |
6.3.1 CleanSessionTrue
- 启用
redis cleanSession: true
实际上
pub消息存储并未走 redis, 原因见 共享主题 中关于cleanSession的介绍
执行 java -jar -Xmx1g -Xms1g mqttx-1.0.5.BETA.jar
- qos0
C:\Users\Jun\go\windows_amd64>mqtt-bench.exe -broker=tcp://localhost:1883 -action=pub -clients=1000 -qos=0 -count=1000
2020-09-30 15:33:54.462089 +0800 CST Start benchmark
2020-09-30 15:34:33.6010217 +0800 CST End benchmark
Result : broker=tcp://localhost:1883, clients=1000, totalCount=1000000, duration=39134ms, throughput=25553.23messages/sec
- qos1
C:\Users\Jun\go\windows_amd64>mqtt-bench.exe -broker=tcp://localhost:1883 -action=pub -clients=1000 -qos=1 -count=1000
2020-09-30 15:29:17.9027515 +0800 CST Start benchmark
2020-09-30 15:30:25.0316915 +0800 CST End benchmark
Result : broker=tcp://localhost:1883, clients=1000, totalCount=1000000, duration=67124ms, throughput=14897.80messages/sec
- qos2
C:\Users\Jun\go\windows_amd64>mqtt-bench.exe -broker=tcp://localhost:1883 -action=pub -clients=1000 -qos=2 -count=1000
2020-09-30 15:37:00.0678207 +0800 CST Start benchmark
2020-09-30 15:38:55.4419847 +0800 CST End benchmark
Result : broker=tcp://localhost:1883, clients=1000, totalCount=1000000, duration=115369ms, throughput=8667.84messages/sec
| 并发连接数量 | 行为 | 单个消息大小 | 单连接消息数量 | 报文总数 | qos | 耗时 | qps |
|---|---|---|---|---|---|---|---|
1000 |
发布消息 | 1024byte |
1000 |
一百万 | 0 |
39.1s |
25553 |
1000 |
发布消息 | 1024byte |
1000 |
一百万 | 1 |
67.1s |
14897 |
1000 |
发布消息 | 1024byte |
1000 |
一百万 | 2 |
115.3s |
8667 |
资源消耗:cpu: 25%, mem 440 MB
6.3.2 CleanSessionFalse
- 启用
redis cleanSession: false
执行 java -jar -Xmx1g -Xms1g mqttx-1.0.5.BETA.jar
- qos0
C:\Users\Jun\go\windows_amd64>mqtt-bench.exe -broker=tcp://localhost:1883 -action=pub -clients=1000 -qos=0 -count=1000
2020-09-30 17:03:55.7560928 +0800 CST Start benchmark
2020-09-30 17:04:36.2080909 +0800 CST End benchmark
Result : broker=tcp://localhost:1883, clients=1000, totalCount=1000000, duration=40447ms, throughput=24723.71messages/sec
- qos1
C:\Users\Jun\go\windows_amd64>mqtt-bench.exe -broker=tcp://localhost:1883 -action=pub -clients=1000 -qos=1 -count=1000
2020-09-30 17:06:18.9136484 +0800 CST Start benchmark
2020-09-30 17:08:20.9072865 +0800 CST End benchmark
Result : broker=tcp://localhost:1883, clients=1000, totalCount=1000000, duration=121992ms, throughput=8197.26messages/sec
- qos2
C:\Users\Jun\go\windows_amd64>mqtt-bench.exe -broker=tcp://localhost:1883 -action=pub -clients=1000 -qos=2 -count=1000
2020-09-30 17:09:35.1314262 +0800 CST Start benchmark
2020-09-30 17:13:10.7914125 +0800 CST End benchmark
Result : broker=tcp://localhost:1883, clients=1000, totalCount=1000000, duration=215656ms, throughput=4637.01messages/sec
| 并发连接数量 | 行为 | 单个消息大小 | 单连接消息数量 | 报文总数 | qos | 耗时 | qps |
|---|---|---|---|---|---|---|---|
1000 |
发布消息 | 1024byte |
1000 |
一百万 | 0 |
40.4s |
24723 |
1000 |
发布消息 | 1024byte |
1000 |
一百万 | 1 |
121.9s |
8197 |
1000 |
发布消息 | 1024byte |
1000 |
一百万 | 2 |
215.6s |
4637 |
资源消耗:cpu: 45%, mem 440 MB
6.4 代码质量分析
结果取自 mqttx: (gitee.com) sonarQube
- 漏洞是我将
keyStore密码硬编码写到了配置代码,方便用户测试TLS,用户可自行替换。

