nats.go
nats.go copied to clipboard
Proper timeout handling in `func (*pullConsumer) fetch()`
Proposed change
IMHO, there're something we can improve in these lines of code.
https://github.com/nats-io/nats.go/blob/8712190da1d17ab0c4719bffa7c0174214c56e6c/jetstream/pull.go#L770-L772
- In batch request, it'll create a lot of
time.Timer
according to the doc. Before the timeout reached, the timer is not GC-able. For client with Higher throughput and with higherExpires
, there will be more concurrent timer running, which is unnecessary.- For this problem, we can create a dedicated timer for the loop (instead of each message), and uses
time.Timer.Stop()
andtime.Timer.Reset()
to reuse thetime.Timer
. In this way, the loop will be more efficient.
- For this problem, we can create a dedicated timer for the loop (instead of each message), and uses
- Currently, the timeout is controlled by
Expires
and a constant1 * time.Second
, which is the same field as the request pass to server. For client or server under stress, the timeout may easily reached. Especially forFetchNoWait()
.- I think we can separate the
Expires
into 2 options, one used as request that passes to the server, another serves as client receiving timeout. - Or we can uses
context.Context.Done()
for receiving timeout.
- I think we can separate the
- Messages delivered after timeout reached AND before the server received
UNSUB
should be unacked explicitly (instread of waiting server ack timeout). Current implementation is just dropping the messages. For those usingAckPolicy: AckAll
, this behavior is dangerous, we could lose message permanently if we received later message and ack them. It's possible because we normally write a loop to batch request (Fetch()
).
Use case
outer:
for {
batch, err := stream.Fetch(batchSize)
// ...
inner:
var lastMessage jetstream.Msg
for _, message := batch.Messages() {
lastMessage = message
// If the client or server is under stress, this inner loop may never run.
}
// ...
if lastMessage == nil {
// empty batch
continue
}
err := lastMessage.Ack(ctx) // Ack entire batch of message with `AckPolicy: AckAll`
// ...
}
Contribution
Yes
For 1), in nats package, there is a timerPool that can reuse time.Timer, I think it is possible to make a same design , or reuse timerPool in nats package? https://github.com/nats-io/nats.go/blob/8712190da1d17ab0c4719bffa7c0174214c56e6c/timer.go#L26
var globalTimerPool timerPool
// timerPool provides GC-able pooling of *time.Timer's.
// can be used by multiple goroutines concurrently.
type timerPool struct {
p sync.Pool
}
func (tp *timerPool) Get(d time.Duration) *time.Timer {
if t, _ := tp.p.Get().(*time.Timer); t != nil {
t.Reset(d)
return t
}
return time.NewTimer(d)
}
func (tp *timerPool) Put(t *time.Timer) {
if !t.Stop() {
select {
case <-t.C:
default:
}
}
tp.p.Put(t)
}