sdk-go

Docs: How to gracefully terminate StartReceiver?

Open • skillcoder opened this issue 10 months ago • 8 comments

I can't find any documentation, and I'm struggling to find a way to gracefully terminate StartReceiver. It looks like cancelling StartReceiver also immediately cancels the context of any event handlers that are currently running.

skillcoder • Feb 17 '25

Can you please show an example how you use the HTTP client/protocol and the error you're seeing? I quickly went through the code and IMHO the protocol tries to do its best to gracefully handle cancellation.

embano1 • Feb 28 '25

From examining the code, my current understanding is that simply returning io.EOF from the protocol when the receiver fetches data allows for a graceful exit.

Like this: https://github.com/cloudevents/sdk-go/blob/6de37de32b83aee0bc53fe31b6358d4e5f1e9ede/protocol/amqp/v2/receiver.go#L32

This is for reference only and not to be taken as a definitive answer.

flc1125 • Apr 11 '25

> Can you please show an example how you use the HTTP client/protocol and the error you're seeing? I quickly went through the code and IMHO the protocol tries to do its best to gracefully handle cancellation.

This is simply not true. I can't find any method that complements client.StartReceiver and performs a proper graceful shutdown. By GRACEFUL SHUTDOWN I mean:

  • If the fn passed to StartReceiver(ctx context.Context, fn interface{}) error has started processing an event, we MUST wait for that handler's result.
  • The graceful Shutdown function should take a shutdown ctx parameter that may time out; in that case, Shutdown MUST return an error immediately.

Take a look at the standard net/http package (http.Server.Shutdown) for a proper example of how it should be implemented.

Right now, even if I call protocol.Close(ctx), I don't see the expected behaviour: draining (preventing new events from being processed) and waiting for in-flight handlers to finish.

skillcoder • May 17 '25

Try it yourself. Here is an example handler that demonstrates the problem:

```go
import (
	"context"
	"sync"
	"sync/atomic"
	"time"

	cloudevents "github.com/cloudevents/sdk-go/v2"
	"github.com/rs/zerolog/log" // assumption: the log.Info().Str(...) calls below match zerolog's API
)

// processorMock simulates a slow event handler and counts each distinct
// event ID once, so the test can check how many events actually completed.
type processorMock struct {
	testName        string
	eventDelay      time.Duration
	processedEvents int32 // updated atomically
	mu              sync.Mutex
	processedIDs    map[string]struct{} // guarded by mu
}

func newProcessorMock(testName string, eventDelay time.Duration) *processorMock {
	return &processorMock{
		testName:     testName,
		eventDelay:   eventDelay,
		processedIDs: make(map[string]struct{}, 8),
	}
}

// Process returns the handler that is passed to client.StartReceiver.
func (p *processorMock) Process() func(ctx context.Context, event cloudevents.Event) error {
	return func(ctx context.Context, event cloudevents.Event) error {
		log.Info().
			Str("test", p.testName).
			Str("subject", event.Subject()).
			Str("id", event.ID()).
			Msg("Processing event started")

		// getEventDuration is a helper from the original test that is not shown;
		// it derives the simulated processing time from the event and p.eventDelay.
		eventDuration := getEventDuration(event, p.eventDelay)

		select {
		case <-ctx.Done(): // Context from WithProcessEventTimeout (individual event)
			log.Warn().
				Str("test", p.testName).
				Str("subject", event.Subject()).
				Str("id", event.ID()).
				Err(ctx.Err()).
				Msg("Individual event processing timed out by its own context")

			return ctx.Err()
		case <-time.After(eventDuration):
			// Simulated work finished: count the event once per ID.
			p.mu.Lock()
			if _, seen := p.processedIDs[event.ID()]; !seen {
				atomic.AddInt32(&p.processedEvents, 1)
				p.processedIDs[event.ID()] = struct{}{}
				log.Info().
					Str("test", p.testName).
					Str("subject", event.Subject()).
					Str("id", event.ID()).
					Int32("count", atomic.LoadInt32(&p.processedEvents)).
					Msg("Processing event counted")
			}
			p.mu.Unlock()

			return nil
		}
	}
}
```

Start the receiver with a 1s event duration using the following code:

```go
if err := client.StartReceiver(ctx, processor.Process()); err != nil {
	return fmt.Errorf("start receiver: %w", err)
}
```

Then, after 1500 ms, call protocol.Close(ctx). The expected behaviour is to see two "Processing event counted" log lines, but you will see only one, alongside two "Processing event started" lines.

skillcoder • May 17 '25

I use the NATS protocol, but it doesn't matter.

skillcoder • May 17 '25

Graceful shutdown is the process where a system or application safely stops by allowing all active operations (handlers) to complete their tasks and return results without being abruptly interrupted. This process respects a predefined timeout period, after which any remaining tasks may be terminated.

skillcoder • May 17 '25

Currently, the only workaround is to track handler status on the application side and wait for all in-flight handlers to finish.

skillcoder • May 17 '25

@skillcoder Have you tried graceful shutdown via the ctx passed to StartReceiver (e.g., with signal.NotifyContext() to handle SIGTERM/INTERRUPT) instead of closing the protocol first? I quickly tested this with your code and the HTTP protocol, and it seems to handle all in-flight requests gracefully?

embano1 • May 25 '25