watermill
watermill copied to clipboard
Blocking in GoChannel `sendMessageToSubscriber`
My use case is reading kafka message using segment-io kafka driver and publishing to watermill using GoChannel. One day I got a kafka topic blocking. After read watermill code, I found it blocked at case <-msgToSend.Acked():
, but logs show msg.Ack()
is ok, and "Message acked" in handler.handleMessage
was printed.
func (s *subscriber) sendMessageToSubscriber(msg *message.Message, logFields watermill.LogFields) {
s.sending.Lock()
defer s.sending.Unlock()
ctx, cancelCtx := context.WithCancel(s.ctx)
defer cancelCtx()
SendToSubscriber:
for {
// copy the message to prevent ack/nack propagation to other consumers
// also allows to make retries on a fresh copy of the original message
msgToSend := msg.Copy()
msgToSend.SetContext(ctx)
s.logger.Trace("Sending msg to subscriber", logFields)
if s.closed {
s.logger.Info("Pub/Sub closed, discarding msg", logFields)
return
}
select {
case s.outputChannel <- msgToSend:
s.logger.Trace("Sent message to subscriber", logFields)
case <-s.closing:
s.logger.Trace("Closing, message discarded", logFields)
return
}
select {
case <-msgToSend.Acked():
s.logger.Trace("Message acked", logFields)
return
case <-msgToSend.Nacked():
s.logger.Trace("Nack received, resending message", logFields)
continue SendToSubscriber
case <-s.closing:
s.logger.Trace("Closing, message discarded", logFields)
return
}
}
}