Update consume backpressure handling
The channel capacity is now limited to maxMsgs to prevent pulling excessive messages from the server, and is additionally capped at 1024 to avoid large object heap allocations.
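As a minimal sketch (the actual field and option names inside NATS.Client.JetStream may differ), the capacity bound amounts to clamping the bounded channel size to the smaller of the user's MaxMsgs and 1024:

```csharp
using System;
using System.Threading.Channels;

static class ConsumeChannelSketch
{
    // Cap the buffer at 1024 slots to stay clear of large object heap
    // allocations, but never buffer more than the user's MaxMsgs.
    const int MaxCapacity = 1024;

    public static Channel<T> CreateMsgChannel<T>(int maxMsgs) =>
        Channel.CreateBounded<T>(new BoundedChannelOptions(Math.Min(maxMsgs, MaxCapacity))
        {
            SingleReader = true,
            SingleWriter = true,
        });
}
```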
Program.cs
using NATS.Client.Core;
using NATS.Client.JetStream;
using NATS.Client.JetStream.Models;
await using var nats = new NatsConnection();
var js = new NatsJSContext(nats);
var stream = await js.CreateStreamAsync(new StreamConfig("teststream", ["test-subject"]));
var consumer = await stream.CreateOrUpdateConsumerAsync(new ConsumerConfig("testcons"));
var count = 0;
await foreach (var msg in consumer.ConsumeAsync<string>(opts: new NatsJSConsumeOpts { MaxMsgs = 5 }))
{
    await Task.Delay(3000); // simulate processing
    await msg.AckAsync();
    if (count++ == 3)
        break; // application exits
}
Publish messages:
> nats pub test-subject '{{.Count}}' --count 50
Before: the consumer pulls all available messages, far exceeding MaxMsgs:
> nats consumer report teststream
╭─────────────────────────────────────────────────────────────────────────────────────────────────────────╮
│ Consumer report for teststream with 1 consumers │
├──────────┬──────┬────────────┬──────────┬─────────────┬─────────────┬─────────────┬───────────┬─────────┤
│ Consumer │ Mode │ Ack Policy │ Ack Wait │ Ack Pending │ Redelivered │ Unprocessed │ Ack Floor │ Cluster │
├──────────┼──────┼────────────┼──────────┼─────────────┼─────────────┼─────────────┼───────────┼─────────┤
│ testcons │ Pull │ Explicit │ 30.00s │ 46 │ 0 │ 0 │ 4 │ │
╰──────────┴──────┴────────────┴──────────┴─────────────┴─────────────┴─────────────┴───────────┴─────────╯
After: consumption is limited to MaxMsgs plus a few extra (one or two pull requests' worth) due to the pre-fetch optimization:
> nats consumer report teststream
╭─────────────────────────────────────────────────────────────────────────────────────────────────────────╮
│ Consumer report for teststream with 1 consumers │
├──────────┬──────┬────────────┬──────────┬─────────────┬─────────────┬─────────────┬───────────┬─────────┤
│ Consumer │ Mode │ Ack Policy │ Ack Wait │ Ack Pending │ Redelivered │ Unprocessed │ Ack Floor │ Cluster │
├──────────┼──────┼────────────┼──────────┼─────────────┼─────────────┼─────────────┼───────────┼─────────┤
│ testcons │ Pull │ Explicit │ 30.00s │ 10 │ 0 │ 36 / 72% │ 4 │ │
╰──────────┴──────┴────────────┴──────────┴─────────────┴─────────────┴─────────────┴───────────┴─────────╯
What happens if they use MaxBytes instead? Won't the channel capacity be 1024 again, so it will pull all of the messages until it is full again?
Should we introduce an intermediate channel that consumes the full pull, then requests the next pull once it is half consumed?
(revisiting this because of #608)
So the idea is to handle backpressure properly, i.e. not issue pull requests unless application code has actually received the message. Before this PR, that check was done after the message was buffered through the channel, so we were pulling 1000 messages (the size of the bounded channel) regardless of whether the application code had picked up the message.
By issuing a Delivered() call after yielding the message to the application, we can then recalculate the pending count and decide whether to issue another pull request to the server.
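A hypothetical sketch of that accounting (the names PendingCounter, Pulled, and the half-batch threshold are illustrative, not the actual NATS.Client.JetStream internals):

```csharp
// Tracks how many messages have been requested from the server but not yet
// yielded to application code, and signals when to issue the next pull.
class PendingCounter
{
    readonly long _thresholdMsgs; // refill point, e.g. half the requested batch
    long _pendingMsgs;            // pulled from server, not yet yielded to the app

    public PendingCounter(long maxMsgs) => _thresholdMsgs = maxMsgs / 2;

    // Called when a pull request for 'batch' messages is sent.
    public void Pulled(long batch) => _pendingMsgs += batch;

    // Called after a message is yielded to application code; returns true
    // when enough messages have been delivered to warrant another pull.
    public bool Delivered() => --_pendingMsgs <= _thresholdMsgs;
}
```

Because the counter only decrements after the yield, a slow consumer naturally stops triggering pulls, regardless of how large the underlying channel is.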
I need to fix the tests... edit: fixed
What happens if they use MaxBytes instead? Won't the channel capacity be 1024 again, so it will pull all of the messages until it is full again?
Should we introduce an intermediate channel that consumes the full pull, then requests the next pull once it is half consumed?
We don't have to worry about this anymore since the calculation is done after the message is yielded, via the Delivered() method.
moved to #626