Message type conversion is not supported between RocketMQ message types
SeataMQProducer#doSendMessageInTransaction
public SendResult doSendMessageInTransaction(final Message msg, long timeout, String xid, long branchId)
    throws MQClientException {
    msg.setTopic(withNamespace(msg.getTopic()));
    if (msg.getDelayTimeLevel() != 0) {
        MessageAccessor.clearProperty(msg, MessageConst.PROPERTY_DELAY_TIME_LEVEL);
    }
    Validators.checkMessage(msg, this);
    SendResult sendResult = null;
    MessageAccessor.putProperty(msg, MessageConst.PROPERTY_TRANSACTION_PREPARED, "true");
    MessageAccessor.putProperty(msg, MessageConst.PROPERTY_PRODUCER_GROUP, this.getProducerGroup());
    MessageAccessor.putProperty(msg, PROPERTY_SEATA_XID, xid);
    MessageAccessor.putProperty(msg, PROPERTY_SEATA_BRANCHID, String.valueOf(branchId));
    try {
        sendResult = superSend(msg, timeout);
    } catch (Exception e) {
        throw new MQClientException("send message Exception", e);
    }
    if (SendStatus.SEND_OK != sendResult.getSendStatus()) {
        throw new RuntimeException("Message send fail.status=" + sendResult.getSendStatus());
    }
    if (sendResult.getTransactionId() != null) {
        msg.putUserProperty("__transactionId__", sendResult.getTransactionId());
    }
    String transactionId = msg.getProperty(MessageConst.PROPERTY_UNIQ_CLIENT_MESSAGE_ID_KEYIDX);
    if (null != transactionId && !"".equals(transactionId)) {
        msg.setTransactionId(transactionId);
    }
    return sendResult;
}
RocketMQ does not support converting a message from one type to another. When a message is enrolled in a Seata distributed transaction, Seata therefore clears its delay parameter and sends it as a transactional (half) message. Users get no indication of this: they only see that their delayed message no longer honors its delay and is delivered in real time, because a transactional message becomes visible as soon as it is committed. We should either warn users that this behavior invalidates delayed messages, or Seata should not proxy delayed messages at all.
@funky-eyes Is this issue still active? I would like to work on it. My plan: when a RocketMQ delayed message is detected inside a Seata global transaction, I will neither throw an error nor change the current behavior of sending the message as a transactional (half) message. Instead, I will log a prominent WARN message at the key locations (SeataMQProducer#doSendMessageInTransaction and TCCRocketMQImpl.prepare) telling users that delay semantics are not preserved in transactional scenarios. This keeps the change backward compatible.
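To make the proposal concrete, here is a minimal, self-contained sketch of the check-and-warn step. It is not the actual Seata patch: the class `DelayWarningSketch` and its method names are hypothetical, a plain `Map` stands in for the RocketMQ message properties, `java.util.logging` stands in for Seata's logger, and the key `"DELAY"` is assumed to match the value of `MessageConst.PROPERTY_DELAY_TIME_LEVEL`.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.logging.Logger;

public class DelayWarningSketch {
    private static final Logger LOGGER = Logger.getLogger(DelayWarningSketch.class.getName());

    // Assumption: mirrors the value of MessageConst.PROPERTY_DELAY_TIME_LEVEL in RocketMQ.
    public static final String PROPERTY_DELAY_TIME_LEVEL = "DELAY";

    /** Returns the delay level stored in the properties, or 0 when absent or not a number. */
    public static int delayTimeLevel(Map<String, String> properties) {
        String level = properties.get(PROPERTY_DELAY_TIME_LEVEL);
        if (level == null) {
            return 0;
        }
        try {
            return Integer.parseInt(level);
        } catch (NumberFormatException e) {
            return 0;
        }
    }

    /**
     * Hypothetical helper: keeps the existing behavior (the delay property is cleared)
     * but logs a WARN first. Returns true when a delay setting was dropped.
     */
    public static boolean clearDelayWithWarning(Map<String, String> properties, String topic, String xid) {
        int level = delayTimeLevel(properties);
        if (level == 0) {
            return false; // not a delayed message, nothing to report
        }
        LOGGER.warning(String.format(
            "Delayed message on topic '%s' (delayTimeLevel=%d) is sent as a transactional message "
                + "in global transaction %s; the delay will not take effect.",
            topic, level, xid));
        properties.remove(PROPERTY_DELAY_TIME_LEVEL);
        return true;
    }

    public static void main(String[] args) {
        Map<String, String> properties = new HashMap<>();
        properties.put(PROPERTY_DELAY_TIME_LEVEL, "3");
        boolean dropped = clearDelayWithWarning(properties, "order-topic", "example-xid");
        System.out.println("delay dropped: " + dropped);
    }
}
```

Because the helper only adds a log line before the property is cleared, messages are still sent exactly as they are today, which is the backward-compatibility goal stated above.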