watermill
watermill copied to clipboard
Replace original context in pub/sub GoChannel (SendToSubscriber)
Hi you have replaced original msg context with msgToSend.SetContext(ctx) and we lost the original context! so to speak :
when we call msg.Copy() we don't set original context on new msg
func (s *subscriber) sendMessageToSubscriber(msg *message.Message, logFields watermill.LogFields) {
s.sending.Lock()
defer s.sending.Unlock()
ctx, cancelCtx := context.WithCancel(s.ctx)
defer cancelCtx()
SendToSubscriber:
for {
// copy the message to prevent ack/nack propagation to other consumers
// also allows to make retries on a fresh copy of the original message
msgToSend := msg.Copy()
msgToSend.SetContext(ctx)
s.logger.Trace("Sending msg to subscriber", logFields)
if s.closed {
s.logger.Info("Pub/Sub closed, discarding msg", logFields)
return
}
select {
case s.outputChannel <- msgToSend:
s.logger.Trace("Sent message to subscriber", logFields)
case <-s.closing:
s.logger.Trace("Closing, message discarded", logFields)
return
}
select {
case <-msgToSend.Acked():
s.logger.Trace("Message acked", logFields)
return
case <-msgToSend.Nacked():
s.logger.Trace("Nack received, resending message", logFields)
continue SendToSubscriber
case <-s.closing:
s.logger.Trace("Closing, message discarded", logFields)
return
}
}
}
Hey @bi0dread. This is to maintain the same behavior as other Pubs/Subs. Other backends transfer the message over the network, so they can't keep the context. I guess we could make it an option in the config to keep the context, though, as a special case.
@m110 This issue is what I am also facing. At least in the local pub-sub case, the context must be propagated (even though it doesn't match the behaviour of other pub-subs.
Let me know if you want me to add this feature and push.
Hey @yashb042. I think it makes sense, just please keep it as a config option with proper disclaimer that it doesn't match the generic behavior. :)
Okay, picking this up
https://github.com/ThreeDotsLabs/watermill/pull/487/files
Adding test cases soon Please let me know if the variable addition is okay. Have not added extra variables in Message struct, because the Message struct is used by every publisher-subscriber
Added test cases as well, please review.
Couldn't test the func - TestMessageCtx in test_pubsub.go It's mentioned that "ExactlyOnceDelivery test is not supported yet"
Please let me know if anything to be done there
Thanks everyone, this is now merged!