watermill
watermill copied to clipboard
Context timeout ignored in Watermill's Google PubSub publisher due to premature `Ready()` blocking
Steps to reproduce
package main
import (
"time"
"github.com/ThreeDotsLabs/watermill"
"github.com/ThreeDotsLabs/watermill-googlecloud/pkg/googlecloud"
"github.com/ThreeDotsLabs/watermill/message"
"github.com/google/uuid"
"google.golang.org/api/option"
)
func main() {
opts := []option.ClientOption{
option.WithCredentialsJSON([]byte("<replace-credentials>")),
}
cfg := googlecloud.PublisherConfig{
Marshaler: googlecloud.DefaultMarshalerUnmarshaler{},
ProjectID: "<replace-project-id>",
ClientOptions: opts,
DoNotCreateTopicIfMissing: false,
DoNotCheckTopicExistence: true,
PublishTimeout: 1 * time.Nanosecond, // Simulate context cancellation by setting a very low timeout
}
publisher, err := googlecloud.NewPublisher(cfg, watermill.NopLogger{})
if err != nil {
panic(err)
}
payload := `{"message": "hello world!"}`
msgId := uuid.New().String()
publisher.Publish("<replace-topic-id>", message.NewMessage(msgId, message.Payload(payload)))
}
Expected behavior
- The underlying publisher should return an error when the provided context times out.
Actual behavior
- The publish timeout is not respected — the publisher blocks indefinitely, even after exceeding the context deadline.
Possible solution
The root cause lies in the following lines:
https://github.com/ThreeDotsLabs/watermill-googlecloud/blob/4a22a44b6cdbe7c270cb1e0fc3355f97141645ee/pkg/googlecloud/publisher.go#L166C1-L169C42
result := t.Publish(ctx, googlecloudMsg)
<-result.Ready()
serverMessageID, err := result.Get(ctx)
We're blocking on <-result.Ready() before calling Get(ctx). According to the Get implementation in the Google Cloud Pub/Sub client:
If the result is already ready, Get returns immediately — even if the context is already done.
Since we're explicitly waiting on Ready() before calling Get, the context is essentially ignored, and Get(ctx) will always return the result, regardless of context timeout.
A potential fix is to remove the <-result.Ready() line, allowing Get(ctx) to handle both Ready and respecting the context deadline.