watermill

Context Cancellation During Router Shutdown Causes In-flight Handler Cleanup (Ack/Nack/DeleteConsumer) to Fail

Open · mscno opened this issue 6 months ago • 0 comments

Concurrency Issue: Context Cancellation During Router Shutdown Causes In-flight Handlers to Fail

Description

There appears to be a race condition in the router shutdown process that causes in-flight handlers to fail with "context canceled" errors. The problem occurs because the parent context is being canceled before handlers have completed their work.

Current Behavior

When Router.Close() is called:

  1. The closingInProgressCh channel is closed
  2. Run(), which is blocked on that channel, wakes up and immediately calls cancel() on the parent context
  3. Close() then waits for handlers to finish via waitForHandlers(), but by that point Run() has typically already canceled the context

This premature cancellation makes any in-flight cleanup code that respects context cancellation fail with "context canceled" errors, particularly with libraries like JetStream. The affected code is mostly post-handler cleanup that performs network operations, such as sending Ack/Nack messages or deleting a consumer.

Code Analysis

In message/router.go:

// In Router.Close()
close(r.closingInProgressCh)
defer close(r.closedCh)

timedout := r.waitForHandlers() // By this point, Run() has typically already canceled the context

// In Router.Run()
<-r.closingInProgressCh // Waits for the closing signal
cancel()                // Cancels the context immediately, before handlers complete

// ...later
<-r.closedCh // Waits for complete shutdown

Suggested Solution

The context cancellation should happen after handlers have completed their work, not before. Options:

  1. Move the cancel() call in Run() to after waiting for the closedCh signal, which would indicate handlers have finished
  2. Implement a more coordinated shutdown where handlers are allowed to complete before context cancellation
  3. Use a separate context for shutdown operations that doesn't affect in-flight work

Reproduction

Here is a complete, copy-pasteable reproduction:

package main

import (
	"context"
	"fmt"
	"time"

	"github.com/ThreeDotsLabs/watermill"
	wmjetstream "github.com/ThreeDotsLabs/watermill-nats/v2/pkg/jetstream"
	"github.com/ThreeDotsLabs/watermill/message"
	"github.com/google/uuid"
	"github.com/nats-io/nats.go"
	"github.com/nats-io/nats.go/jetstream"
)

func main() {
	ctx := context.Background()
	logger := watermill.NewStdLogger(true, true)
	// Create a new router
	r, err := message.NewRouter(message.RouterConfig{}, logger)
	if err != nil {
		panic(err)
	}

	// Connect to the public NATS demo server
	url := "tls://demo.nats.io"

	nc, err := nats.Connect(url)
	if err != nil {
		panic(err)
	}

	js, err := jetstream.New(nc)
	if err != nil {
		panic(err)
	}

	stream, err := js.CreateStream(ctx, jetstream.StreamConfig{Name: "demoevents"})
	if err != nil {
		panic(err)
	}
	_ = stream

	// Create publisher and subscriber
	pub, err := wmjetstream.NewPublisher(wmjetstream.PublisherConfig{Conn: nc})
	if err != nil {
		panic(err)
	}

	sub, err := wmjetstream.NewSubscriber(wmjetstream.SubscriberConfig{Conn: nc})
	if err != nil {
		panic(err)
	}

	// Set up message handling
	r.AddNoPublisherHandler("natsJsHandler", "demoevents", sub, func(msg *message.Message) error {
		fmt.Println("natsJsHandler starting", string(msg.Payload))
		time.Sleep(3 * time.Second)
		fmt.Println("natsJsHandler finished", string(msg.Payload))
		return nil
	})

	// Run router in goroutine
	go func() {
		err := r.Run(ctx)
		if err != nil {
			panic(err)
		}
	}()

	// Give the router time to initialize
	time.Sleep(1 * time.Second)

	// Publish message
	err = pub.Publish("demoevents", message.NewMessage(uuid.NewString(), []byte("hello world")))
	if err != nil {
		panic(err)
	}

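	// Intentionally do not wait for message handling to complete: close the
	// router while the handler (which sleeps for 3s) is still in flight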

	// Close router
	err = r.Close()
	if err != nil {
		panic(err)
	}

}

Output

[watermill] 2025/05/16 13:26:12.433111 router.go:280: level=INFO msg="Adding handler" handler_name=natsJsHandler topic=demoevents
[watermill] 2025/05/16 13:26:12.433507 router.go:375: level=DEBUG msg="Loading plugins"
[watermill] 2025/05/16 13:26:12.433582 router.go:414: level=INFO msg="Running router handlers" count=1
[watermill] 2025/05/16 13:26:12.433597 router.go:436: level=DEBUG msg="Subscribing to topic" subscriber_name=natsJsHandler topic=demoevents
[watermill] 2025/05/16 13:26:12.689864 router.go:628: level=INFO msg="Starting handler" subscriber_name=natsJsHandler topic=demoevents
[watermill] 2025/05/16 13:26:12.816366 message_handler.go:27: level=INFO msg="got msg" ID=227fc7cf-9f1c-4e1a-8e6a-03ae18dddf4f Len=11 ReceivedSubject=demoevents
[watermill] 2025/05/16 13:26:12.816485 router.go:802: level=TRACE msg="Received message" message_uuid=227fc7cf-9f1c-4e1a-8e6a-03ae18dddf4f
natsJsHandler starting hello world
[watermill] 2025/05/16 13:26:13.561223 router.go:557: level=DEBUG msg="Running Close()"
[watermill] 2025/05/16 13:26:13.561304 router.go:560: level=INFO msg="Closing router"
natsJsHandler finished hello world
[watermill] 2025/05/16 13:26:16.214484 router.go:776: level=DEBUG msg="Waiting for subscriber to close"
[watermill] 2025/05/16 13:26:17.515390 router.go:820: level=TRACE msg="Message acked" message_uuid=227fc7cf-9f1c-4e1a-8e6a-03ae18dddf4f
[watermill] 2025/05/16 13:26:17.515423 router.go:393: level=INFO msg="Waiting for messages" timeout=30s
[watermill] 2025/05/16 13:26:32.572636 router.go:778: level=ERROR msg="Failed to close subscriber" err="output wait group did not finish within alloted 5s"
[watermill] 2025/05/16 13:26:32.572705 message_handler.go:65: level=ERROR msg="Cannot send ack" ID=227fc7cf-9f1c-4e1a-8e6a-03ae18dddf4f Len=11 ReceivedSubject=demoevents err="context canceled"
[watermill] 2025/05/16 13:26:32.572717 router.go:780: level=DEBUG msg="Subscriber closed"
[watermill] 2025/05/16 13:26:32.572732 router.go:654: level=DEBUG msg="Waiting for publisher to close"
[watermill] 2025/05/16 13:26:32.572742 router.go:658: level=DEBUG msg="Publisher closed"
[watermill] 2025/05/16 13:26:32.572748 router.go:661: level=DEBUG msg="Router handler stopped"
[watermill] 2025/05/16 13:26:32.572755 message_handler.go:27: level=INFO msg="got msg" ID=1093f1ee-2b11-4d9e-a216-226c591b872e Len=11 ReceivedSubject=demoevents
[watermill] 2025/05/16 13:26:32.572760 router.go:463: level=INFO msg="Subscriber stopped" subscriber_name=natsJsHandler topic=demoevents
[watermill] 2025/05/16 13:26:32.572776 message_handler.go:27: level=INFO msg="got msg" ID=754e6c0f-3084-43a1-8758-a242f3634922 Len=11 ReceivedSubject=demoevents
[watermill] 2025/05/16 13:26:29.634815 consumer.go:114: level=ERROR msg="failed to delete consumer" err="context canceled"
[watermill] 2025/05/16 13:26:32.572930 router.go:571: level=INFO msg="Router closed"

Debugger finished with the exit code 0

Questions

Is this intended behavior, or should the router allow in-flight handlers to complete normally before canceling their context?

mscno, May 16 '25 11:05