[Bug] Controller方式部署的集群,rust客户端投递消息报错
Before Creating the Bug Report
-
[X] I found a bug, not just asking a question, which should be created in GitHub Discussions.
-
[X] I have searched the GitHub Issues and GitHub Discussions of this repository and believe that this is not a duplicate.
-
[X] I have confirmed that this bug belongs to the current repository, not other repositories of RocketMQ.
Programming Language of the Client
Rust
Runtime Platform Environment
集群部署情况: nameserver+controller * 3节点,配置文件如下:
listenPort = 9876
enableControllerInNamesrv = true
controllerDLegerGroup = group1
controllerDLegerPeers = n0-rmq-namesrv01:9873;n1-rmq-namesrv02:9873;n2-rmq-namesrv03:9873
controllerDLegerSelfId = n0..2 # 0-2
controllerStorePath = /data/rocketmq/DledgerController
broker4个节点,2个master,2个slave,配置如下:
brokerClusterName=DefaultCluster
brokerName=broker-a //两个broker-a,两个broker-b
brokerId=0
deleteWhen=04
fileReservedTime=48
flushDiskType=ASYNC_FLUSH
maxTransferCountOnMessageInMemory=400
maxTransferCountOnMessageInDisk=100
enableControllerMode=true
controllerAddr=rmq-namesrv01:9873;rmq-namesrv02:9873;rmq-namesrv03:9873
namesrvAddr=rmq-namesrv01:9876;rmq-namesrv02:9876;rmq-namesrv03:9876
storePathRootDir=/data/rocketmq/rocketmq/store
storePathCommitLog=/data/rocketmq/rocketmq/store/commitlog
storePathEpochFile = /data/rocketmq/rocketmq/store
brokerIP1=172.19.8.40
aclEnable=true
启动方式: sh bin/mqbroker -c $ROCKETMQ_HOME/conf/broker.conf --enable-proxy
RocketMQ Version of the Client/Server
5.3.0
Run or Compiler Version
java-11-openjdk-amd64
Describe the Bug
sdk版本:rocketmq = "5"
代码:
async fn handle(Component(producer): Component<Rmq>, Json(payload): Json<Payload>) -> Result<impl IntoResponse> {
let payload_str = payload.payload.to_string();
let payload_str = payload_str.as_str();
let topic = match payload.topic {
None => {
error!("Failed to publish message, topic is null: {:?}", payload_str);
return Ok((
StatusCode::INTERNAL_SERVER_ERROR,
"Failed to publish message, topic is null".to_string(),
));
}
Some(t) => t,
};
let mut builder = MessageBuilder::builder()
.set_topic(topic)
.set_body(payload_str.as_bytes().to_vec());
if let Some(key) = payload.key {
builder = builder.set_keys(vec![key]);
}
if let Some(tag) = payload.tag {
builder = builder.set_tag(tag);
}
if let Some(group) = payload.group {
builder = builder.set_message_group(group);
}
// build message
let message = match builder.build() {
Ok(e) => e,
Err(e) => {
error!("Failed to build message: {:?}", e);
return Ok((
StatusCode::INTERNAL_SERVER_ERROR,
"Failed to build message".to_string(),
));
}
};
// send message to rocketmq proxy
let result = match producer.send_message(message).await {
Ok(e) => e,
Err(e) => {
error!("Failed to send message to rocketmq: {:?}", e);
return Ok((
StatusCode::INTERNAL_SERVER_ERROR,
"Failed to send message to rocketmq".to_string(),
));
}
};
info!("success: {}, message_id={}", payload_str,result.message_id());
Ok((StatusCode::OK, "to rocketmq success".to_string()))
}
Steps to Reproduce
发送消息到rocketmq,有时候正常,有时候报错,概率差不多50%,报错内容如下: Caused by: Server error at client.send_message, context: { code: INTERNAL_SERVER_ERROR, message: org.apache.rocketmq.proxy.common.ProxyException: service not available now. It may be caused by one of the following reasons: the broker's disk is full [CL: 0.05 CQ: 0.05 INDEX: 0.05], messages are put to the slave, message store has been shut down, etc. } => server return an error
使用java sdk,相同的业务逻辑,不会报错
What Did You Expect to See?
不报错
What Did You See Instead?
报错
Additional Context
No response