watermill
The publish signature does not allow for a context.Context to be passed
I was evaluating watermill today and I was surprised to learn that the Publish signature does not take a context.Context. Without it, how does one cancel a publish that may be stuck on a slow/failing network request?
Let's say my app receives an HTTP request with a timeout of 3 seconds. While handling this request, I want to publish a message to a Redis stream using a Watermill publisher. Unfortunately, the Redis instance has some issues, the network is blocked, and it takes 4 seconds for Redis to answer. How can I make sure the publish gets canceled?
The issue is also apparent in the redisstream source code:
```go
// Publish publishes message to redis stream
//
// Publish is blocking and waits for redis response.
// When any of messages delivery fails - function is interrupted.
func (p *Publisher) Publish(topic string, msgs ...*message.Message) error {
	if p.closed {
		return errors.New("publisher closed")
	}

	logFields := make(watermill.LogFields, 3)
	logFields["topic"] = topic

	for _, msg := range msgs {
		logFields["message_uuid"] = msg.UUID
		p.logger.Trace("Sending message to redis stream", logFields)

		values, err := p.config.Marshaller.Marshal(topic, msg)
		if err != nil {
			return errors.Wrapf(err, "cannot marshal message %s", msg.UUID)
		}

		maxlen, ok := p.config.Maxlens[topic]
		if !ok {
			maxlen = p.config.DefaultMaxlen
		}

		// HERE: The library needs to use "context.Background" which means this XADD can hang
		// forever on network issues.
		id, err := p.client.XAdd(context.Background(), &redis.XAddArgs{
			Stream: topic,
			Values: values,
			MaxLen: maxlen,
			Approx: true,
		}).Result()
		if err != nil {
			return errors.Wrapf(err, "cannot xadd message %s", msg.UUID)
		}

		logFields["xadd_id"] = id
		p.logger.Trace("Message sent to redis stream", logFields)
	}

	return nil
}
```
Hey @advdv. Publish doesn't take a context because the context is attached to the message. See the message.SetContext() method.
You are right about redisstream, though. We probably should review all Pub/Subs. For example, watermill-amqp does it correctly: https://github.com/ThreeDotsLabs/watermill-amqp/blob/master/pkg/amqp/publisher.go#L191
Right, thank you for the response. Too bad I didn't notice this; I remember looking at the NATS and Redis implementations to check how it's done. I guess I picked the wrong ones. I think the Kafka producer may have the same issue, but it is more complicated because it uses the Sarama sync producer, which also has no context support (I think; I'm not a user, but I found https://github.com/IBM/sarama/issues/1849).
I guess some of these libraries are old and predate the widespread use of context.Context? I'm not sure it's actually feasible to do anything about this in the short term.