
Replace original context in pub/sub GoChannel (SendToSubscriber)

Open bi0dread opened this issue 1 year ago • 6 comments

Hi, you have replaced the original message context with msgToSend.SetContext(ctx), so the original context is lost. In other words:

When we call msg.Copy(), the original context is not set on the new message.

func (s *subscriber) sendMessageToSubscriber(msg *message.Message, logFields watermill.LogFields) {
	s.sending.Lock()
	defer s.sending.Unlock()

	ctx, cancelCtx := context.WithCancel(s.ctx)
	defer cancelCtx()

SendToSubscriber:
	for {
		// copy the message to prevent ack/nack propagation to other consumers
		// also allows to make retries on a fresh copy of the original message
		msgToSend := msg.Copy()
		msgToSend.SetContext(ctx)

		s.logger.Trace("Sending msg to subscriber", logFields)

		if s.closed {
			s.logger.Info("Pub/Sub closed, discarding msg", logFields)
			return
		}

		select {
		case s.outputChannel <- msgToSend:
			s.logger.Trace("Sent message to subscriber", logFields)
		case <-s.closing:
			s.logger.Trace("Closing, message discarded", logFields)
			return
		}

		select {
		case <-msgToSend.Acked():
			s.logger.Trace("Message acked", logFields)
			return
		case <-msgToSend.Nacked():
			s.logger.Trace("Nack received, resending message", logFields)
			continue SendToSubscriber
		case <-s.closing:
			s.logger.Trace("Closing, message discarded", logFields)
			return
		}
	}
}
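
To make the effect concrete, here is a minimal, self-contained sketch of the Copy + SetContext step from the loop above. It does not import Watermill: the Message type below is a hypothetical, stripped-down stand-in for message.Message, and the ctxKey type, the "trace_id" key, and the helper names are assumptions made up for illustration.

```go
package main

import (
	"context"
	"fmt"
)

// ctxKey is a private key type for context values (illustrative only).
type ctxKey string

// Message is a hypothetical stand-in for message.Message, reduced to
// the fields needed to show the context being dropped.
type Message struct {
	UUID    string
	Payload []byte
	ctx     context.Context
}

// Context returns the message context, defaulting to context.Background().
func (m *Message) Context() context.Context {
	if m.ctx != nil {
		return m.ctx
	}
	return context.Background()
}

// SetContext replaces the message context.
func (m *Message) SetContext(ctx context.Context) { m.ctx = ctx }

// Copy duplicates the UUID and payload but leaves the context unset,
// mirroring the behavior described in this issue.
func (m *Message) Copy() *Message {
	return &Message{UUID: m.UUID, Payload: append([]byte(nil), m.Payload...)}
}

// lostTraceID publishes a message carrying a trace ID in its context,
// repeats the Copy + SetContext step, and reports what a subscriber would see.
func lostTraceID() (string, bool) {
	msg := &Message{UUID: "1", Payload: []byte("hello")}
	msg.SetContext(context.WithValue(context.Background(), ctxKey("trace_id"), "abc-123"))

	subscriberCtx := context.Background() // plays the role of s.ctx
	msgToSend := msg.Copy()
	msgToSend.SetContext(subscriberCtx)

	id, ok := msgToSend.Context().Value(ctxKey("trace_id")).(string)
	return id, ok
}

func main() {
	id, ok := lostTraceID()
	fmt.Printf("trace_id=%q found=%v\n", id, ok) // prints: trace_id="" found=false
}
```

The value attached by the publisher never reaches the subscriber, because the context handed over is derived only from the subscriber's own context.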

bi0dread avatar Aug 05 '24 08:08 bi0dread

Hey @bi0dread. This is to maintain the same behavior as other Pubs/Subs. Other backends transfer the message over the network, so they can't keep the context. I guess we could make it an option in the config to keep the context, though, as a special case.

m110 avatar Aug 12 '24 12:08 m110

@m110 I am facing this issue as well. At least in the local pub-sub case, the context must be propagated (even though it doesn't match the behaviour of other pub-subs).

Let me know if you want me to add this feature and push.

yashb042 avatar Aug 29 '24 05:08 yashb042

Hey @yashb042. I think it makes sense, just please keep it as a config option with a proper disclaimer that it doesn't match the generic behavior. :)
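
A rough sketch of what such an opt-in option could look like, using only the standard library. The Config struct, the PreserveContext field name, and the helper functions are assumptions for illustration and are not taken from the linked change; the idea is simply to choose which context the delivery context is derived from.

```go
package main

import (
	"context"
	"fmt"
)

// ctxKey is a private key type for context values (illustrative only).
type ctxKey string

// Config is a hypothetical config struct for an in-process Pub/Sub.
type Config struct {
	// PreserveContext (assumed name) derives the delivery context from the
	// published message's context instead of the subscriber's context.
	// Disclaimer: network-backed Pub/Subs cannot carry a context, so
	// enabling this does not match the generic behavior.
	PreserveContext bool
}

// deliveryContext picks the parent for the context attached to the copy
// that is sent to a subscriber, and wraps it so it can be cancelled.
func deliveryContext(cfg Config, subscriberCtx, msgCtx context.Context) (context.Context, context.CancelFunc) {
	parent := subscriberCtx
	if cfg.PreserveContext {
		parent = msgCtx
	}
	return context.WithCancel(parent)
}

// traceIDSeenBySubscriber reports whether a value set by the publisher
// is visible through the delivery context under the given config.
func traceIDSeenBySubscriber(cfg Config) (string, bool) {
	msgCtx := context.WithValue(context.Background(), ctxKey("trace_id"), "abc-123")
	ctx, cancel := deliveryContext(cfg, context.Background(), msgCtx)
	defer cancel()

	id, ok := ctx.Value(ctxKey("trace_id")).(string)
	return id, ok
}

func main() {
	for _, cfg := range []Config{{PreserveContext: false}, {PreserveContext: true}} {
		id, ok := traceIDSeenBySubscriber(cfg)
		fmt.Printf("PreserveContext=%v trace_id=%q found=%v\n", cfg.PreserveContext, id, ok)
	}
}
```

One trade-off in this sketch: when the flag is on, cancelling the publisher's context also cancels the delivered message's context, and the subscriber's own context no longer acts as the parent. That is part of why a disclaimer on the option seems warranted.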

m110 avatar Aug 29 '24 06:08 m110

Okay, picking this up

yashb042 avatar Sep 03 '24 05:09 yashb042

https://github.com/ThreeDotsLabs/watermill/pull/487/files

Adding test cases soon. Please let me know if the variable addition is okay. I have not added extra variables to the Message struct, because the Message struct is used by every publisher-subscriber.

yashb042 avatar Sep 03 '24 06:09 yashb042

Added test cases as well, please review.

I couldn't test the func TestMessageCtx in test_pubsub.go. It's mentioned that "ExactlyOnceDelivery test is not supported yet".

Please let me know if anything needs to be done there.

yashb042 avatar Sep 04 '24 05:09 yashb042

Thanks everyone, this is now merged!

m110 avatar Aug 25 '25 14:08 m110