watermill icon indicating copy to clipboard operation
watermill copied to clipboard

Blocking in GoChannel `sendMessageToSubscriber`

Open qiulin opened this issue 1 year ago • 0 comments

My use case is reading kafka message using segment-io kafka driver and publishing to watermill using GoChannel. One day I got a kafka topic blocking. After read watermill code, I found it blocked at case <-msgToSend.Acked():, but logs show msg.Ack() is ok, and "Message acked" in handler.handleMessage was printed.

func (s *subscriber) sendMessageToSubscriber(msg *message.Message, logFields watermill.LogFields) {
	s.sending.Lock()
	defer s.sending.Unlock()

	ctx, cancelCtx := context.WithCancel(s.ctx)
	defer cancelCtx()

SendToSubscriber:
	for {
		// copy the message to prevent ack/nack propagation to other consumers
		// also allows to make retries on a fresh copy of the original message
		msgToSend := msg.Copy()
		msgToSend.SetContext(ctx)

		s.logger.Trace("Sending msg to subscriber", logFields)

		if s.closed {
			s.logger.Info("Pub/Sub closed, discarding msg", logFields)
			return
		}

		select {
		case s.outputChannel <- msgToSend:
			s.logger.Trace("Sent message to subscriber", logFields)
		case <-s.closing:
			s.logger.Trace("Closing, message discarded", logFields)
			return
		}

		select {
		case <-msgToSend.Acked():
			s.logger.Trace("Message acked", logFields)
			return
		case <-msgToSend.Nacked():
			s.logger.Trace("Nack received, resending message", logFields)
			continue SendToSubscriber
		case <-s.closing:
			s.logger.Trace("Closing, message discarded", logFields)
			return
		}
	}
}

qiulin avatar Sep 04 '23 08:09 qiulin