watermill
Context Cancellation During Router Shutdown Causes In-flight Handler Cleanup (Ack/Nack/DeleteConsumer) to Fail
Concurrency Issue: Context Cancellation During Router Shutdown Causes In-flight Handlers to Fail
Description
There appears to be a race condition in the router shutdown process that causes in-flight handlers to fail with "context canceled" errors. The problem occurs because the parent context is being canceled before handlers have completed their work.
Current Behavior
When Router.Close() is called:
- The closingInProgressCh channel is closed
- This triggers the Run() method to immediately call cancel() on the parent context
- Only after canceling the context does Close() attempt to wait for handlers to finish via waitForHandlers()
Because the context is canceled prematurely, any cleanup that runs after an in-flight handler returns and respects context cancellation fails with "context canceled" errors. This mostly affects post-handler network operations, such as sending Ack/Nack messages or deleting a consumer, and is particularly visible with libraries like JetStream.
Code Analysis
In message/router.go:
// In Router.Close()
close(r.closingInProgressCh)
defer close(r.closedCh)
timedout := r.waitForHandlers() // By this point, the context is already canceled
// In Router.Run()
<-r.closingInProgressCh // Waits for closing signal
cancel() // Immediately cancels context before handlers complete
// ...later
<-r.closedCh // Waits for complete shutdown
Suggested Solution
The context cancellation should happen after handlers have completed their work, not before. Options:
- Move the cancel() call in Run() to after waiting for the closedCh signal, which would indicate handlers have finished
- Implement a more coordinated shutdown where handlers are allowed to complete before context cancellation
- Use a separate context for shutdown operations that doesn't affect in-flight work
Reproduction
Here is a complete, copy-pasteable example that reproduces the problem:
package main

import (
	"context"
	"fmt"
	"time"

	"github.com/ThreeDotsLabs/watermill"
	"github.com/ThreeDotsLabs/watermill/message"
	wmjetstream "github.com/ThreeDotsLabs/watermill-nats/v2/pkg/jetstream"
	"github.com/google/uuid"
	"github.com/nats-io/nats.go"
	"github.com/nats-io/nats.go/jetstream"
)

func main() {
	ctx := context.Background()
	logger := watermill.NewStdLogger(true, true)

	// Create a new router
	r, err := message.NewRouter(message.RouterConfig{}, logger)
	if err != nil {
		panic(err)
	}

	// Connect to NATS and make sure the stream exists
	url := "tls://demo.nats.io"
	nc, err := nats.Connect(url)
	if err != nil {
		panic(err)
	}
	js, err := jetstream.New(nc)
	if err != nil {
		panic(err)
	}
	_, err = js.CreateStream(ctx, jetstream.StreamConfig{Name: "demoevents"})
	if err != nil {
		panic(err)
	}

	// Create publisher and subscriber
	pub, err := wmjetstream.NewPublisher(wmjetstream.PublisherConfig{Conn: nc})
	if err != nil {
		panic(err)
	}
	sub, err := wmjetstream.NewSubscriber(wmjetstream.SubscriberConfig{Conn: nc})
	if err != nil {
		panic(err)
	}

	// Set up message handling; the handler sleeps so it is still in flight during Close()
	r.AddNoPublisherHandler("natsJsHandler", "demoevents", sub, func(msg *message.Message) error {
		fmt.Println("natsJsHandler starting", string(msg.Payload))
		time.Sleep(3 * time.Second)
		fmt.Println("natsJsHandler finished", string(msg.Payload))
		return nil
	})

	// Run router in goroutine
	go func() {
		err := r.Run(ctx)
		if err != nil {
			panic(err)
		}
	}()

	// Give the router time to initialize
	time.Sleep(1 * time.Second)

	// Publish message
	err = pub.Publish("demoevents", message.NewMessage(uuid.NewString(), []byte("hello world")))
	if err != nil {
		panic(err)
	}

	// Close the router without waiting, so the handler is still in flight during shutdown
	err = r.Close()
	if err != nil {
		panic(err)
	}
}
Output
[watermill] 2025/05/16 13:26:12.433111 router.go:280: level=INFO msg="Adding handler" handler_name=natsJsHandler topic=demoevents
[watermill] 2025/05/16 13:26:12.433507 router.go:375: level=DEBUG msg="Loading plugins"
[watermill] 2025/05/16 13:26:12.433582 router.go:414: level=INFO msg="Running router handlers" count=1
[watermill] 2025/05/16 13:26:12.433597 router.go:436: level=DEBUG msg="Subscribing to topic" subscriber_name=natsJsHandler topic=demoevents
[watermill] 2025/05/16 13:26:12.689864 router.go:628: level=INFO msg="Starting handler" subscriber_name=natsJsHandler topic=demoevents
[watermill] 2025/05/16 13:26:12.816366 message_handler.go:27: level=INFO msg="got msg" ID=227fc7cf-9f1c-4e1a-8e6a-03ae18dddf4f Len=11 ReceivedSubject=demoevents
[watermill] 2025/05/16 13:26:12.816485 router.go:802: level=TRACE msg="Received message" message_uuid=227fc7cf-9f1c-4e1a-8e6a-03ae18dddf4f
natsJsHandler starting hello world
[watermill] 2025/05/16 13:26:13.561223 router.go:557: level=DEBUG msg="Running Close()"
[watermill] 2025/05/16 13:26:13.561304 router.go:560: level=INFO msg="Closing router"
natsJsHandler finished hello world
[watermill] 2025/05/16 13:26:16.214484 router.go:776: level=DEBUG msg="Waiting for subscriber to close"
[watermill] 2025/05/16 13:26:17.515390 router.go:820: level=TRACE msg="Message acked" message_uuid=227fc7cf-9f1c-4e1a-8e6a-03ae18dddf4f
[watermill] 2025/05/16 13:26:17.515423 router.go:393: level=INFO msg="Waiting for messages" timeout=30s
[watermill] 2025/05/16 13:26:32.572636 router.go:778: level=ERROR msg="Failed to close subscriber" err="output wait group did not finish within alloted 5s"
[watermill] 2025/05/16 13:26:32.572705 message_handler.go:65: level=ERROR msg="Cannot send ack" ID=227fc7cf-9f1c-4e1a-8e6a-03ae18dddf4f Len=11 ReceivedSubject=demoevents err="context canceled"
[watermill] 2025/05/16 13:26:32.572717 router.go:780: level=DEBUG msg="Subscriber closed"
[watermill] 2025/05/16 13:26:32.572732 router.go:654: level=DEBUG msg="Waiting for publisher to close"
[watermill] 2025/05/16 13:26:32.572742 router.go:658: level=DEBUG msg="Publisher closed"
[watermill] 2025/05/16 13:26:32.572748 router.go:661: level=DEBUG msg="Router handler stopped"
[watermill] 2025/05/16 13:26:32.572755 message_handler.go:27: level=INFO msg="got msg" ID=1093f1ee-2b11-4d9e-a216-226c591b872e Len=11 ReceivedSubject=demoevents
[watermill] 2025/05/16 13:26:32.572760 router.go:463: level=INFO msg="Subscriber stopped" subscriber_name=natsJsHandler topic=demoevents
[watermill] 2025/05/16 13:26:32.572776 message_handler.go:27: level=INFO msg="got msg" ID=754e6c0f-3084-43a1-8758-a242f3634922 Len=11 ReceivedSubject=demoevents
[watermill] 2025/05/16 13:26:29.634815 consumer.go:114: level=ERROR msg="failed to delete consumer" err="context canceled"
[watermill] 2025/05/16 13:26:32.572930 router.go:571: level=INFO msg="Router closed"
Debugger finished with the exit code 0
Questions
Is this intended behavior, or should the router allow in-flight handlers to complete normally before canceling their context?