dtm
dtm copied to clipboard
XA全局事务成功后,DTM Admin 显示正常,但是本地数据库XA事务未提交,要修改的数据未变动,需要手动执行XA事务提交才生效
原始数据库表截图如下:
使用Dtm XA分布式事务,实现如下场景:用户1002 balance转出 100 1001 balance转入100,
程序执行成功,并执行成功后,dtm admin 的截图如下:
在Dtm XA分布式事务完成后,数据库表截图如下:
可见结果未达到预期,在本地数据库中使用xa recover命令后,发现如下截图:
在本地数据库使用如下命令手动提交XA事务:
xa commit '5b63c59f-a435-4ffc-bf4e-e6dc5190cf1d-02';
xa commit '5b63c59f-a435-4ffc-bf4e-e6dc5190cf1d-01';
数据库表数据发生了预期的变化,数据库表截图如下:
能有可以复现的例子吗?
å ·ä½ç示ä¾çå¦ä¸ï¼æä¸¤ä¸ªæä»¶ï¼ä¸ä¸ªæ¯test_dtm.goï¼ä»£ç å¦ä¸ï¼
package main
import (
"crypto/md5"
"fmt"
"github.com/bwmarrin/snowflake"
"github.com/dtm-labs/client/dtmcli"
"github.com/gin-gonic/gin"
"github.com/go-resty/resty/v2"
"github.com/google/uuid"
"log"
"strings"
)
func CryptToMD5(v1 []byte, v2 []byte, uppercase bool) string {
var rs = make([]string, 0)
m := md5.New()
m.Write(v1)
bm := m.Sum(v2)
for _, v := range bm {
if uppercase {
rs = append(rs, fmt.Sprintf("%02X", v))
} else {
rs = append(rs, fmt.Sprintf("%02x", v))
}
}
return strings.Join(rs, "")
/*return hex.EncodeToString(h.Sum(format.ToByte(v2)))*/ //第äºç§è¿ååç¬¦ä¸²çæ¹æ³ï¼è¿åçåæ°æ¯å°å
}
func NewGid() string {
u := uuid.New()
return fmt.Sprintf("%s", u)
}
func TransXa(serverUrl, businessUrl string) {
gid := NewGid()
transInReq := &gin.H{
"transInUserId": 1001,
"amount": 100,
}
transOutReq := &gin.H{
"transOutUserId": 1002,
"amount": 100,
}
err := dtmcli.XaGlobalTransaction(serverUrl, gid, func(xa *dtmcli.Xa) (*resty.Response, error) {
resp, err := xa.CallBranch(transOutReq, businessUrl+"/trans-out-xa")
if err != nil {
return resp, err
}
return xa.CallBranch(transInReq, businessUrl+"/trans-in-xa")
})
if err != nil {
log.Fatal(err)
}
log.Printf("transaction:%s xa success", gid)
}
func main() {
serverUrl := "http://192.168.0.101:31980/api/dtmsvr"
businessUrl := "http://192.168.0.101:8686/api/v1/business"
TransXa(serverUrl, businessUrl)
}
ä¸ä¸ªæä»¶æ¯test_dtm_http_xa.goï¼ä»£ç å¦ä¸ï¼
package main
import (
"database/sql"
"fmt"
"github.com/dtm-labs/client/dtmcli"
"github.com/gin-gonic/gin"
"github.com/gin-gonic/gin/binding"
"github.com/go-playground/locales/en"
"github.com/go-playground/locales/zh"
ut "github.com/go-playground/universal-translator"
"github.com/go-playground/validator/v10"
enTranslations "github.com/go-playground/validator/v10/translations/en"
zhTranslations "github.com/go-playground/validator/v10/translations/zh"
jsoniter "github.com/json-iterator/go"
rotatelogs "github.com/lestrrat-go/file-rotatelogs"
"github.com/pkg/errors"
"gorm.io/driver/mysql"
"gorm.io/driver/postgres"
"gorm.io/gorm"
"gorm.io/gorm/logger"
"log"
"net/http"
"net/url"
"time"
)
type TransInXaRequest struct {
TransInUserId int `json:"transInUserId" binding:"required" desc:"转å
¥è´¦å·id"` //转å
¥è´¦å·id
Amount float64 `json:"amount" binding:"required" desc:"转å
¥éé¢"` //转å
¥éé¢
}
type TransOutXaRequest struct {
TransOutUserId int `json:"transOutUserId" binding:"required" desc:"转åºè´¦å·id"` //转åºè´¦å·id
Amount float64 `json:"amount" binding:"required" desc:"转åºéé¢"` //转åºéé¢
}
func initValidator(locale string) (ut.Translator, error) {
if v, ok := binding.Validator.Engine().(*validator.Validate); ok {
zhTranslator := zh.New()
enTranslator := en.New()
uni := ut.New(enTranslator, zhTranslator, enTranslator)
translator, ok := uni.GetTranslator(locale)
if !ok {
return nil, errors.New("init validator failed")
}
var err error
switch locale {
case "en":
err = enTranslations.RegisterDefaultTranslations(v, translator)
case "zh":
err = zhTranslations.RegisterDefaultTranslations(v, translator)
default:
err = enTranslations.RegisterDefaultTranslations(v, translator)
}
return translator, err
}
return nil, errors.New("init validator failed")
}
func ValidateError(ctx *gin.Context, err error) bool {
if err == nil {
return false
}
errs, ok := err.(validator.ValidationErrors)
if ok {
translator, err := initValidator("zh")
if err == nil {
return false
}
errMsg, _ := jsoniter.Marshal(errs.Translate(translator))
ctx.JSON(http.StatusOK, gin.H{
"code": -1,
"message": "validate request failed:" + string(errMsg),
})
} else {
ctx.JSON(http.StatusOK, gin.H{
"code": -1,
"message": "read request failed:" + err.Error(),
})
}
return true
}
func ResponseFailed(ctx *gin.Context, message string, err error) {
if err == nil {
ctx.JSON(http.StatusOK, gin.H{
"code": -1,
"message": message + "!",
})
} else {
ctx.JSON(http.StatusOK, gin.H{
"code": -1,
"message": message + "," + err.Error(),
})
}
}
func ResponseSuccess(ctx *gin.Context, message string, data ...interface{}) {
if len(data) > 0 {
ctx.JSON(http.StatusOK, gin.H{
"code": 0,
"message": message + "!",
"data": data[0],
})
} else {
ctx.JSON(http.StatusOK, gin.H{
"code": 0,
"message": message + "!",
})
}
}
func ResponseWithStatusCode(ctx *gin.Context, statusCode int, code int, message string, data ...interface{}) {
if statusCode != 200 {
code = statusCode
}
if len(data) > 0 {
ctx.JSON(statusCode, gin.H{
"code": code,
"message": message,
"data": data[0],
})
} else {
ctx.JSON(statusCode, gin.H{
"code": code,
"message": message,
})
}
}
type DBConf struct {
User string `yaml:"user" desc:"ç¨æ·å"`
Password string `yaml:"password" desc:"ç¨æ·å¯ç "`
Host string `yaml:"host" desc:"主æºå"`
Port int `yaml:"port" desc:"主æºç«¯å£"`
Database string `yaml:"database" desc:"æ°æ®åºå"`
Dialect string `yaml:"dialect" desc:"æ°æ®åºç±»å"`
DBSource string `yaml:"db_source" desc:"æ°æ®åºæº"`
DBDebug bool `yaml:"db_debug" desc:"æ¯å¦è¾åºgormæ°æ®åºè°è¯è¯å¥"`
MaxAge int `yaml:"max_age" desc:"æ¥å¿æå¤§ä¿çæ¶é´"`
RotateTimeLevel int `yaml:"rotate_time_level" desc:"æ¥å¿åçæ¶é´ç级 0 èªå®ä¹æ¶é´åç 1 æ¥åç 2 1å°æ¶åç 3 1åéåç"`
RotateTime int `yaml:"rotate_time" desc:"èªå®ä¹æ¶é´åçæ¶é¿ åä½ä¸º:min"`
}
const RotateByTimestamp = 0 //èªå®ä¹æ¶é´åç
const RotateByDate = 1 //æ¥åç
const RotateByHour = 2 //1å°æ¶åç
const RotateByMinute = 3 //1åéåç
type Dao struct {
DB *gorm.DB
}
func NewDBFromRawDB(db *sql.DB, dbConf DBConf) (*gorm.DB, error) {
var newDb *gorm.DB
var err error
if dbConf.DBDebug {
var err error
var loggerWriteSyncer *rotatelogs.RotateLogs
var loggerFileName = fmt.Sprintf("%s-db-debug.log", dbConf.Database)
switch dbConf.RotateTimeLevel {
case RotateByTimestamp:
loggerWriteSyncer, err = rotatelogs.New(
"../logs/"+loggerFileName+".%Y%m%d%H%M",
rotatelogs.WithLinkName("../logs/"+loggerFileName),
rotatelogs.WithMaxAge(time.Duration(dbConf.MaxAge)*time.Hour),
rotatelogs.WithRotationTime(time.Duration(dbConf.RotateTime)*time.Minute))
case RotateByDate:
loggerWriteSyncer, err = rotatelogs.New(
"../logs/"+loggerFileName+".%Y%m%d",
rotatelogs.WithLinkName("../logs/"+loggerFileName),
rotatelogs.WithMaxAge(time.Duration(dbConf.MaxAge)*time.Hour))
case RotateByHour:
loggerWriteSyncer, err = rotatelogs.New(
"../logs/"+loggerFileName+".%Y%m%d%H",
rotatelogs.WithLinkName("../logs/"+loggerFileName),
rotatelogs.WithMaxAge(time.Duration(dbConf.MaxAge)*time.Hour),
rotatelogs.WithRotationTime(time.Hour))
case RotateByMinute:
loggerWriteSyncer, err = rotatelogs.New(
"../logs/"+loggerFileName+".%Y%m%d%H%M",
rotatelogs.WithLinkName("../logs/"+loggerFileName),
rotatelogs.WithMaxAge(time.Duration(dbConf.MaxAge)*time.Hour),
rotatelogs.WithRotationTime(time.Minute))
}
if err != nil {
return nil, err
}
dbLogger := logger.New(log.New(loggerWriteSyncer, "\r\n", log.LstdFlags), logger.Config{
SlowThreshold: time.Second,
LogLevel: logger.Silent,
Colorful: true,
})
switch dbConf.Dialect {
case "mysql":
newDb, err = gorm.Open(mysql.New(mysql.Config{
Conn: db,
}), &gorm.Config{
Logger: dbLogger,
})
break
case "postgres":
newDb, err = gorm.Open(postgres.New(postgres.Config{
Conn: db,
}), &gorm.Config{
Logger: dbLogger,
})
break
}
newDb = newDb.Debug()
} else {
switch dbConf.Dialect {
case "mysql":
newDb, err = gorm.Open(mysql.New(mysql.Config{
Conn: db,
}))
break
case "postgres":
newDb, err = gorm.Open(postgres.New(postgres.Config{
Conn: db,
}))
break
}
}
return newDb, err
}
func (dao *Dao) XaLocalTransaction(qs url.Values, f func(dao *Dao) error) (err error) {
dbConf := DBConf{
User: "root",
Password: "CodeMan2022080^2*1",
Host: "127.0.0.1",
Port: 3306,
Database: "test",
Dialect: "mysql",
DBSource: "root:CodeMan2022080^2*1@tcp(127.0.0.1:3306)/test?charset=utf8mb4&parseTime=True&loc=Local",
DBDebug: true,
RotateTime: 1,
RotateTimeLevel: 1,
}
err = dtmcli.XaLocalTransaction(qs, dtmcli.DBConf{
Driver: dbConf.Dialect,
Host: dbConf.Host,
Port: int64(dbConf.Port),
User: dbConf.User,
Password: dbConf.Password,
Db: dbConf.Database,
}, func(db *sql.DB, xa *dtmcli.Xa) error {
dao := &Dao{}
dao.DB, err = NewDBFromRawDB(db, dbConf)
return f(dao)
})
return
}
func (dao *Dao) TransInXa(userId int, amount float64) error {
result := dao.DB.Exec("update user_account set balance = balance + ? where user_id = ?", amount, userId)
if result.Error != nil {
return result.Error
}
return nil
}
func (dao *Dao) TransOutXa(userId int, amount float64) error {
result := dao.DB.Exec("update user_account set balance = balance - ? where user_id = ?", amount, userId)
if result.Error != nil {
return result.Error
}
return nil
}
func transInXa(ctx *gin.Context) {
var req TransInXaRequest
err := ctx.ShouldBind(&req)
if ValidateError(ctx, err) {
return
}
qs := ctx.Request.URL.Query()
dao := Dao{}
err = dao.XaLocalTransaction(qs, func(dao *Dao) error {
err = dao.TransInXa(req.TransInUserId, req.Amount)
if err != nil {
return errors.New("æ°æ®åºæ´æ°åºé!")
}
return err
})
if err != nil {
ResponseWithStatusCode(ctx, 409, 409, "trans in xa failed:"+err.Error())
return
}
ResponseSuccess(ctx, "trans in xa success")
}
func transOutXa(ctx *gin.Context) {
var req TransOutXaRequest
err := ctx.ShouldBind(&req)
if ValidateError(ctx, err) {
return
}
qs := ctx.Request.URL.Query()
dao := Dao{}
err = dao.XaLocalTransaction(qs, func(dao *Dao) error {
err = dao.TransOutXa(req.TransOutUserId, req.Amount)
if err != nil {
return errors.New("æ°æ®åºæ´æ°åºé!")
}
return err
})
if err != nil {
ResponseWithStatusCode(ctx, 409, 409, "trans out xa failed:"+err.Error())
return
}
ResponseSuccess(ctx, "trans out xa success")
}
func main() {
router := gin.Default()
router.POST("/api/v1/business/trans-in-xa", transInXa)
router.POST("/api/v1/business/trans-out-xa", transOutXa)
router.Run(":8686")
}
è¿è¡ç¯å¢ï¼golangçæ¬1.21ï¼mysqlçæ¬8.0.32 å è¿è¡test_dtm_http_xa.goï¼åè¿è¡test_dtm.goï¼å°±ä¼åºç°dtmæ¾ç¤ºxaäºå¡æäº¤æåï¼ä½æ¯å®é 没ææåæäº¤xaäºå¡çæ åµã