transmittable-thread-local icon indicating copy to clipboard operation
transmittable-thread-local copied to clipboard

出现了线程上下文流转不成功

Open kevinWangSheng opened this issue 1 year ago • 0 comments

bg:在使用plusar进行处理消息消费的时候,消息的具体处理之前会使用该TransmittableThreadLocal进行移除和设置。 但是现在发现了一个上下文失效的情况:

消费代码:

@PulsarConsumer(topic = "cepf_platform_warehouse_shein",
            subscriptionName = "cepf_warehouse",
            serverName = "amsPulsar",
            clazz = byte[].class,
            initialPosition = SubscriptionInitialPosition.Latest,
            batchAckMode = BatchAckMode.MANUAL,
            serialization = Serialization.BYTE,
            maxRedeliverCount = 2,
            deadLetterTopic = "cepf_platform_warehouse_shein_dlq"
    )
    public void consumeMsg(PulsarMessage<byte[]> message ){
        String dataJson = new String(message.getValue());
        log.info("消费消息:{}", dataJson);
        JSONObject data = JSON.parseObject(dataJson);
        String messageTypeCode = data.getString("messageTypeCode");
        String messageId = data.getString("messageId");
        JSONObject messageContent = data.getJSONObject("messageContent");
        IMessageMgmtService messageMgmtService = MessageHandleAdapter.getMessageMgmtService(messageTypeCode);
        try {

            if(Objects.isNull(messageMgmtService)) {
                log.warn("messageTypeCode:{}不支持消费该messageTypeCode.messageId:{}", messageTypeCode, message.getMessageId());
                return;
            }

            Map<String, String> extParam = Maps.newHashMap();
            extParam.put("version", data.getString("createTime"));
            extParam.put("messageId", messageId);
            messageMgmtService.processMessage(messageContent, extParam);
            log.info("【{}】消费成功,messageId:{}", messageTypeCode,  message.getMessageId());
            return;
        } catch (Exception e) {
            //注意:需要区分什么类型的消息加入重试队列
            log.error("{}-消费失败加入重试队列pulsar:{}, messageId:{}", messageTypeCode, message.getMessageId(), messageId, e);
            throw e;
        }
    }

执行消费的逻辑: 具体的这段代码的逻辑:messageMgmtService.processMessage(messageContent, extParam);

    @Override
    public void processMessage(JSONObject messageContent, Map<String, String> extParam) {
        RLock rLock = null;
        try{
            //1.解析推送源数据
            WarehouseOriginalDataBO originalDataBO = this.parse(messageContent, getOriginalDataClass());
            originalDataBO.setVersion(extParam.get("version"));
            originalDataBO.setPlatformCode(getPlatformCode().getCode());
            originalDataBO.setItemId(getUniqueId(originalDataBO));
            originalDataBO.setBusinessType(getBusinessType().getType());

            //处理线程上线文companyId
            AkRequest request = AkRequestContext.getRequest();
            if(Objects.isNull(request)) {
                request = new AkRequest();
            }
            request.setCompanyId(String.valueOf(originalDataBO.getCompanyId()));
            AkRequestContext.put(request);

            //1.1校验一下data数据内是否都合法
            JSON rawResp = originalDataBO.getRawResp();
            if(Objects.isNull(rawResp)) {
                log.warn("[采集]推送库存消息不合法,raw resp 不能为null.biz-code:{},msg:{}", getPlatformCode().getName(), messageContent.toJSONString());
                return;
            }

            Data data = parseData(originalDataBO);

            //校验
            if(!isValidData(data)) {
                log.warn("[采集]推送库存消息不合法,raw resp 数据不合法biz-code:{},msg:{}",getPlatformCode().getName(), messageContent.toJSONString());
                return;
            }

            //3.校验数据版本号
            while (this.checkDataVersion(originalDataBO)) {
                //3.加锁
                rLock = getRLock(redisLockPrefix, String.valueOf(originalDataBO.getCompanyId()),
                        String.valueOf(originalDataBO.getStoreId()), originalDataBO.getItemId());

                if(rLock.tryLock(WAIT_TIME_OUT, LEASE_TIME_OUT, TimeUnit.SECONDS)) {

                    try {
                        //2.先落原始表
                        warehouseOriginalDataService.store(originalDataBO, getStock());
                        //4.进行业务操作
                        boolean deal = this.deal(messageContent, originalDataBO, data, extParam);

                        if(deal) {
                            //5.执行成功再更新redis最新的版本号
                            this.setVersion(originalDataBO);
                        }
                    } finally {
                        //6.释放锁
                        if(Objects.nonNull(rLock)) {
                            if(rLock.isLocked()) {
                                rLock.unlock();
                            }
                        }
                    }

                }
            }

        } catch (IllegalArgumentException e) {
            //1、消息的不合法,可以直接打出异常,不进行处理,消息也不需要重试,直接抛出堆栈信息不然不好排查问题
            log.warn("[采集]推送库存消息不合法,msg:{}",messageContent.toJSONString(), e);
        } catch (InterruptedException e) {
            throw new RuntimeException(e);
        }
    }
    
    在这里进行了上下文的设置:
    
    //处理线程上线文companyId
            AkRequest request = AkRequestContext.getRequest();
            if(Objects.isNull(request)) {
                request = new AkRequest();
            }
            request.setCompanyId(String.valueOf(originalDataBO.getCompanyId()));
            AkRequestContext.put(request);


put 方法会先remove掉,然后在put:
public static void put(AkRequest request) {
        if (Objects.nonNull(get())) {
            remove();
        }

        if (request != null) {
            requests.set(request);
        }

    }
    remoe方法:
public static void remove() {
        requests.remove();
    }

业务代码:
@Override
    public boolean deal(JSONObject msgContent, WarehouseOriginalDataBO warehouseOriginalDataBO,
                        SheinPurchaseOrderMsgBO orderMsgBO, Map<String, String> extParam) {
        return sheinPurchaseOrderService.dealPlatformPurchaseOrder(orderMsgBO);
    }



@Override
    @Transactional(rollbackFor = Exception.class)
    @RedissonLock(key = "#bo.orderNo",waitTime = 1000)
    public Boolean dealPlatformPurchaseOrder(SheinPurchaseOrderMsgBO bo) {
        // 1. 判断平台采购单是否在数据库中已存在 按什么唯一键 采购单号
        Long companyId = bo.getCompanyId();
        String orderNo = bo.getOrderNo();
        Long storeId = bo.getStoreId();
        SheinPurchaseOrderPO exist = sheinPurchaseOrderDAO.findByPurchaseOrderNo(companyId, storeId, orderNo);

        // 2. 平台采购单转换成po
        SheinPurchaseOrderPO sheinPurchaseOrderPO = convertBoToPo(bo);

        Boolean statusChange = false;
        if (Objects.nonNull(exist)) {
            sheinPurchaseOrderPO.setId(exist.getId());
            // TODO 如果状态更新才设置更新时间吗
            if (!Objects.equals(exist.getStatus(), bo.getStatus())) {
                sheinPurchaseOrderPO.setStatusChangeDate(new Date());
                statusChange = true;
            }
            // 生成唯一id
        } else {
            sheinPurchaseOrderPO.setStatusChangeDate(new Date());
        }

        // 3. 平台采购单行转换成 订单商品行数据
        List<SheinPurchaseOrderItemPO> needSaveItems = convertBoToItemPO(bo);

        // 4. 更新入库
        // 4.1 主表数据入库
        boolean orderSaveResult = sheinPurchaseOrderDAO.saveOrUpdate(sheinPurchaseOrderPO);

        // 4.2 item数据入库
        needSaveItems.forEach(r -> r.setOrderId(sheinPurchaseOrderPO.getId()));
        boolean itemSaveResult = saveItemPos(companyId, sheinPurchaseOrderPO.getId(), needSaveItems);

        // 4.3 保存变更日志
        // 插入采购单状态变更日志
        saveOperationLog(sheinPurchaseOrderPO, statusChange);

        // 4.3 新增配对关系
        try {
            List<String> mskuList = new ArrayList<>();
            if (CollectionUtils.isNotEmpty(needSaveItems)) {
                mskuList = needSaveItems.stream().map(SheinPurchaseOrderItemPO::getMsku).distinct().collect(Collectors.toList());
            }
            addPair(companyId, storeId, mskuList, PlatformCodeEnum.FBSheinManage.getCode());
        } catch (Exception exception) {
            log.info("采购单号:{} 新增配对失败", bo.getOrderNo(), exception);
        }

        return orderSaveResult & itemSaveResult;
    }
    
    会在RequestUtil这里用到这个线程的上下文:


public static Long getCompanyId() {
        String companyIdStr = AkRequestContext.getCompanyId();
        log.info("AkRequestContext 基础平台获取companyId:{}", companyIdStr);
        if (StrUtil.isNotBlank(companyIdStr)) {
            return Long.parseLong(companyIdStr);
        } else {
            HttpServletRequest request = getRequest();
            String headerCompanyId = request.getHeader(AK_COMPANY_ID_KEY);
            log.info("HttpServletRequest 请求头获取companyId:{}", headerCompanyId);
            if (StrUtil.isNotBlank(headerCompanyId)) {
                return Long.parseLong(headerCompanyId);
            } else {
                log.error("从线程的ThreadLocal中和请求header信息中获取不到companyId,现在给出默认值NON_COMPANY_ID请检查是否因为线程切换导致。threadName:{}", Thread.currentThread().getName());
                throw new BusinessException("获取不到企业ID");
            }
        }
    }

在数据库查询的时候会使用他作为分片进行分库分表。

然后现在日志是有问题的,具体问题如下:

这里的一条数据消费,他的ccompanyId是:901380674060402688 image

但是在执行后面变成了这个:901248346235397632

image

然后那个对应的901248346235397632也是一条消费逻辑,可以看到时间在48.993他进来了: image

然后第一条也是48.993进来消费的,第二条覆盖了第一条,请问这个是ttl本身的原因呢还是什么?

使用的是plusar进行消息执行,也没看到IO阻塞切换线程,这个是什么原因呢?

kevinWangSheng avatar Sep 19 '24 13:09 kevinWangSheng