Docs: How to gracefully terminate StartReceiver?
I can't find any documentation, and I'm struggling to find a way to gracefully terminate StartReceiver. It looks like cancelling StartReceiver also immediately cancels the context of the event handlers that are currently running.
Can you please show an example of how you use the HTTP client/protocol and the error you're seeing? I quickly went through the code, and IMHO the protocol tries its best to handle cancellation gracefully.
From reading the code, it appears that simply returning io.EOF from the protocol when it fetches (receives) data allows a graceful exit.
Like this: https://github.com/cloudevents/sdk-go/blob/6de37de32b83aee0bc53fe31b6358d4e5f1e9ede/protocol/amqp/v2/receiver.go#L32
This is for reference only and not to be taken as a definitive answer.
> Can you please show an example of how you use the HTTP client/protocol and the error you're seeing? I quickly went through the code, and IMHO the protocol tries its best to handle cancellation gracefully.

This is just not true at all.
I can't find any method that complements client.StartReceiver and performs a proper graceful shutdown.
By GRACEFUL SHUTDOWN I mean:
- If we have started processing an event with `fn` from `StartReceiver(ctx context.Context, fn interface{}) error`, we MUST wait for the result of that handler.
- This graceful `Shutdown` function should take a shutdown `ctx` parameter that may time out; in that case we MUST immediately return an error from the `Shutdown` function.
Take a look at the standard net/http package (http.Server.Shutdown) for a proper example of how it should be implemented.
Right now, even if I call protocol.Close(ctx), I don't see the expected behaviour: draining (preventing new processing) and waiting for in-flight handlers.
Try it yourself; here is an example handler that demonstrates the problem:
```go
import (
	"context"
	"sync"
	"sync/atomic"
	"time"

	cloudevents "github.com/cloudevents/sdk-go/v2"
	"github.com/rs/zerolog/log" // assumed from the log.Info().Str(...) call style
)

type processorMock struct {
	testName        string
	eventDelay      time.Duration
	processedEvents int32
	mu              sync.Mutex
	processedIDs    map[string]struct{}
}

func newProcessorMock(testName string, eventDelay time.Duration) *processorMock {
	return &processorMock{
		testName:     testName,
		eventDelay:   eventDelay,
		processedIDs: make(map[string]struct{}, 8),
	}
}

// Process returns a handler that simulates eventDuration of work per event and
// counts each event ID once. It returns early with ctx.Err() if ctx is done.
func (p *processorMock) Process() func(ctx context.Context, event cloudevents.Event) error {
	return func(ctx context.Context, event cloudevents.Event) error {
		log.Info().
			Str("test", p.testName).
			Str("subject", event.Subject()).
			Str("id", event.ID()).
			Msg("Processing event started")
		// getEventDuration is a helper from the original test setup (not shown).
		eventDuration := getEventDuration(event, p.eventDelay)
		select {
		case <-ctx.Done(): // Context from WithProcessEventTimeout (individual event)
			log.Warn().
				Str("test", p.testName).
				Str("subject", event.Subject()).
				Str("id", event.ID()).
				Err(ctx.Err()).
				Msg("Individual event processing timed out by its own context")
			return ctx.Err()
		case <-time.After(eventDuration):
			p.mu.Lock()
			if _, seen := p.processedIDs[event.ID()]; !seen {
				atomic.AddInt32(&p.processedEvents, 1)
				p.processedIDs[event.ID()] = struct{}{}
				log.Info().
					Str("test", p.testName).
					Str("subject", event.Subject()).
					Str("id", event.ID()).
					Int32("count", atomic.LoadInt32(&p.processedEvents)).
					Msg("Processing event counted")
			}
			p.mu.Unlock()
			return nil
		}
	}
}
```
Start the receiver with eventDuration=1s using the following code:
```go
if err := client.StartReceiver(ctx, processor.Process()); err != nil {
	return fmt.Errorf("start receiver: %w", err)
}
```
Then, after 1500 ms, call protocol.Close(ctx). The expected behaviour is to see two "Processing event counted" log lines, but you will see only one, along with two "Processing event started" lines.
I use the NATS protocol, but that doesn't matter.
Graceful shutdown is the process where a system or application safely stops by allowing all active operations (handlers) to complete their tasks and return results without being abruptly interrupted. This process respects a predefined timeout period, after which any remaining tasks may be terminated.
Currently, the only workaround is to track handler status on the application side and wait for all in-flight handlers to finish.
@skillcoder Have you tried graceful shutdown via the ctx passed to StartReceiver (e.g., with signal.NotifyContext() to handle SIGTERM/INTERRUPT) instead of closing the protocol first? I quickly tested this with your code and the HTTP protocol, and it seems to handle all in-flight requests gracefully.