rocketmq-client-go icon indicating copy to clipboard operation
rocketmq-client-go copied to clipboard

PullBrokerTimeout happed frequently when pulling msg

Open kaybinwong opened this issue 1 year ago • 2 comments

BUG REPORT

  1. Please describe the issue you observed:

pulling msg with 1.2.4, but got PullBrokerTimeout error, it did't hanppend with spring client.

……

err := p.pullMQ(market)
if err != nil {
	log.Warnf("StartConsume pullMQ fail, symbol: %s, err: %v, market:%+v", symbol, err, *market)
	time.Sleep(ERRSLEEP)
}
……

// 通过pull用offset的方式来避免重复消费
func (p *rocketMQProvider) pullMQ(market *Market) error {
	symbol := market.symbol
	topic := TopicNameOrderProvider(symbol)
	//----start---------------
	mqs := p.consumer.FetchSubscriptionMessageQueues(topic)
	//mqs := p.consumer.MessageQueues(topic)
	//----end---------------
	log.Debugf("StartConsume fetch subscription topic:%s, mqs:%v", topic, mqs)
	if len(mqs) != 1 {
		//为了确保顺序性,不能用多个queue
		return errors.New(fmt.Sprintf("topic queue num error, topic:%s, num: %d", topic, len(mqs)))
	}

	mq := mqs[0]
	offset := market.offset

	//---------------start-----------------
	pr := p.consumer.Pull(mq, "*", offset, 800)
	//pr, _ := p.consumer.PullFrom(context.TODO(), mq, offset, 800)
	//---------------end-----------------

	log.Debugf("Leader pull after, offset: %d, mq: %s, result: %+v", offset, mq.String(), pr)

	switch pr.Status {
	case rocketmq.PullNoNewMsg:
		//case primitive.PullNoNewMsg:
		log.Debugf("rocketmq pull nonewmasg, market:%+v", market)
		time.Sleep(time.Millisecond * time.Duration(p.noMsgSleep))
		return nil
	case rocketmq.PullFound:
		//case primitive.PullFound:
		var offsetOrders *OffsetOrders
		isFirst := true
		//for _, msg := range pr.GetMessageExts() {
		for _, msg := range pr.Messages {
			order, err := PbToOrder(msg)
			if err != nil {
				log.Warnf("rocketmq formatOrder failed, offset:%d, err: %v", msg.QueueOffset, err)
				continue
			}
			if isFirst {
				offsetOrders = &OffsetOrders{}
				offsetOrders.BeginOffset = order.Offset
				isFirst = false
			}
			//凑够10个(OffsetOrdersSize)
			if len(offsetOrders.Orders) >= OffsetOrdersSize {
				//凑够10个就发出去
				market.orderCh <- offsetOrders
				//crete new group
				offsetOrders = &OffsetOrders{}
				offsetOrders.BeginOffset = order.Offset
			}
			offsetOrders.Orders = append(offsetOrders.Orders, order)
			offsetOrders.EndOffset = order.Offset
			log.Debugf("provider send orderCh item offsetOrders:%+v, order:%+v", *offsetOrders, *order)
		}

		if offsetOrders != nil && len(offsetOrders.Orders) > 0 {
			//这里来获取的.
			market.orderCh <- offsetOrders
		}
	case rocketmq.PullNoMatchedMsg:
		//case primitive.PullNoMsgMatched:
		market.offset = pr.NextBeginOffset
		log.Errorf("broker PullNoMatchedMsg, offset:%d, pr:%+v", offset, pr)
		return errors.New("broker PullNoMatchedMsg")
	case rocketmq.PullOffsetIllegal:
		//case primitive.PullOffsetIllegal:
		market.offset = pr.NextBeginOffset
		log.Errorf("broker PullOffsetIllegal, offset:%d, pr:%+v", offset, pr)
		return errors.New("broker PullOffsetIllegal")
	case rocketmq.PullBrokerTimeout:
		//case primitive.PullBrokerTimeout:
		log.Errorf("broker timeout occur")
		return errors.New("broker timeout occur")
	}
	market.offset = pr.NextBeginOffset
	maxOffset := pr.MaxOffset
	if (maxOffset - market.offset) > 1000 {
		log.Warnf("MatchMessageBlocked, symbol:%s, blockNum:%d, maxOffset:%d, offset:%d", market.symbol, maxOffset-offset, maxOffset, offset)
	}

	return nil
}
  1. Please tell us about your environment:

    • What is your OS?

    • kubernetes 1.24

    • What is your client version?

    • 1.2.4

    • What is your RocketMQ version?

    • 4.7.1

  2. Other information (e.g. detailed explanation, logs, related issues, suggestions on how to fix, etc):

2023-03-20 22:48:24	
time="2023-03-20T22:48:24+08:00" level=warning msg="StartConsume pullMQ fail, symbol: E_BTCUSDT, err: broker timeout occur, market:{id:1 symbol:E_BTCUSDT orderCh:0xc00091a9c0 logCh:0xc00091aa20 isPullMQ:true offset:990799 isRunning:true cancel:0x525e80 ready:true mu:{state:0 sema:0} consumeCmdChan:0xc00091aa80 a:map[]}"
2023-03-20 22:48:17	
time="2023-03-20T22:48:17+08:00" level=warning msg="StartConsume pullMQ fail, symbol: E_BTCUSDT, err: broker timeout occur, market:{id:1 symbol:E_BTCUSDT orderCh:0xc00091a9c0 logCh:0xc00091aa20 isPullMQ:true offset:990799 isRunning:true cancel:0x525e80 ready:true mu:{state:0 sema:0} consumeCmdChan:0xc00091aa80 a:map[]}"
2023-03-20 22:48:09	
time="2023-03-20T22:48:09+08:00" level=warning msg="StartConsume pullMQ fail, symbol: E_BTCUSDT, err: broker timeout occur, market:{id:1 symbol:E_BTCUSDT orderCh:0xc00091a9c0 logCh:0xc00091aa20 isPullMQ:true offset:990799 isRunning:true cancel:0x525e80 ready:true mu:{state:0 sema:0} consumeCmdChan:0xc00091aa80 a:map[]}"
2023-03-20 22:48:04	
time="2023-03-20T22:48:04+08:00" level=warning msg="StartConsume pullMQ fail, symbol: E_BTCUSDT, err: broker timeout occur, market:{id:1 symbol:E_BTCUSDT orderCh:0xc00091a9c0 logCh:0xc00091aa20 isPullMQ:true offset:990799 isRunning:true cancel:0x525e80 ready:true mu:{state:0 sema:0} consumeCmdChan:0xc00091aa80 a:map[]}"
2023-03-20 22:46:38	
time="2023-03-20T22:46:38+08:00" level=warning msg="StartConsume pullMQ fail, symbol: E_BTCUSDT, err: broker timeout occur, market:{id:1 symbol:E_BTCUSDT orderCh:0xc00091a9c0 logCh:0xc00091aa20 isPullMQ:true offset:990799 isRunning:true cancel:0x525e80 ready:true mu:{state:0 sema:0} consumeCmdChan:0xc00091aa80 a:map[]}"
2023-03-20 22:45:29	
time="2023-03-20T22:45:29+08:00" level=warning msg="StartConsume pullMQ fail, symbol: E_BTCUSDT, err: broker timeout occur, market:{id:1 symbol:E_BTCUSDT orderCh:0xc00091a9c0 logCh:0xc00091aa20 isPullMQ:true offset:990799 isRunning:true cancel:0x525e80 ready:true mu:{state:0 sema:0} consumeCmdChan:0xc00091aa80 a:map[]}"
2023-03-20 22:45:06	
time="2023-03-20T22:45:06+08:00" level=warning msg="StartConsume pullMQ fail, symbol: E_BTCUSDT, err: broker timeout occur, market:{id:1 symbol:E_BTCUSDT orderCh:0xc00091a9c0 logCh:0xc00091aa20 isPullMQ:true offset:990799 isRunning:true cancel:0x525e80 ready:true mu:{state:0 sema:0} consumeCmdChan:0xc00091aa80 a:map[]}"
2023-03-20 22:43:57	
time="2023-03-20T22:43:57+08:00" level=warning msg="StartConsume pullMQ fail, symbol: E_BTCUSDT, err: broker timeout occur, market:{id:1 symbol:E_BTCUSDT orderCh:0xc00091a9c0 logCh:0xc00091aa20 isPullMQ:true offset:990799 isRunning:true cancel:0x525e80 ready:true mu:{state:0 sema:0} consumeCmdChan:0xc00091aa80 a:map[]}"
2023-03-20 22:43:41	
time="2023-03-20T22:43:41+08:00" level=warning msg="StartConsume pullMQ fail, symbol: E_BTCUSDT, err: broker timeout occur, market:{id:1 symbol:E_BTCUSDT orderCh:0xc00091a9c0 logCh:0xc00091aa20 isPullMQ:true offset:990799 isRunning:true cancel:0x525e80 ready:true mu:{state:0 sema:0} consumeCmdChan:0xc00091aa80 a:map[]}"
2023-03-20 22:43:23	
time="2023-03-20T22:43:23+08:00" level=warning msg="StartConsume pullMQ fail, symbol: E_BTCUSDT, err: broker timeout occur, market:{id:1 symbol:E_BTCUSDT orderCh:0xc00091a9c0 logCh:0xc00091aa20 isPullMQ:true offset:990799 isRunning:true cancel:0x525e80 ready:true mu:{state:0 sema:0} consumeCmdChan:0xc00091aa80 a:map[]}"
2023-03-20 22:40:56	
time="2023-03-20T22:40:56+08:00" level=warning msg="StartConsume pullMQ fail, symbol: E_BTCUSDT, err: broker timeout occur, market:{id:1 symbol:E_BTCUSDT orderCh:0xc00091a9c0 logCh:0xc00091aa20 isPullMQ:true offset:990799 isRunning:true cancel:0x525e80 ready:true mu:{state:0 sema:0} consumeCmdChan:0xc00091aa80 a:map[]}"
2023-03-20 22:39:34	
time="2023-03-20T22:39:34+08:00" level=warning msg="StartConsume pullMQ fail, symbol: E_BTCUSDT, err: broker timeout occur, market:{id:1 symbol:E_BTCUSDT orderCh:0xc00091a9c0 logCh:0xc00091aa20 isPullMQ:true offset:990799 isRunning:true cancel:0x525e80 ready:true mu:{state:0 sema:0} consumeCmdChan:0xc00091aa80 a:map[]}"
2023-03-20 22:38:18	
time="2023-03-20T22:38:18+08:00" level=warning msg="StartConsume pullMQ fail, symbol: E_BTCUSDT, err: broker timeout occur, market:{id:1 symbol:E_BTCUSDT orderCh:0xc00091a9c0 logCh:0xc00091aa20 isPullMQ:true offset:990799 isRunning:true cancel:0x525e80 ready:true mu:{state:0 sema:0} consumeCmdChan:0xc00091aa80 a:map[]}"
2023-03-20 22:36:48	
time="2023-03-20T22:36:48+08:00" level=warning msg="StartConsume pullMQ fail, symbol: E_BTCUSDT, err: broker timeout occur, market:{id:1 symbol:E_BTCUSDT orderCh:0xc00091a9c0 logCh:0xc00091aa20 isPullMQ:true offset:990799 isRunning:true cancel:0x525e80 ready:true mu:{state:0 sema:0} consumeCmdChan:0xc00091aa80 a:map[]}"
2023-03-20 22:35:01	
time="2023-03-20T22:35:01+08:00" level=warning msg="StartConsume pullMQ fail, symbol: E_BTCUSDT, err: broker timeout occur, market:{id:1 symbol:E_BTCUSDT orderCh:0xc00091a9c0 logCh:0xc00091aa20 isPullMQ:true offset:990799 isRunning:true cancel:0x525e80 ready:true mu:{state:0 sema:0} consumeCmdChan:0xc00091aa80 a:map[]}"
2023-03-20 22:33:19	
time="2023-03-20T22:33:19+08:00" level=warning msg="StartConsume pullMQ fail, symbol: E_BTCUSDT, err: broker timeout occur, market:{id:1 symbol:E_BTCUSDT orderCh:0xc00091a9c0 logCh:0xc00091aa20 isPullMQ:true offset:990799 isRunning:true cancel:0x525e80 ready:true mu:{state:0 sema:0} consumeCmdChan:0xc00091aa80 a:map[]}"
2023-03-20 22:32:56	
time="2023-03-20T22:32:56+08:00" level=warning msg="StartConsume pullMQ fail, symbol: E_BTCUSDT, err: broker timeout occur, market:{id:1 symbol:E_BTCUSDT orderCh:0xc00091a9c0 logCh:0xc00091aa20 isPullMQ:true offset:990799 isRunning:true cancel:0x525e80 ready:true mu:{state:0 sema:0} consumeCmdChan:0xc00091aa80 a:map[]}"
2023-03-20 22:31:15	
time="2023-03-20T22:31:15+08:00" level=warning msg="StartConsume pullMQ fail, symbol: E_BTCUSDT, err: broker timeout occur, market:{id:1 symbol:E_BTCUSDT orderCh:0xc00091a9c0 logCh:0xc00091aa20 isPullMQ:true offset:990799 isRunning:true cancel:0x525e80 ready:true mu:{state:0 sema:0} consumeCmdChan:0xc00091aa80 a:map[]}"
2023-03-20 22:31:01	
time="2023-03-20T22:31:01+08:00" level=warning msg="StartConsume pullMQ fail, symbol: E_BTCUSDT, err: broker timeout occur, market:{id:1 symbol:E_BTCUSDT orderCh:0xc00091a9c0 logCh:0xc00091aa20 isPullMQ:true offset:990799 isRunning:true cancel:0x525e80 ready:true mu:{state:0 sema:0} consumeCmdChan:0xc00091aa80 a:map[]}"
2023-03-20 22:30:34	
time="2023-03-20T22:30:34+08:00" level=warning msg="StartConsume pullMQ fail, symbol: E_BTCUSDT, err: broker timeout occur, market:{id:1 symbol:E_BTCUSDT orderCh:0xc00091a9c0 logCh:0xc00091aa20 isPullMQ:true offset:990799 isRunning:true cancel:0x525e80 ready:true mu:{state:0 sema:0} consumeCmdChan:0xc00091aa80 a:map[]}"
2023-03-20 22:30:13	
time="2023-03-20T22:30:13+08:00" level=warning msg="StartConsume pullMQ fail, symbol: E_BTCUSDT, err: broker timeout occur, market:{id:1 symbol:E_BTCUSDT orderCh:0xc00091a9c0 logCh:0xc00091aa20 isPullMQ:true offset:990799 isRunning:true cancel:0x525e80 ready:true mu:{state:0 sema:0} consumeCmdChan:0xc00091aa80 a:map[]}"
2023-03-20 22:29:41	
time="2023-03-20T22:29:41+08:00" level=warning msg="StartConsume pullMQ fail, symbol: E_BTCUSDT, err: broker timeout occur, market:{id:1 symbol:E_BTCUSDT orderCh:0xc00091a9c0 logCh:0xc00091aa20 isPullMQ:true offset:990799 isRunning:true cancel:0x525e80 ready:true mu:{state:0 sema:0} consumeCmdChan:0xc00091aa80 a:map[]}"
2023-03-20 22:29:34	
time="2023-03-20T22:29:34+08:00" level=warning msg="StartConsume pullMQ fail, symbol: E_BTCUSDT, err: broker timeout occur, market:{id:1 symbol:E_BTCUSDT orderCh:0xc00091a9c0 logCh:0xc00091aa20 isPullMQ:true offset:990799 isRunning:true cancel:0x525e80 ready:true mu:{state:0 sema:0} consumeCmdChan:0xc00091aa80 a:map[]}"
2023-03-20 22:29:16	
time="2023-03-20T22:29:16+08:00" level=warning msg="StartConsume pullMQ fail, symbol: E_BTCUSDT, err: broker timeout occur, market:{id:1 symbol:E_BTCUSDT orderCh:0xc00091a9c0 logCh:0xc00091aa20 isPullMQ:true offset:990799 isRunning:true cancel:0x525e80 ready:true mu:{state:0 sema:0} consumeCmdChan:0xc00091aa80 a:map[]}"
2023-03-20 22:28:03	
time="2023-03-20T22:28:03+08:00" level=warning msg="StartConsume pullMQ fail, symbol: E_BTCUSDT, err: broker timeout occur, market:{id:1 symbol:E_BTCUSDT orderCh:0xc00091a9c0 logCh:0xc00091aa20 isPullMQ:true offset:990799 isRunning:true cancel:0x525e80 ready:true mu:{state:0 sema:0} consumeCmdChan:0xc00091aa80 a:map[]}"
2023-03-20 22:27:25	
time="2023-03-20T22:27:25+08:00" level=warning msg="StartConsume pullMQ fail, symbol: E_BTCUSDT, err: broker timeout occur, market:{id:1 symbol:E_BTCUSDT orderCh:0xc00091a9c0 logCh:0xc00091aa20 isPullMQ:true offset:990799 isRunning:true cancel:0x525e80 ready:true mu:{state:0 sema:0} consumeCmdChan:0xc00091aa80 a:map[]}"
2023-03-20 22:26:58	
time="2023-03-20T22:26:58+08:00" level=warning msg="StartConsume pullMQ fail, symbol: E_BTCUSDT, err: broker timeout occur, market:{id:1 symbol:E_BTCUSDT orderCh:0xc00091a9c0 logCh:0xc00091aa20 isPullMQ:true offset:990799 isRunning:true cancel:0x525e80 ready:true mu:{state:0 sema:0} consumeCmdChan:0xc00091aa80 a:map[]}"
2023-03-20 22:26:06	
time="2023-03-20T22:26:06+08:00" level=warning msg="StartConsume pullMQ fail, symbol: E_BTCUSDT, err: broker timeout occur, market:{id:1 symbol:E_BTCUSDT orderCh:0xc00091a9c0 logCh:0xc00091aa20 isPullMQ:true offset:990799 isRunning:true cancel:0x525e80 ready:true mu:{state:0 sema:0} consumeCmdChan:0xc00091aa80 a:map[]}"
2023-03-20 22:25:12	
time="2023-03-20T22:25:12+08:00" level=warning msg="StartConsume pullMQ fail, symbol: E_BTCUSDT, err: broker timeout occur, market:{id:1 symbol:E_BTCUSDT orderCh:0xc00091a9c0 logCh:0xc00091aa20 isPullMQ:true offset:990799 isRunning:true cancel:0x525e80 ready:true mu:{state:0 sema:0} consumeCmdChan:0xc00091aa80 a:map[]}"
2023-03-20 22:23:47	
time="2023-03-20T22:23:47+08:00" level=warning msg="StartConsume pullMQ fail, symbol: E_BTCUSDT, err: broker timeout occur, market:{id:1 symbol:E_BTCUSDT orderCh:0xc00091a9c0 logCh:0xc00091aa20 isPullMQ:true offset:990799 isRunning:true cancel:0x525e80 ready:true mu:{state:0 sema:0} consumeCmdChan:0xc00091aa80 a:map[]}"
2023-03-20 22:23:03	
time="2023-03-20T22:23:03+08:00" level=warning msg="StartConsume pullMQ fail, symbol: E_BTCUSDT, err: broker timeout occur, market:{id:1 symbol:E_BTCUSDT orderCh:0xc00091a9c0 logCh:0xc00091aa20 isPullMQ:true offset:990799 isRunning:true cancel:0x525e80 ready:true mu:{state:0 sema:0} consumeCmdChan:0xc00091aa80 a:map[]}"
2023-03-20 22:22:53	
time="2023-03-20T22:22:53+08:00" level=warning msg="StartConsume pullMQ fail, symbol: E_BTCUSDT, err: broker timeout occur, market:{id:1 symbol:E_BTCUSDT orderCh:0xc00091a9c0 logCh:0xc00091aa20 isPullMQ:true offset:990799 isRunning:true cancel:0x525e80 ready:true mu:{state:0 sema:0} consumeCmdChan:0xc00091aa80 a:map[]}"
2023-03-20 22:21:53	
time="2023-03-20T22:21:53+08:00" level=warning msg="StartConsume pullMQ fail, symbol: E_BTCUSDT, err: broker timeout occur, market:{id:1 symbol:E_BTCUSDT orderCh:0xc00091a9c0 logCh:0xc00091aa20 isPullMQ:true offset:990799 isRunning:true cancel:0x525e80 ready:true mu:{state:0 sema:0} consumeCmdChan:0xc00091aa80 a:map[]}"


2023-03-20 22:21:29	
time="2023-03-20T22:21:29+08:00" level=warning msg="StartConsume pullMQ fail, symbol: E_BTCUSDT, err: broker timeout occur, market:{id:1 symbol:E_BTCUSDT orderCh:0xc00091a9c0 logCh:0xc00091aa20 isPullMQ:true offset:990799 isRunning:true cancel:0x525e80 ready:true mu:{state:0 sema:0} consumeCmdChan:0xc00091aa80 a:map[]}"
2023-03-20 22:20:13	
time="2023-03-20T22:20:13+08:00" level=warning msg="StartConsume pullMQ fail, symbol: E_BTCUSDT, err: broker timeout occur, market:{id:1 symbol:E_BTCUSDT orderCh:0xc00091a9c0 logCh:0xc00091aa20 isPullMQ:true offset:990799 isRunning:true cancel:0x525e80 ready:true mu:{state:0 sema:0} consumeCmdChan:0xc00091aa80 a:map[]}"

kaybinwong avatar Mar 20 '23 15:03 kaybinwong

please upgrade to lastest version, 1.2.4 is still build with CGO

francisoliverlee avatar Apr 24 '23 14:04 francisoliverlee

centos7 1.2.4 The problem still exists

shcw avatar Aug 06 '24 07:08 shcw