pulsar-client-go icon indicating copy to clipboard operation
pulsar-client-go copied to clipboard

[Bug][Producer] The callback was not invoked during reconnecting.

Open gunli opened this issue 10 months ago • 5 comments

Expected behavior

Callback should be called with a timeout error.

Actual behavior

Callback was not called

Steps to reproduce

  1. Connect to the server;
  2. Call SendAsync() to send a message, it will send successfully;
  3. Stop the server;
  4. Call SendAsync() to send some messages, the messages' callback won't be called anymore

System configuration

Pulsar version: x.y

gunli avatar Feb 17 '25 06:02 gunli

I don't know if there is a same bug with the consumer.

It seems that when we call SendAsync() at this time, the messages are sent into p.dataChan, but runEventsLoop() is busy in doing reconnecting: p.reconnectToBroker(connectionClosed), and have no chance to chandle the case case data, ok := <-p.dataChan, and failTimeoutMessages() just check the messages in p.pendingQueue, so no one can tell the messages are timeout.

func (p *partitionProducer) runEventsLoop() {
	for {
		select {
		case data, ok := <-p.dataChan:
			// when doClose() is call, p.dataChan will be closed, data will be nil
			if !ok {
				return
			}
			p.internalSend(data)
		case cmd, ok := <-p.cmdChan:
			// when doClose() is call, p.dataChan will be closed, cmd will be nil
			if !ok {
				return
			}
			switch v := cmd.(type) {
			case *flushRequest:
				p.internalFlush(v)
			case *closeProducer:
				p.internalClose(v)
				return
			}
		case connectionClosed := <-p.connectClosedCh:
			p.log.Info("runEventsLoop will reconnect in producer")
			p.reconnectToBroker(connectionClosed)
		case <-p.batchFlushTicker.C:
			p.internalFlushCurrentBatch()
		}
	}
}

func (p *partitionProducer) failTimeoutMessages() {
	diff := func(sentAt time.Time) time.Duration {
		return p.options.SendTimeout - time.Since(sentAt)
	}

	t := time.NewTimer(p.options.SendTimeout)
	defer t.Stop()

	for range t.C {
		state := p.getProducerState()
		if state == producerClosing || state == producerClosed {
			return
		}

		item := p.pendingQueue.Peek()
		if item == nil {
			// pending queue is empty
			t.Reset(p.options.SendTimeout)
			continue
		}
		oldestItem := item.(*pendingItem)
		if nextWaiting := diff(oldestItem.createdAt); nextWaiting > 0 {
			// none of these pending messages have timed out, wait and retry
			t.Reset(nextWaiting)
			continue
		}

		// since pending queue is not thread safe because of there is no global iteration lock
		// to control poll from pending queue, current goroutine and connection receipt handler
		// iterate pending queue at the same time, this maybe a performance trade-off
		// see https://github.com/apache/pulsar-client-go/pull/301
		curViewItems := p.pendingQueue.ReadableSlice()
		viewSize := len(curViewItems)
		if viewSize <= 0 {
			// double check
			t.Reset(p.options.SendTimeout)
			continue
		}
		p.log.Infof("Failing %d messages on timeout %s", viewSize, p.options.SendTimeout)
		lastViewItem := curViewItems[viewSize-1].(*pendingItem)

		// iterate at most viewSize items
		for i := 0; i < viewSize; i++ {
			tickerNeedWaiting := time.Duration(0)
			item := p.pendingQueue.CompareAndPoll(
				func(m interface{}) bool {
					if m == nil {
						return false
					}

					pi := m.(*pendingItem)
					pi.Lock()
					defer pi.Unlock()
					if nextWaiting := diff(pi.createdAt); nextWaiting > 0 {
						// current and subsequent items not timeout yet, stop iterating
						tickerNeedWaiting = nextWaiting
						return false
					}
					return true
				})

			if item == nil {
				t.Reset(p.options.SendTimeout)
				break
			}

			if tickerNeedWaiting > 0 {
				t.Reset(tickerNeedWaiting)
				break
			}

			pi := item.(*pendingItem)
			pi.Lock()

			for _, i := range pi.sendRequests {
				sr := i.(*sendRequest)
				sr.done(nil, ErrSendTimeout)
			}

			// flag the sending has completed with error, flush make no effect
			pi.done(ErrSendTimeout)
			pi.Unlock()

			// finally reached the last view item, current iteration ends
			if pi == lastViewItem {
				t.Reset(p.options.SendTimeout)
				break
			}
		}
	}
}

gunli avatar Feb 17 '25 06:02 gunli

@shibd @nodece @RobertIndie @BewareMyPower @crossoverJie

gunli avatar Feb 17 '25 06:02 gunli

We have discussed whether we should block the send and flush requests when we are reconnecting in #1249, but not sufficient enough. Now, IMO, we should not block send/flush when we are reconnecting, 'cause it has no side effect, send/flush will succeed or timeout, if we block them, as this bug described, timeout will not triggered.

gunli avatar Feb 17 '25 08:02 gunli

What's your client and producer configs? There was a similar issue happened when maxReconnectAttempts is 1: https://github.com/apache/pulsar-client-go/issues/1312#issuecomment-2597791188

BewareMyPower avatar Feb 17 '25 08:02 BewareMyPower

What's your client and producer configs? There was a similar issue happened when maxReconnectAttempts is 1: #1312 (comment)

My maxReconnectAttempts is the default value: -1

gunli avatar Feb 18 '25 01:02 gunli