nats.java

When `maxBatch`, `maxBytes`, `maxExpires` are exceeded there is only a timeout but no status message

Open MauriceVanVeen opened this issue 1 year ago • 1 comments

Observed behavior

When maxBatch, maxBytes, or maxExpires is set on the ConsumerConfiguration and a pull request exceeds that value, the client does not surface the resulting status message. The call simply waits until the timeout set by expiresIn elapses.

Expected behavior

The developer is made aware that one of these values was exceeded, which prompts them to fix the code and pull fewer messages (or use a smaller expiresIn in the case of maxExpires). For example, by logging the status message:

Aug 09, 2024 12:07:58 PM io.nats.client.impl.ErrorListenerLoggerImpl pullStatusWarning
WARNING: pullStatusWarning, Connection: 13, Subscription: 631365415, Consumer Name: 8V2ygPms7b, Status:Status{code=409, message='Exceeded MaxRequestBatch of 1'}
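
Until the client surfaces this status, a caller can approximate the same check on their own side before pulling. The following is only a minimal sketch, assuming the caller already knows the limits configured on the consumer. `PullLimitCheck` and `firstViolation` are hypothetical names and not part of jnats. The `MaxRequestBatch` wording is copied from the log above; the `MaxRequestMaxBytes` and `MaxRequestExpires` strings are assumed by analogy and may not match the server text exactly.

```java
import java.time.Duration;
import java.util.Optional;

public class PullLimitCheck {

    /**
     * Hypothetical helper: returns a description of the first limit that the
     * pull request would exceed, or an empty Optional if it is within limits.
     * A limit of 0 (or a null Duration) is treated as "not set".
     */
    static Optional<String> firstViolation(long maxBatch, long maxBytes, Duration maxExpires,
                                           long requestBatch, long requestMaxBytes, Duration expiresIn) {
        if (maxBatch > 0 && requestBatch > maxBatch) {
            // Wording taken from the 409 status shown in the log above.
            return Optional.of("Exceeded MaxRequestBatch of " + maxBatch);
        }
        if (maxBytes > 0 && requestMaxBytes > maxBytes) {
            // Assumed wording, modeled on the batch message.
            return Optional.of("Exceeded MaxRequestMaxBytes of " + maxBytes);
        }
        if (maxExpires != null && expiresIn != null && expiresIn.compareTo(maxExpires) > 0) {
            // Assumed wording; the Duration is printed in ISO-8601 form (e.g. PT5S).
            return Optional.of("Exceeded MaxRequestExpires of " + maxExpires);
        }
        return Optional.empty();
    }

    public static void main(String[] args) {
        // Same situation as in this issue: maxBatch=1 on the consumer, fetch of 100.
        Optional<String> violation =
                firstViolation(1, 0, null, 100, 0, Duration.ofSeconds(30));

        if (violation.isPresent()) {
            System.out.println("WARNING: " + violation.get());
        } else {
            System.out.println("Pull request is within the consumer limits.");
        }
    }
}
```

This only mirrors the comparison on the caller's side. It does not replace having the client report the server's actual status message.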

Server and client version

server 2.10.18, jnats main/2.20.0

Host environment

No response

Steps to reproduce

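import io.nats.client.Connection;
import io.nats.client.ConsumerContext;
import io.nats.client.FetchConsumer;
import io.nats.client.JetStreamManagement;
import io.nats.client.Message;
import io.nats.client.Nats;
import io.nats.client.StreamContext;
import io.nats.client.api.ConsumerConfiguration;
import io.nats.client.api.StreamConfiguration;

public class Test {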
    public static void main(String[] args) throws Exception {
        try (Connection nc = Nats.connect()) {
            JetStreamManagement jsm = nc.jetStreamManagement();

            String streamName = "EVENTS";
            jsm.addStream(StreamConfiguration.builder()
                    .name(streamName)
                    .subjects("event.>")
                    .build());

            StreamContext streamContext = nc.getStreamContext(streamName);
            
            // Setting maxBatch=1, so we shouldn't allow fetching more messages at once.
            ConsumerConfiguration consumerConfig = ConsumerConfiguration.builder().maxBatch(1).build();
            ConsumerContext consumerContext = streamContext.createOrUpdateConsumer(consumerConfig);

            int count = 0;
            
            // Fetching a batch of 100 messages is not allowed, so we rightfully don't get any messages and wait for timeout.
            // But we don't get informed about the status message.
            try (FetchConsumer fetchConsumer = consumerContext.fetchMessages(100)) {
                Message msg;
                while ((msg = fetchConsumer.nextMessage()) != null) {
                    msg.ack();
                    count++;
                }
            }

            System.out.printf("Received %d messages.", count);
        }
    }
}

This prints only the following and does not report that any maximum was exceeded:

Received 0 messages.

MauriceVanVeen, Aug 09 '24 10:08

See https://github.com/nats-io/nats.js/pull/44/files

scottf, Aug 13 '24 21:08