nats.net.v2

Update consume backpressure handling

Open mtmk opened this issue 1 year ago • 1 comments

The channel capacity is now limited to maxMsgs to prevent pulling excessive messages from the server. It is also capped at a maximum of 1024 to avoid large object heap allocations.
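As a rough illustration of the sizing rule described above, the bounded channel could be created along these lines. This is only a sketch, not the library's actual code: `ComputeCapacity` and its `hardLimit` parameter are hypothetical names, and the lower bound of 1 is an assumption made so the channel options stay valid.

```csharp
using System;
using System.Threading.Channels;

// Capacity follows maxMsgs but never exceeds the hard limit (1024 in this PR).
var capacity = ComputeCapacity(maxMsgs: 5);

// A bounded channel makes the writer wait once 'capacity' messages are buffered.
var channel = Channel.CreateBounded<string>(new BoundedChannelOptions(capacity)
{
    FullMode = BoundedChannelFullMode.Wait,
});

Console.WriteLine($"capacity for maxMsgs=5: {capacity}");
Console.WriteLine($"capacity for maxMsgs=50000: {ComputeCapacity(maxMsgs: 50_000)}");

// Hypothetical helper, not part of NATS.Client.JetStream.
static int ComputeCapacity(int maxMsgs, int hardLimit = 1024) =>
    Math.Clamp(maxMsgs, 1, hardLimit);
```

With MaxMsgs = 5, as in the example program, the sketch yields a capacity of 5; a very large MaxMsgs is clamped to 1024.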

Program.cs

using NATS.Client.Core;
using NATS.Client.JetStream;
using NATS.Client.JetStream.Models;

await using var nats = new NatsConnection();
var js = new NatsJSContext(nats);

var stream = await js.CreateStreamAsync(new StreamConfig("teststream", ["test-subject"]));
var consumer = await stream.CreateOrUpdateConsumerAsync(new ConsumerConfig("testcons"));

var count = 0;
await foreach (var msg in consumer.ConsumeAsync<string>(opts: new NatsJSConsumeOpts { MaxMsgs = 5 }))
{
    await Task.Delay(3000); // processing
    await msg.AckAsync();
    if (count++ == 3)
        break; // application exits
}

Publish messages:

> nats pub test-subject '{{.Count}}' --count 50

Before: consumes all the data, exceeding maxMsgs.

> nats consumer report teststream
╭─────────────────────────────────────────────────────────────────────────────────────────────────────────╮
│                             Consumer report for teststream with 1 consumers                             │
├──────────┬──────┬────────────┬──────────┬─────────────┬─────────────┬─────────────┬───────────┬─────────┤
│ Consumer │ Mode │ Ack Policy │ Ack Wait │ Ack Pending │ Redelivered │ Unprocessed │ Ack Floor │ Cluster │
├──────────┼──────┼────────────┼──────────┼─────────────┼─────────────┼─────────────┼───────────┼─────────┤
│ testcons │ Pull │ Explicit   │ 30.00s   │ 46          │ 0           │ 0           │ 4         │         │
╰──────────┴──────┴────────────┴──────────┴─────────────┴─────────────┴─────────────┴───────────┴─────────╯

After: consumption is limited to maxMsgs plus a few more (1 or 2 pull requests' worth) due to the pre-fetch optimization.

> nats consumer report teststream
╭─────────────────────────────────────────────────────────────────────────────────────────────────────────╮
│                             Consumer report for teststream with 1 consumers                             │
├──────────┬──────┬────────────┬──────────┬─────────────┬─────────────┬─────────────┬───────────┬─────────┤
│ Consumer │ Mode │ Ack Policy │ Ack Wait │ Ack Pending │ Redelivered │ Unprocessed │ Ack Floor │ Cluster │
├──────────┼──────┼────────────┼──────────┼─────────────┼─────────────┼─────────────┼───────────┼─────────┤
│ testcons │ Pull │ Explicit   │ 30.00s   │ 10          │ 0           │ 36 / 72%    │ 4         │         │
╰──────────┴──────┴────────────┴──────────┴─────────────┴─────────────┴─────────────┴───────────┴─────────╯

mtmk, Jun 05 '24 23:06

What happens if they use MaxBytes instead? Won't the channel be 1024 again and it will pull through all of the messages until it is full again?

Should we introduce an intermediate channel that consumes the full pull, then calls for the next pull after it is half way consumed?

caleblloyd, Jun 06 '24 14:06

(revisiting this because of #608)

So the idea is to handle backpressure properly, i.e. not to issue pull requests unless application code has received the message. Before this PR, that check was done after the message was buffered through the channel, so we were pulling 1000 messages (the size of the bounded channel) regardless of whether application code had actually picked up the messages.

By issuing a Delivered() call after yielding the message to the application, we can then make the calculation and decide whether to issue another pull request to the server.
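The bookkeeping behind that decision can be sketched as below. This is a simplified model under stated assumptions, not the code in this PR: `PullTracker`, `InitialPull`, and the `thresholdMsgs` value are hypothetical, and only the idea of calling `Delivered()` after the message has been yielded comes from the discussion.

```csharp
using System;

var tracker = new PullTracker(maxMsgs: 5, thresholdMsgs: 2);
Console.WriteLine($"initial pull: {tracker.InitialPull()}");

for (var i = 1; i <= 5; i++)
{
    // The application has just received message i; only now do we
    // decide whether another pull request is needed.
    var batch = tracker.Delivered();
    Console.WriteLine(batch > 0
        ? $"delivered {i}: pull {batch} more"
        : $"delivered {i}: no pull");
}

// Hypothetical tracker: counts messages requested but not yet delivered.
sealed class PullTracker
{
    private readonly int _maxMsgs;
    private readonly int _thresholdMsgs; // assumed refill threshold
    private int _pendingMsgs;

    public PullTracker(int maxMsgs, int thresholdMsgs)
    {
        _maxMsgs = maxMsgs;
        _thresholdMsgs = thresholdMsgs;
    }

    // First pull request asks for a full batch.
    public int InitialPull()
    {
        _pendingMsgs = _maxMsgs;
        return _maxMsgs;
    }

    // Called after a message is yielded to the application.
    // Returns the batch size of the next pull request, or 0 for none.
    public int Delivered()
    {
        _pendingMsgs--;
        if (_pendingMsgs > _thresholdMsgs)
            return 0;

        var batch = _maxMsgs - _pendingMsgs; // top up to maxMsgs
        _pendingMsgs = _maxMsgs;
        return batch;
    }
}
```

Because the counter only moves when the application takes a message, a slow consumer (such as the 3-second delay in Program.cs) stops further pull requests instead of filling the channel.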

mtmk, Aug 22 '24 12:08

I need to fix the tests... edit: fixed

mtmk, Aug 22 '24 12:08

> What happens if they use MaxBytes instead? Won't the channel be 1024 again and it will pull through all of the messages until it is full again?
>
> Should we introduce an intermediate channel that consumes the full pull, then calls for the next pull after it is half way consumed?

We don't have to worry about this anymore, since the calculation is now done after the message is yielded, through the Delivered() method.

mtmk, Sep 09 '24 16:09

moved to #626

mtmk, Sep 10 '24 16:09