
The publish signature does not allow for a context.Context to be passed

Open advdv opened this issue 1 year ago • 2 comments

I was evaluating watermill today and I was surprised to learn that the Publish signature does not take a context.Context. Without it, how does one cancel a publish that may be stuck on a slow/failing network request?

Let's say my app handles an HTTP request with a timeout of 3 seconds. While handling this request I want to publish a message to a Redis stream using a Watermill publisher. Unfortunately the Redis instance has some issues, the network is blocked, and it takes 4 seconds for Redis to answer. How can I make sure the publish gets canceled?

The issue is also apparent in the redisstream source code:

// Publish publishes message to redis stream
//
// Publish is blocking and waits for redis response.
// When any of messages delivery fails - function is interrupted.
func (p *Publisher) Publish(topic string, msgs ...*message.Message) error {
	if p.closed {
		return errors.New("publisher closed")
	}

	logFields := make(watermill.LogFields, 3)
	logFields["topic"] = topic

	for _, msg := range msgs {
		logFields["message_uuid"] = msg.UUID
		p.logger.Trace("Sending message to redis stream", logFields)

		values, err := p.config.Marshaller.Marshal(topic, msg)
		if err != nil {
			return errors.Wrapf(err, "cannot marshal message %s", msg.UUID)
		}

		maxlen, ok := p.config.Maxlens[topic]
		if !ok {
			maxlen = p.config.DefaultMaxlen
		}
                
		// HERE: The library needs to use "context.Background" which means this XADD can hang
		// forever on network issues.
		id, err := p.client.XAdd(context.Background(), &redis.XAddArgs{
			Stream: topic,
			Values: values,
			MaxLen: maxlen,
			Approx: true,
		}).Result()
		if err != nil {
			return errors.Wrapf(err, "cannot xadd message %s", msg.UUID)
		}

		logFields["xadd_id"] = id
		p.logger.Trace("Message sent to redis stream", logFields)
	}

	return nil
}

advdv avatar May 29 '24 04:05 advdv

Hey @advdv. Publish doesn't take a context because the context is attached to the message. See the message.SetContext() method.

You are right about redisstream, though. We probably should review all Pub/Subs. For example, watermill-amqp does it correctly: https://github.com/ThreeDotsLabs/watermill-amqp/blob/master/pkg/amqp/publisher.go#L191

m110 avatar Jun 28 '24 08:06 m110

Right, thank you for the response. Too bad I didn't notice this. I remember looking at the NATS and Redis implementations to check how it is done; I guess I picked the wrong ones. I think the Kafka producer may have the same issue, but that case is more complicated since it uses the Sarama sync producer, which has no context support either. (I think so; I'm not a user, but I found https://github.com/IBM/sarama/issues/1849.)

I guess some of these libraries are old and predate the widespread use of context.Context? I'm not sure it's actually feasible to do anything about this in the short term.

advdv avatar Jun 28 '24 08:06 advdv