transmittable-thread-local
transmittable-thread-local copied to clipboard
出现了线程上下文流转不成功
bg:在使用plusar进行处理消息消费的时候,消息的具体处理之前会使用该TransmittableThreadLocal进行移除和设置。 但是现在发现了一个上下文失效的情况:
消费代码:
@PulsarConsumer(topic = "cepf_platform_warehouse_shein",
subscriptionName = "cepf_warehouse",
serverName = "amsPulsar",
clazz = byte[].class,
initialPosition = SubscriptionInitialPosition.Latest,
batchAckMode = BatchAckMode.MANUAL,
serialization = Serialization.BYTE,
maxRedeliverCount = 2,
deadLetterTopic = "cepf_platform_warehouse_shein_dlq"
)
public void consumeMsg(PulsarMessage<byte[]> message ){
String dataJson = new String(message.getValue());
log.info("消费消息:{}", dataJson);
JSONObject data = JSON.parseObject(dataJson);
String messageTypeCode = data.getString("messageTypeCode");
String messageId = data.getString("messageId");
JSONObject messageContent = data.getJSONObject("messageContent");
IMessageMgmtService messageMgmtService = MessageHandleAdapter.getMessageMgmtService(messageTypeCode);
try {
if(Objects.isNull(messageMgmtService)) {
log.warn("messageTypeCode:{}不支持消费该messageTypeCode.messageId:{}", messageTypeCode, message.getMessageId());
return;
}
Map<String, String> extParam = Maps.newHashMap();
extParam.put("version", data.getString("createTime"));
extParam.put("messageId", messageId);
messageMgmtService.processMessage(messageContent, extParam);
log.info("【{}】消费成功,messageId:{}", messageTypeCode, message.getMessageId());
return;
} catch (Exception e) {
//注意:需要区分什么类型的消息加入重试队列
log.error("{}-消费失败加入重试队列pulsar:{}, messageId:{}", messageTypeCode, message.getMessageId(), messageId, e);
throw e;
}
}
执行消费的逻辑: 具体的这段代码的逻辑:messageMgmtService.processMessage(messageContent, extParam);
@Override
public void processMessage(JSONObject messageContent, Map<String, String> extParam) {
RLock rLock = null;
try{
//1.解析推送源数据
WarehouseOriginalDataBO originalDataBO = this.parse(messageContent, getOriginalDataClass());
originalDataBO.setVersion(extParam.get("version"));
originalDataBO.setPlatformCode(getPlatformCode().getCode());
originalDataBO.setItemId(getUniqueId(originalDataBO));
originalDataBO.setBusinessType(getBusinessType().getType());
//处理线程上线文companyId
AkRequest request = AkRequestContext.getRequest();
if(Objects.isNull(request)) {
request = new AkRequest();
}
request.setCompanyId(String.valueOf(originalDataBO.getCompanyId()));
AkRequestContext.put(request);
//1.1校验一下data数据内是否都合法
JSON rawResp = originalDataBO.getRawResp();
if(Objects.isNull(rawResp)) {
log.warn("[采集]推送库存消息不合法,raw resp 不能为null.biz-code:{},msg:{}", getPlatformCode().getName(), messageContent.toJSONString());
return;
}
Data data = parseData(originalDataBO);
//校验
if(!isValidData(data)) {
log.warn("[采集]推送库存消息不合法,raw resp 数据不合法biz-code:{},msg:{}",getPlatformCode().getName(), messageContent.toJSONString());
return;
}
//3.校验数据版本号
while (this.checkDataVersion(originalDataBO)) {
//3.加锁
rLock = getRLock(redisLockPrefix, String.valueOf(originalDataBO.getCompanyId()),
String.valueOf(originalDataBO.getStoreId()), originalDataBO.getItemId());
if(rLock.tryLock(WAIT_TIME_OUT, LEASE_TIME_OUT, TimeUnit.SECONDS)) {
try {
//2.先落原始表
warehouseOriginalDataService.store(originalDataBO, getStock());
//4.进行业务操作
boolean deal = this.deal(messageContent, originalDataBO, data, extParam);
if(deal) {
//5.执行成功再更新redis最新的版本号
this.setVersion(originalDataBO);
}
} finally {
//6.释放锁
if(Objects.nonNull(rLock)) {
if(rLock.isLocked()) {
rLock.unlock();
}
}
}
}
}
} catch (IllegalArgumentException e) {
//1、消息的不合法,可以直接打出异常,不进行处理,消息也不需要重试,直接抛出堆栈信息不然不好排查问题
log.warn("[采集]推送库存消息不合法,msg:{}",messageContent.toJSONString(), e);
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
}
在这里进行了上下文的设置:
//处理线程上线文companyId
AkRequest request = AkRequestContext.getRequest();
if(Objects.isNull(request)) {
request = new AkRequest();
}
request.setCompanyId(String.valueOf(originalDataBO.getCompanyId()));
AkRequestContext.put(request);
put 方法会先remove掉,然后在put:
public static void put(AkRequest request) {
if (Objects.nonNull(get())) {
remove();
}
if (request != null) {
requests.set(request);
}
}
remoe方法:
public static void remove() {
requests.remove();
}
业务代码:
@Override
public boolean deal(JSONObject msgContent, WarehouseOriginalDataBO warehouseOriginalDataBO,
SheinPurchaseOrderMsgBO orderMsgBO, Map<String, String> extParam) {
return sheinPurchaseOrderService.dealPlatformPurchaseOrder(orderMsgBO);
}
@Override
@Transactional(rollbackFor = Exception.class)
@RedissonLock(key = "#bo.orderNo",waitTime = 1000)
public Boolean dealPlatformPurchaseOrder(SheinPurchaseOrderMsgBO bo) {
// 1. 判断平台采购单是否在数据库中已存在 按什么唯一键 采购单号
Long companyId = bo.getCompanyId();
String orderNo = bo.getOrderNo();
Long storeId = bo.getStoreId();
SheinPurchaseOrderPO exist = sheinPurchaseOrderDAO.findByPurchaseOrderNo(companyId, storeId, orderNo);
// 2. 平台采购单转换成po
SheinPurchaseOrderPO sheinPurchaseOrderPO = convertBoToPo(bo);
Boolean statusChange = false;
if (Objects.nonNull(exist)) {
sheinPurchaseOrderPO.setId(exist.getId());
// TODO 如果状态更新才设置更新时间吗
if (!Objects.equals(exist.getStatus(), bo.getStatus())) {
sheinPurchaseOrderPO.setStatusChangeDate(new Date());
statusChange = true;
}
// 生成唯一id
} else {
sheinPurchaseOrderPO.setStatusChangeDate(new Date());
}
// 3. 平台采购单行转换成 订单商品行数据
List<SheinPurchaseOrderItemPO> needSaveItems = convertBoToItemPO(bo);
// 4. 更新入库
// 4.1 主表数据入库
boolean orderSaveResult = sheinPurchaseOrderDAO.saveOrUpdate(sheinPurchaseOrderPO);
// 4.2 item数据入库
needSaveItems.forEach(r -> r.setOrderId(sheinPurchaseOrderPO.getId()));
boolean itemSaveResult = saveItemPos(companyId, sheinPurchaseOrderPO.getId(), needSaveItems);
// 4.3 保存变更日志
// 插入采购单状态变更日志
saveOperationLog(sheinPurchaseOrderPO, statusChange);
// 4.3 新增配对关系
try {
List<String> mskuList = new ArrayList<>();
if (CollectionUtils.isNotEmpty(needSaveItems)) {
mskuList = needSaveItems.stream().map(SheinPurchaseOrderItemPO::getMsku).distinct().collect(Collectors.toList());
}
addPair(companyId, storeId, mskuList, PlatformCodeEnum.FBSheinManage.getCode());
} catch (Exception exception) {
log.info("采购单号:{} 新增配对失败", bo.getOrderNo(), exception);
}
return orderSaveResult & itemSaveResult;
}
会在RequestUtil这里用到这个线程的上下文:
public static Long getCompanyId() {
String companyIdStr = AkRequestContext.getCompanyId();
log.info("AkRequestContext 基础平台获取companyId:{}", companyIdStr);
if (StrUtil.isNotBlank(companyIdStr)) {
return Long.parseLong(companyIdStr);
} else {
HttpServletRequest request = getRequest();
String headerCompanyId = request.getHeader(AK_COMPANY_ID_KEY);
log.info("HttpServletRequest 请求头获取companyId:{}", headerCompanyId);
if (StrUtil.isNotBlank(headerCompanyId)) {
return Long.parseLong(headerCompanyId);
} else {
log.error("从线程的ThreadLocal中和请求header信息中获取不到companyId,现在给出默认值NON_COMPANY_ID请检查是否因为线程切换导致。threadName:{}", Thread.currentThread().getName());
throw new BusinessException("获取不到企业ID");
}
}
}
在数据库查询的时候会使用他作为分片进行分库分表。
然后现在日志是有问题的,具体问题如下:
这里的一条数据消费,他的ccompanyId是:901380674060402688
但是在执行后面变成了这个:901248346235397632
然后那个对应的901248346235397632也是一条消费逻辑,可以看到时间在48.993他进来了:
然后第一条也是48.993进来消费的,第二条覆盖了第一条,请问这个是ttl本身的原因呢还是什么?
使用的是plusar进行消息执行,也没看到IO阻塞切换线程,这个是什么原因呢?