incubator-seata
incubator-seata copied to clipboard
[分享] seata1.5.2 如何在二阶段发生异常时告警
场景描述
RM二阶段发生异常时,TC是会进行不断的重试。直到成功为止 那么,问题就来了。一直重试虽然在一定程度上可能成功,但大部分情况下一直重试是不会成功的 这个时候,是需要作出一定的告警措施,比如发邮件,发钉钉之类的
比较幸运的是,seata提供了SPI,这样可以比较方便的实现。经过一番研究尝试,终于找到比较好的实现方式。如下,enjoy!
Step One
注入自己实现的SPI 新建文件: src/main/resources/META-INF/services/io.seata.core.model.ResourceManager 文件内容如下。注意,是实现类的全路径来的,不懂SPI机制可先自行谷歌
com.xxx.MyTCCResourceManager
Step Two
com.xxx.MyTCCResourceManager类代码。大部分抄官方的io.seata.rm.tcc.TCCResourceManager代码即可,这里只列出关键代码
@Override
public BranchStatus branchCommit(BranchType branchType, String xid, long branchId, String resourceId,
String applicationData) throws TransactionException {
TCCResource tccResource = (TCCResource)tccResourceCache.get(resourceId);
if (tccResource == null) {
throw new ShouldNeverHappenException(String.format("TCC resource is not exist, resourceId: %s", resourceId));
}
Object targetTCCBean = tccResource.getTargetBean();
Method commitMethod = tccResource.getCommitMethod();
if (targetTCCBean == null || commitMethod == null) {
throw new ShouldNeverHappenException(String.format("TCC resource is not available, resourceId: %s", resourceId));
}
try {
//BusinessActionContext
BusinessActionContext businessActionContext = getBusinessActionContext(xid, branchId, resourceId,
applicationData);
Object[] args = this.getTwoPhaseCommitArgs(tccResource, businessActionContext);
Object ret;
boolean result;
// add idempotent and anti hanging
if (Boolean.TRUE.equals(businessActionContext.getActionContext(Constants.USE_TCC_FENCE))) {
try {
result = TCCFenceHandler.commitFence(commitMethod, targetTCCBean, xid, branchId, args);
} catch (SkipCallbackWrapperException | UndeclaredThrowableException e) {
throw e.getCause();
}
} else {
ret = commitMethod.invoke(targetTCCBean, args);
if (ret != null) {
if (ret instanceof TwoPhaseResult) {
result = ((TwoPhaseResult)ret).isSuccess();
} else {
result = (boolean)ret;
}
} else {
result = true;
}
}
LOGGER.info("TCC resource commit result : {}, xid: {}, branchId: {}, resourceId: {}", result, xid, branchId, resourceId);
if (!result) {
// !!!FBI WARNING!!! 关键地方。把你做告警的代码放这里就可以了!!!
}
return result ? BranchStatus.PhaseTwo_Committed : BranchStatus.PhaseTwo_CommitFailed_Retryable;
} catch (Throwable t) {
String msg = String.format("commit TCC resource error, resourceId: %s, xid: %s.", resourceId, xid);
LOGGER.error(msg, t);
// !!!FBI WARNING!!! 关键地方。把你做告警的代码放这里就可以了!!!
return BranchStatus.PhaseTwo_CommitFailed_Retryable;
}
}
@Override
public BranchStatus branchRollback(BranchType branchType, String xid, long branchId, String resourceId,
String applicationData) throws TransactionException {
TCCResource tccResource = (TCCResource)tccResourceCache.get(resourceId);
if (tccResource == null) {
throw new ShouldNeverHappenException(String.format("TCC resource is not exist, resourceId: %s", resourceId));
}
Object targetTCCBean = tccResource.getTargetBean();
Method rollbackMethod = tccResource.getRollbackMethod();
if (targetTCCBean == null || rollbackMethod == null) {
throw new ShouldNeverHappenException(String.format("TCC resource is not available, resourceId: %s", resourceId));
}
try {
//BusinessActionContext
BusinessActionContext businessActionContext = getBusinessActionContext(xid, branchId, resourceId,
applicationData);
Object[] args = this.getTwoPhaseRollbackArgs(tccResource, businessActionContext);
Object ret;
boolean result;
// add idempotent and anti hanging
if (Boolean.TRUE.equals(businessActionContext.getActionContext(Constants.USE_TCC_FENCE))) {
try {
result = TCCFenceHandler.rollbackFence(rollbackMethod, targetTCCBean, xid, branchId,
args, tccResource.getActionName());
} catch (SkipCallbackWrapperException | UndeclaredThrowableException e) {
throw e.getCause();
}
} else {
ret = rollbackMethod.invoke(targetTCCBean, args);
if (ret != null) {
if (ret instanceof TwoPhaseResult) {
result = ((TwoPhaseResult)ret).isSuccess();
} else {
result = (boolean)ret;
}
} else {
result = true;
}
}
LOGGER.info("TCC resource rollback result : {}, xid: {}, branchId: {}, resourceId: {}", result, xid, branchId, resourceId);
if (!result) {
// !!!FBI WARNING!!! 关键地方。把你做告警的代码放这里就可以了!!!
}
return result ? BranchStatus.PhaseTwo_Rollbacked : BranchStatus.PhaseTwo_RollbackFailed_Retryable;
} catch (Throwable t) {
String msg = String.format("rollback TCC resource error, resourceId: %s, xid: %s.", resourceId, xid);
LOGGER.error(msg, t);
// !!!FBI WARNING!!! 关键地方。把你做告警的代码放这里就可以了!!!
return BranchStatus.PhaseTwo_RollbackFailed_Retryable;
}
}
分享另外一个。 在做这个告警的时候,一开始是想着用aop LocalTCC这个注解来实现的,但用了aop之后,会导致confirm和cancel拿不到全局事务id的!所以这种方法是不可取的。
反正也写了一些代码,就分享一下,记住,以下代码会导致问题的!!!
切面类
@Slf4j
public class LibBusinessSeataMethodInterceptor implements MethodInterceptor, Ordered {
private volatile boolean disable = ConfigurationFactory.getInstance().getBoolean(
ConfigurationKeys.DISABLE_GLOBAL_TRANSACTION, DEFAULT_DISABLE_GLOBAL_TRANSACTION);
private final PlatformSysNoticeRecordClient platformSysNoticeRecordClient;
public LibBusinessSeataMethodInterceptor(PlatformSysNoticeRecordClient platformSysNoticeRecordClient) {
this.platformSysNoticeRecordClient = platformSysNoticeRecordClient;
}
@Override
public Object invoke(MethodInvocation invocation) throws Throwable {
// 没在seata事务中,或者没开启seata事务,直接返回
// 对,用这种aop来做的话,就是这里会有问题!!!,拿不到全局事务id的!!!
if (!RootContext.inGlobalTransaction() || disable || RootContext.inSagaBranch()) {
return invocation.proceed();
}
Method method = invocation.getMethod();
LocalTCC localTCC = method.getDeclaringClass().getAnnotation(LocalTCC.class);
if (localTCC == null) {
return invocation.proceed();
}
Method[] methods = method.getDeclaringClass().getMethods();
TwoPhaseBusinessAction twoPhaseBusinessAction = null;
for (Method methodExist : methods) {
twoPhaseBusinessAction = methodExist.getAnnotation(TwoPhaseBusinessAction.class);
if (twoPhaseBusinessAction != null) {
break;
}
}
// 极端情况,找不到这个注解方法
if (twoPhaseBusinessAction == null) {
log.info(
"Not found twoPhaseBusinessAction method. class: {}, methods: {}",
method.getDeclaringClass().getName(),
methods
);
return invocation.proceed();
}
String commitMethodName = twoPhaseBusinessAction.commitMethod();
String cancelMethodName = twoPhaseBusinessAction.rollbackMethod();
if (!method.getName().equals(commitMethodName) && !method.getName().equals(cancelMethodName)) {
return invocation.proceed();
}
try {
return invocation.proceed();
} catch (Throwable e) {
Object[] args = invocation.getArguments();
// 理论上应该是第一参数为BusinessActionContext
if (args.length > 0 && args[0] instanceof BusinessActionContext) {
BusinessActionContext businessActionContext = (BusinessActionContext) args[0];
sendDingTalkNotify(String.format(
"Seata onBeginFailure. txId: %s, branchId: %s, actionName: %s, stackTrace: %s",
businessActionContext.getXid(),
businessActionContext.getBranchId(),
businessActionContext.getActionName(),
JSON.toJSONString(e.getStackTrace())
));
}
throw e;
}
}
private void sendDingTalkNotify(String message) {
// 告警代码
}
@Override
public int getOrder() {
return Integer.MIN_VALUE;
}
}
配置类
@Configuration
public class LibBusinessSeataInterceptorConfig {
@Bean
public DefaultPointcutAdvisor defaultPointcutAdvisor(PlatformSysNoticeRecordClient platformSysNoticeRecordClient) {
LibBusinessSeataMethodInterceptor libBusinessSeataMethodInterceptor =
new LibBusinessSeataMethodInterceptor(platformSysNoticeRecordClient);
AspectJExpressionPointcut pointcut = new AspectJExpressionPointcut();
// 只拦截confirm和cancel方法
pointcut.setExpression("execution(public boolean com.ppwang..*.data..*.*(io.seata.rm.tcc.api.BusinessActionContext))");
DefaultPointcutAdvisor advisor = new DefaultPointcutAdvisor();
advisor.setPointcut(pointcut);
advisor.setAdvice(libBusinessSeataMethodInterceptor);
return advisor;
}
}
建议放到Discussions去,或者给官网https://github.com/seata/seata.github.io 提交一篇博客pr
还有个问题我想讨论下,作为用户视角,你觉得发出告警的是client,还是server呢?比如server下发给client,其执行一直失败,这个其实server端也能感知到,比如在prometheus打个点,是否也能做成告警? 但是我总觉得client去处理的话自定义打点肯定比server要细致,更容易也更精确知道一些问题
还有个问题我想讨论下,作为用户视角,你觉得发出告警的是client,还是server呢?比如server下发给client,其执行一直失败,这个其实server端也能感知到,比如在prometheus打个点,是否也能做成告警? 但是我总觉得client去处理的话自定义打点肯定比server要细致,更容易也更精确知道一些问题
感谢回复。我昨晚看完你的回复之后,想了一夜,觉得你说的比较符合实情,放在server用prometheus做告警这样会比较综合处理一些,这样会更省事。而放client的话,虽然说是说能更细致,但复杂度多很多 @a364176773
@a364176773
我看了下server-1.5.2的metrics。如果做监控的话,需要为每个接入的分布式事务做才行,没有统一的指标。如下图所示