[Bug][Producer] The SendAsync() callback is not invoked while the producer is reconnecting
Expected behavior
The callback should be invoked with a timeout error (ErrSendTimeout) once SendTimeout has elapsed.
Actual behavior
The callback is never called.
Steps to reproduce
- Connect to the server;
- Call SendAsync() to send a message; it is sent successfully;
- Stop the server;
- Call SendAsync() to send more messages; their callbacks are never called.
System configuration
Pulsar version: x.y
I don't know whether the consumer has the same bug.
It seems that when SendAsync() is called at this point, the messages are pushed into p.dataChan. However, runEventsLoop() is busy reconnecting inside p.reconnectToBroker(connectionClosed), so it never gets a chance to handle the `case data, ok := <-p.dataChan` branch. Meanwhile, failTimeoutMessages() only checks the messages in p.pendingQueue. Messages that are still sitting in p.dataChan never reach that queue, so nothing ever marks them as timed out.
```go
func (p *partitionProducer) runEventsLoop() {
	for {
		select {
		case data, ok := <-p.dataChan:
			// when doClose() is called, p.dataChan will be closed and data will be nil
			if !ok {
				return
			}
			p.internalSend(data)
		case cmd, ok := <-p.cmdChan:
			// when doClose() is called, p.cmdChan will be closed and cmd will be nil
			if !ok {
				return
			}
			switch v := cmd.(type) {
			case *flushRequest:
				p.internalFlush(v)
			case *closeProducer:
				p.internalClose(v)
				return
			}
		case connectionClosed := <-p.connectClosedCh:
			p.log.Info("runEventsLoop will reconnect in producer")
			// the loop is blocked here for the whole reconnection, so dataChan is not drained
			p.reconnectToBroker(connectionClosed)
		case <-p.batchFlushTicker.C:
			p.internalFlushCurrentBatch()
		}
	}
}
```
```go
func (p *partitionProducer) failTimeoutMessages() {
	diff := func(sentAt time.Time) time.Duration {
		return p.options.SendTimeout - time.Since(sentAt)
	}

	t := time.NewTimer(p.options.SendTimeout)
	defer t.Stop()

	for range t.C {
		state := p.getProducerState()
		if state == producerClosing || state == producerClosed {
			return
		}

		item := p.pendingQueue.Peek()
		if item == nil {
			// pending queue is empty
			t.Reset(p.options.SendTimeout)
			continue
		}
		oldestItem := item.(*pendingItem)
		if nextWaiting := diff(oldestItem.createdAt); nextWaiting > 0 {
			// none of these pending messages have timed out, wait and retry
			t.Reset(nextWaiting)
			continue
		}

		// the pending queue is not fully thread safe: there is no global iteration lock
		// to control polling, so this goroutine and the connection receipt handler may
		// iterate the pending queue at the same time; this may be a performance trade-off
		// see https://github.com/apache/pulsar-client-go/pull/301
		curViewItems := p.pendingQueue.ReadableSlice()
		viewSize := len(curViewItems)
		if viewSize <= 0 {
			// double check
			t.Reset(p.options.SendTimeout)
			continue
		}
		p.log.Infof("Failing %d messages on timeout %s", viewSize, p.options.SendTimeout)
		lastViewItem := curViewItems[viewSize-1].(*pendingItem)

		// iterate at most viewSize items
		for i := 0; i < viewSize; i++ {
			tickerNeedWaiting := time.Duration(0)
			item := p.pendingQueue.CompareAndPoll(
				func(m interface{}) bool {
					if m == nil {
						return false
					}

					pi := m.(*pendingItem)
					pi.Lock()
					defer pi.Unlock()
					if nextWaiting := diff(pi.createdAt); nextWaiting > 0 {
						// current and subsequent items have not timed out yet, stop iterating
						tickerNeedWaiting = nextWaiting
						return false
					}
					return true
				})

			if item == nil {
				t.Reset(p.options.SendTimeout)
				break
			}

			if tickerNeedWaiting > 0 {
				t.Reset(tickerNeedWaiting)
				break
			}

			pi := item.(*pendingItem)
			pi.Lock()

			for _, req := range pi.sendRequests {
				sr := req.(*sendRequest)
				sr.done(nil, ErrSendTimeout)
			}

			// mark the send as completed with an error, so a flush has no effect
			pi.done(ErrSendTimeout)
			pi.Unlock()

			// finally reached the last view item, current iteration ends
			if pi == lastViewItem {
				t.Reset(p.options.SendTimeout)
				break
			}
		}
	}
}
```
@shibd @nodece @RobertIndie @BewareMyPower @crossoverJie
In #1249 we discussed whether send and flush requests should be blocked while the producer is reconnecting, but that discussion did not go far enough. IMO we should not block send/flush during reconnection, because not blocking has no side effect: a send/flush will either succeed or time out. If we block them, as this bug shows, the timeout is never triggered.
What are your client and producer configs? A similar issue happened when maxReconnectAttempts was 1: https://github.com/apache/pulsar-client-go/issues/1312#issuecomment-2597791188
My maxReconnectAttempts is the default value: -1