rocketmq-client-go
rocketmq-client-go copied to clipboard
PullBrokerTimeout happens frequently when pulling messages
BUG REPORT
- Please describe the issue you observed:
Pulling messages with client version 1.2.4 frequently returns a PullBrokerTimeout error; this did not happen with the Spring client.
……
// Pull one batch for this market. On failure, log a warning that includes the
// symbol, the error and a dump of the market state, then sleep for ERRSLEEP.
// NOTE(review): the surrounding code is elided; presumably this runs inside a
// consume loop that retries after the sleep — confirm against the full source.
err := p.pullMQ(market)
if err != nil {
log.Warnf("StartConsume pullMQ fail, symbol: %s, err: %v, market:%+v", symbol, err, *market)
time.Sleep(ERRSLEEP)
}
……
// pullMQ pulls one batch of messages for the market's topic, starting at
// market.offset, and forwards the decoded orders to market.orderCh.
// Pulling with an explicit offset is how duplicate consumption is avoided.
//
// Behavior by pull status:
//   - PullNoNewMsg: sleeps for p.noMsgSleep milliseconds and returns nil,
//     leaving market.offset unchanged.
//   - PullFound: decodes each message with PbToOrder (messages that fail to
//     decode are logged and skipped), sends the orders on market.orderCh in
//     groups of at most OffsetOrdersSize, then advances market.offset to
//     pr.NextBeginOffset.
//   - PullNoMatchedMsg / PullOffsetIllegal: advances market.offset to
//     pr.NextBeginOffset and returns an error.
//   - PullBrokerTimeout: returns an error, leaving market.offset unchanged.
//
// An error is also returned when the topic does not have exactly one message
// queue, because ordering cannot be guaranteed across several queues.
func (p *rocketMQProvider) pullMQ(market *Market) error {
	symbol := market.symbol
	topic := TopicNameOrderProvider(symbol)
	//----start---------------
	mqs := p.consumer.FetchSubscriptionMessageQueues(topic)
	//mqs := p.consumer.MessageQueues(topic)
	//----end---------------
	log.Debugf("StartConsume fetch subscription topic:%s, mqs:%v", topic, mqs)
	if len(mqs) != 1 {
		// To guarantee ordering, the topic must not use more than one queue.
		return fmt.Errorf("topic queue num error, topic:%s, num: %d", topic, len(mqs))
	}
	mq := mqs[0]
	offset := market.offset
	//---------------start-----------------
	pr := p.consumer.Pull(mq, "*", offset, 800)
	//pr, _ := p.consumer.PullFrom(context.TODO(), mq, offset, 800)
	//---------------end-----------------
	log.Debugf("Leader pull after, offset: %d, mq: %s, result: %+v", offset, mq.String(), pr)

	switch pr.Status {
	case rocketmq.PullNoNewMsg:
		//case primitive.PullNoNewMsg:
		log.Debugf("rocketmq pull nonewmasg, market:%+v", market)
		time.Sleep(time.Millisecond * time.Duration(p.noMsgSleep))
		return nil
	case rocketmq.PullFound:
		//case primitive.PullFound:
		// offsetOrders stays nil until the first message decodes successfully.
		var offsetOrders *OffsetOrders
		//for _, msg := range pr.GetMessageExts() {
		for _, msg := range pr.Messages {
			order, err := PbToOrder(msg)
			if err != nil {
				log.Warnf("rocketmq formatOrder failed, offset:%d, err: %v", msg.QueueOffset, err)
				continue
			}
			switch {
			case offsetOrders == nil:
				// First decodable order of this pull opens the first group.
				offsetOrders = &OffsetOrders{BeginOffset: order.Offset}
			case len(offsetOrders.Orders) >= OffsetOrdersSize:
				// The current group is full (OffsetOrdersSize orders): send it
				// and open a new group that begins at this order.
				market.orderCh <- offsetOrders
				offsetOrders = &OffsetOrders{BeginOffset: order.Offset}
			}
			offsetOrders.Orders = append(offsetOrders.Orders, order)
			offsetOrders.EndOffset = order.Offset
			log.Debugf("provider send orderCh item offsetOrders:%+v, order:%+v", *offsetOrders, *order)
		}
		// Flush the last, possibly partial, group.
		if offsetOrders != nil && len(offsetOrders.Orders) > 0 {
			market.orderCh <- offsetOrders
		}
	case rocketmq.PullNoMatchedMsg:
		//case primitive.PullNoMsgMatched:
		market.offset = pr.NextBeginOffset
		log.Errorf("broker PullNoMatchedMsg, offset:%d, pr:%+v", offset, pr)
		return errors.New("broker PullNoMatchedMsg")
	case rocketmq.PullOffsetIllegal:
		//case primitive.PullOffsetIllegal:
		market.offset = pr.NextBeginOffset
		log.Errorf("broker PullOffsetIllegal, offset:%d, pr:%+v", offset, pr)
		return errors.New("broker PullOffsetIllegal")
	case rocketmq.PullBrokerTimeout:
		//case primitive.PullBrokerTimeout:
		log.Errorf("broker timeout occur")
		return errors.New("broker timeout occur")
	}

	market.offset = pr.NextBeginOffset
	maxOffset := pr.MaxOffset
	// Warn when the consumer lags far behind the queue head. The logged
	// backlog and offset use the updated market.offset, i.e. the same values
	// the threshold is tested against (the pre-pull offset would overstate
	// the backlog by the size of the batch just consumed).
	if backlog := maxOffset - market.offset; backlog > 1000 {
		log.Warnf("MatchMessageBlocked, symbol:%s, blockNum:%d, maxOffset:%d, offset:%d", market.symbol, backlog, maxOffset, market.offset)
	}
	return nil
}
-
Please tell us about your environment:
-
What is your OS?
-
kubernetes 1.24
-
What is your client version?
-
1.2.4
-
What is your RocketMQ version?
-
4.7.1
-
-
Other information (e.g. detailed explanation, logs, related issues, suggestions on how to fix, etc):
2023-03-20 22:48:24
time="2023-03-20T22:48:24+08:00" level=warning msg="StartConsume pullMQ fail, symbol: E_BTCUSDT, err: broker timeout occur, market:{id:1 symbol:E_BTCUSDT orderCh:0xc00091a9c0 logCh:0xc00091aa20 isPullMQ:true offset:990799 isRunning:true cancel:0x525e80 ready:true mu:{state:0 sema:0} consumeCmdChan:0xc00091aa80 a:map[]}"
2023-03-20 22:48:17
time="2023-03-20T22:48:17+08:00" level=warning msg="StartConsume pullMQ fail, symbol: E_BTCUSDT, err: broker timeout occur, market:{id:1 symbol:E_BTCUSDT orderCh:0xc00091a9c0 logCh:0xc00091aa20 isPullMQ:true offset:990799 isRunning:true cancel:0x525e80 ready:true mu:{state:0 sema:0} consumeCmdChan:0xc00091aa80 a:map[]}"
2023-03-20 22:48:09
time="2023-03-20T22:48:09+08:00" level=warning msg="StartConsume pullMQ fail, symbol: E_BTCUSDT, err: broker timeout occur, market:{id:1 symbol:E_BTCUSDT orderCh:0xc00091a9c0 logCh:0xc00091aa20 isPullMQ:true offset:990799 isRunning:true cancel:0x525e80 ready:true mu:{state:0 sema:0} consumeCmdChan:0xc00091aa80 a:map[]}"
2023-03-20 22:48:04
time="2023-03-20T22:48:04+08:00" level=warning msg="StartConsume pullMQ fail, symbol: E_BTCUSDT, err: broker timeout occur, market:{id:1 symbol:E_BTCUSDT orderCh:0xc00091a9c0 logCh:0xc00091aa20 isPullMQ:true offset:990799 isRunning:true cancel:0x525e80 ready:true mu:{state:0 sema:0} consumeCmdChan:0xc00091aa80 a:map[]}"
2023-03-20 22:46:38
time="2023-03-20T22:46:38+08:00" level=warning msg="StartConsume pullMQ fail, symbol: E_BTCUSDT, err: broker timeout occur, market:{id:1 symbol:E_BTCUSDT orderCh:0xc00091a9c0 logCh:0xc00091aa20 isPullMQ:true offset:990799 isRunning:true cancel:0x525e80 ready:true mu:{state:0 sema:0} consumeCmdChan:0xc00091aa80 a:map[]}"
2023-03-20 22:45:29
time="2023-03-20T22:45:29+08:00" level=warning msg="StartConsume pullMQ fail, symbol: E_BTCUSDT, err: broker timeout occur, market:{id:1 symbol:E_BTCUSDT orderCh:0xc00091a9c0 logCh:0xc00091aa20 isPullMQ:true offset:990799 isRunning:true cancel:0x525e80 ready:true mu:{state:0 sema:0} consumeCmdChan:0xc00091aa80 a:map[]}"
2023-03-20 22:45:06
time="2023-03-20T22:45:06+08:00" level=warning msg="StartConsume pullMQ fail, symbol: E_BTCUSDT, err: broker timeout occur, market:{id:1 symbol:E_BTCUSDT orderCh:0xc00091a9c0 logCh:0xc00091aa20 isPullMQ:true offset:990799 isRunning:true cancel:0x525e80 ready:true mu:{state:0 sema:0} consumeCmdChan:0xc00091aa80 a:map[]}"
2023-03-20 22:43:57
time="2023-03-20T22:43:57+08:00" level=warning msg="StartConsume pullMQ fail, symbol: E_BTCUSDT, err: broker timeout occur, market:{id:1 symbol:E_BTCUSDT orderCh:0xc00091a9c0 logCh:0xc00091aa20 isPullMQ:true offset:990799 isRunning:true cancel:0x525e80 ready:true mu:{state:0 sema:0} consumeCmdChan:0xc00091aa80 a:map[]}"
2023-03-20 22:43:41
time="2023-03-20T22:43:41+08:00" level=warning msg="StartConsume pullMQ fail, symbol: E_BTCUSDT, err: broker timeout occur, market:{id:1 symbol:E_BTCUSDT orderCh:0xc00091a9c0 logCh:0xc00091aa20 isPullMQ:true offset:990799 isRunning:true cancel:0x525e80 ready:true mu:{state:0 sema:0} consumeCmdChan:0xc00091aa80 a:map[]}"
2023-03-20 22:43:23
time="2023-03-20T22:43:23+08:00" level=warning msg="StartConsume pullMQ fail, symbol: E_BTCUSDT, err: broker timeout occur, market:{id:1 symbol:E_BTCUSDT orderCh:0xc00091a9c0 logCh:0xc00091aa20 isPullMQ:true offset:990799 isRunning:true cancel:0x525e80 ready:true mu:{state:0 sema:0} consumeCmdChan:0xc00091aa80 a:map[]}"
2023-03-20 22:40:56
time="2023-03-20T22:40:56+08:00" level=warning msg="StartConsume pullMQ fail, symbol: E_BTCUSDT, err: broker timeout occur, market:{id:1 symbol:E_BTCUSDT orderCh:0xc00091a9c0 logCh:0xc00091aa20 isPullMQ:true offset:990799 isRunning:true cancel:0x525e80 ready:true mu:{state:0 sema:0} consumeCmdChan:0xc00091aa80 a:map[]}"
2023-03-20 22:39:34
time="2023-03-20T22:39:34+08:00" level=warning msg="StartConsume pullMQ fail, symbol: E_BTCUSDT, err: broker timeout occur, market:{id:1 symbol:E_BTCUSDT orderCh:0xc00091a9c0 logCh:0xc00091aa20 isPullMQ:true offset:990799 isRunning:true cancel:0x525e80 ready:true mu:{state:0 sema:0} consumeCmdChan:0xc00091aa80 a:map[]}"
2023-03-20 22:38:18
time="2023-03-20T22:38:18+08:00" level=warning msg="StartConsume pullMQ fail, symbol: E_BTCUSDT, err: broker timeout occur, market:{id:1 symbol:E_BTCUSDT orderCh:0xc00091a9c0 logCh:0xc00091aa20 isPullMQ:true offset:990799 isRunning:true cancel:0x525e80 ready:true mu:{state:0 sema:0} consumeCmdChan:0xc00091aa80 a:map[]}"
2023-03-20 22:36:48
time="2023-03-20T22:36:48+08:00" level=warning msg="StartConsume pullMQ fail, symbol: E_BTCUSDT, err: broker timeout occur, market:{id:1 symbol:E_BTCUSDT orderCh:0xc00091a9c0 logCh:0xc00091aa20 isPullMQ:true offset:990799 isRunning:true cancel:0x525e80 ready:true mu:{state:0 sema:0} consumeCmdChan:0xc00091aa80 a:map[]}"
2023-03-20 22:35:01
time="2023-03-20T22:35:01+08:00" level=warning msg="StartConsume pullMQ fail, symbol: E_BTCUSDT, err: broker timeout occur, market:{id:1 symbol:E_BTCUSDT orderCh:0xc00091a9c0 logCh:0xc00091aa20 isPullMQ:true offset:990799 isRunning:true cancel:0x525e80 ready:true mu:{state:0 sema:0} consumeCmdChan:0xc00091aa80 a:map[]}"
2023-03-20 22:33:19
time="2023-03-20T22:33:19+08:00" level=warning msg="StartConsume pullMQ fail, symbol: E_BTCUSDT, err: broker timeout occur, market:{id:1 symbol:E_BTCUSDT orderCh:0xc00091a9c0 logCh:0xc00091aa20 isPullMQ:true offset:990799 isRunning:true cancel:0x525e80 ready:true mu:{state:0 sema:0} consumeCmdChan:0xc00091aa80 a:map[]}"
2023-03-20 22:32:56
time="2023-03-20T22:32:56+08:00" level=warning msg="StartConsume pullMQ fail, symbol: E_BTCUSDT, err: broker timeout occur, market:{id:1 symbol:E_BTCUSDT orderCh:0xc00091a9c0 logCh:0xc00091aa20 isPullMQ:true offset:990799 isRunning:true cancel:0x525e80 ready:true mu:{state:0 sema:0} consumeCmdChan:0xc00091aa80 a:map[]}"
2023-03-20 22:31:15
time="2023-03-20T22:31:15+08:00" level=warning msg="StartConsume pullMQ fail, symbol: E_BTCUSDT, err: broker timeout occur, market:{id:1 symbol:E_BTCUSDT orderCh:0xc00091a9c0 logCh:0xc00091aa20 isPullMQ:true offset:990799 isRunning:true cancel:0x525e80 ready:true mu:{state:0 sema:0} consumeCmdChan:0xc00091aa80 a:map[]}"
2023-03-20 22:31:01
time="2023-03-20T22:31:01+08:00" level=warning msg="StartConsume pullMQ fail, symbol: E_BTCUSDT, err: broker timeout occur, market:{id:1 symbol:E_BTCUSDT orderCh:0xc00091a9c0 logCh:0xc00091aa20 isPullMQ:true offset:990799 isRunning:true cancel:0x525e80 ready:true mu:{state:0 sema:0} consumeCmdChan:0xc00091aa80 a:map[]}"
2023-03-20 22:30:34
time="2023-03-20T22:30:34+08:00" level=warning msg="StartConsume pullMQ fail, symbol: E_BTCUSDT, err: broker timeout occur, market:{id:1 symbol:E_BTCUSDT orderCh:0xc00091a9c0 logCh:0xc00091aa20 isPullMQ:true offset:990799 isRunning:true cancel:0x525e80 ready:true mu:{state:0 sema:0} consumeCmdChan:0xc00091aa80 a:map[]}"
2023-03-20 22:30:13
time="2023-03-20T22:30:13+08:00" level=warning msg="StartConsume pullMQ fail, symbol: E_BTCUSDT, err: broker timeout occur, market:{id:1 symbol:E_BTCUSDT orderCh:0xc00091a9c0 logCh:0xc00091aa20 isPullMQ:true offset:990799 isRunning:true cancel:0x525e80 ready:true mu:{state:0 sema:0} consumeCmdChan:0xc00091aa80 a:map[]}"
2023-03-20 22:29:41
time="2023-03-20T22:29:41+08:00" level=warning msg="StartConsume pullMQ fail, symbol: E_BTCUSDT, err: broker timeout occur, market:{id:1 symbol:E_BTCUSDT orderCh:0xc00091a9c0 logCh:0xc00091aa20 isPullMQ:true offset:990799 isRunning:true cancel:0x525e80 ready:true mu:{state:0 sema:0} consumeCmdChan:0xc00091aa80 a:map[]}"
2023-03-20 22:29:34
time="2023-03-20T22:29:34+08:00" level=warning msg="StartConsume pullMQ fail, symbol: E_BTCUSDT, err: broker timeout occur, market:{id:1 symbol:E_BTCUSDT orderCh:0xc00091a9c0 logCh:0xc00091aa20 isPullMQ:true offset:990799 isRunning:true cancel:0x525e80 ready:true mu:{state:0 sema:0} consumeCmdChan:0xc00091aa80 a:map[]}"
2023-03-20 22:29:16
time="2023-03-20T22:29:16+08:00" level=warning msg="StartConsume pullMQ fail, symbol: E_BTCUSDT, err: broker timeout occur, market:{id:1 symbol:E_BTCUSDT orderCh:0xc00091a9c0 logCh:0xc00091aa20 isPullMQ:true offset:990799 isRunning:true cancel:0x525e80 ready:true mu:{state:0 sema:0} consumeCmdChan:0xc00091aa80 a:map[]}"
2023-03-20 22:28:03
time="2023-03-20T22:28:03+08:00" level=warning msg="StartConsume pullMQ fail, symbol: E_BTCUSDT, err: broker timeout occur, market:{id:1 symbol:E_BTCUSDT orderCh:0xc00091a9c0 logCh:0xc00091aa20 isPullMQ:true offset:990799 isRunning:true cancel:0x525e80 ready:true mu:{state:0 sema:0} consumeCmdChan:0xc00091aa80 a:map[]}"
2023-03-20 22:27:25
time="2023-03-20T22:27:25+08:00" level=warning msg="StartConsume pullMQ fail, symbol: E_BTCUSDT, err: broker timeout occur, market:{id:1 symbol:E_BTCUSDT orderCh:0xc00091a9c0 logCh:0xc00091aa20 isPullMQ:true offset:990799 isRunning:true cancel:0x525e80 ready:true mu:{state:0 sema:0} consumeCmdChan:0xc00091aa80 a:map[]}"
2023-03-20 22:26:58
time="2023-03-20T22:26:58+08:00" level=warning msg="StartConsume pullMQ fail, symbol: E_BTCUSDT, err: broker timeout occur, market:{id:1 symbol:E_BTCUSDT orderCh:0xc00091a9c0 logCh:0xc00091aa20 isPullMQ:true offset:990799 isRunning:true cancel:0x525e80 ready:true mu:{state:0 sema:0} consumeCmdChan:0xc00091aa80 a:map[]}"
2023-03-20 22:26:06
time="2023-03-20T22:26:06+08:00" level=warning msg="StartConsume pullMQ fail, symbol: E_BTCUSDT, err: broker timeout occur, market:{id:1 symbol:E_BTCUSDT orderCh:0xc00091a9c0 logCh:0xc00091aa20 isPullMQ:true offset:990799 isRunning:true cancel:0x525e80 ready:true mu:{state:0 sema:0} consumeCmdChan:0xc00091aa80 a:map[]}"
2023-03-20 22:25:12
time="2023-03-20T22:25:12+08:00" level=warning msg="StartConsume pullMQ fail, symbol: E_BTCUSDT, err: broker timeout occur, market:{id:1 symbol:E_BTCUSDT orderCh:0xc00091a9c0 logCh:0xc00091aa20 isPullMQ:true offset:990799 isRunning:true cancel:0x525e80 ready:true mu:{state:0 sema:0} consumeCmdChan:0xc00091aa80 a:map[]}"
2023-03-20 22:23:47
time="2023-03-20T22:23:47+08:00" level=warning msg="StartConsume pullMQ fail, symbol: E_BTCUSDT, err: broker timeout occur, market:{id:1 symbol:E_BTCUSDT orderCh:0xc00091a9c0 logCh:0xc00091aa20 isPullMQ:true offset:990799 isRunning:true cancel:0x525e80 ready:true mu:{state:0 sema:0} consumeCmdChan:0xc00091aa80 a:map[]}"
2023-03-20 22:23:03
time="2023-03-20T22:23:03+08:00" level=warning msg="StartConsume pullMQ fail, symbol: E_BTCUSDT, err: broker timeout occur, market:{id:1 symbol:E_BTCUSDT orderCh:0xc00091a9c0 logCh:0xc00091aa20 isPullMQ:true offset:990799 isRunning:true cancel:0x525e80 ready:true mu:{state:0 sema:0} consumeCmdChan:0xc00091aa80 a:map[]}"
2023-03-20 22:22:53
time="2023-03-20T22:22:53+08:00" level=warning msg="StartConsume pullMQ fail, symbol: E_BTCUSDT, err: broker timeout occur, market:{id:1 symbol:E_BTCUSDT orderCh:0xc00091a9c0 logCh:0xc00091aa20 isPullMQ:true offset:990799 isRunning:true cancel:0x525e80 ready:true mu:{state:0 sema:0} consumeCmdChan:0xc00091aa80 a:map[]}"
2023-03-20 22:21:53
time="2023-03-20T22:21:53+08:00" level=warning msg="StartConsume pullMQ fail, symbol: E_BTCUSDT, err: broker timeout occur, market:{id:1 symbol:E_BTCUSDT orderCh:0xc00091a9c0 logCh:0xc00091aa20 isPullMQ:true offset:990799 isRunning:true cancel:0x525e80 ready:true mu:{state:0 sema:0} consumeCmdChan:0xc00091aa80 a:map[]}"
2023-03-20 22:21:29
time="2023-03-20T22:21:29+08:00" level=warning msg="StartConsume pullMQ fail, symbol: E_BTCUSDT, err: broker timeout occur, market:{id:1 symbol:E_BTCUSDT orderCh:0xc00091a9c0 logCh:0xc00091aa20 isPullMQ:true offset:990799 isRunning:true cancel:0x525e80 ready:true mu:{state:0 sema:0} consumeCmdChan:0xc00091aa80 a:map[]}"
2023-03-20 22:20:13
time="2023-03-20T22:20:13+08:00" level=warning msg="StartConsume pullMQ fail, symbol: E_BTCUSDT, err: broker timeout occur, market:{id:1 symbol:E_BTCUSDT orderCh:0xc00091a9c0 logCh:0xc00091aa20 isPullMQ:true offset:990799 isRunning:true cancel:0x525e80 ready:true mu:{state:0 sema:0} consumeCmdChan:0xc00091aa80 a:map[]}"
Please upgrade to the latest version; 1.2.4 is still built with CGO.
On CentOS 7 with 1.2.4, the problem still exists.