
Reading messages as a consumer sometimes slows down suddenly.

Open yuzhou-nj opened this issue 1 year ago • 10 comments

Observed behavior

Hi, I have developed a consumer program that reads messages from a stream in pull mode. It fetches up to 200 messages at a time and forwards them to nginx via HTTP POST. I found that the number of messages read sometimes drops, then recovers on its own after a while. This problem first appeared with nats-server v2.10.16.

There is no abnormal log in nats-server.log. When the number of read messages drops, I can see via nats stream state XXX that messages are being deleted.

Thank you for your help.

Expected behavior

The consumer should read messages stably and continuously.

Server and client version

./nats-server -v

nats-server: v2.10.16

Host environment

CentOS 8

Steps to reproduce

stream info:

# nats s info user_cache
Information for Stream user_cache created 2023-06-21 17:05:11

              Subjects: user_cache.>
              Replicas: 3
               Storage: File

Options:

             Retention: Interest
       Acknowledgments: true
        Discard Policy: Old
      Duplicate Window: 2m0s
     Allows Msg Delete: true
          Allows Purge: true
        Allows Rollups: false

Limits:

      Maximum Messages: 20,000,000
   Maximum Per Subject: unlimited
         Maximum Bytes: unlimited
           Maximum Age: 7d0h0m0s
  Maximum Message Size: unlimited
     Maximum Consumers: unlimited

Cluster Information:

                  Name: nats_cluster
                Leader: nats0
               Replica: nats1, current, seen 891ms ago
               Replica: nats2, current, seen 891ms ago

State:

              Messages: 5,000,000
                 Bytes: 5.0 GiB
        First Sequence: 60,277,152 @ 2024-07-16 19:06:53 UTC
         Last Sequence: 65,277,151 @ 2024-07-16 19:08:18 UTC
      Active Consumers: 1
    Number of Subjects: 1

consumer info:

# nats c report user_cache
╭─────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────╮
│                                     Consumer report for user_cache with 1 consumers                                         │
├───────────┬──────┬────────────┬──────────┬─────────────┬─────────────┬──────────────────┬────────────┬──────────────────────┤
│ Consumer  │ Mode │ Ack Policy │ Ack Wait │ Ack Pending │ Redelivered │ Unprocessed      │ Ack Floor  │ Cluster              │
├───────────┼──────┼────────────┼──────────┼─────────────┼─────────────┼──────────────────┼────────────┼──────────────────────┤
│ natssync2 │ Pull │ Explicit   │ 30.00s   │ 0           │ 0           │ 5,000,000 / 100% │ 60,277,151 │ nats0, nats1*, nats2 │
╰───────────┴──────┴────────────┴──────────┴─────────────┴─────────────┴──────────────────┴────────────┴──────────────────────╯

In nats-server v2.10.14 and earlier versions, the number of messages consumed per second is stable.

2024/07/16 19:05:32.201262 natssync2 user_cache: LogStat totalsucc=4706832 totalfail=0 succ=12588 fail=0 opertkey_num=0
2024/07/16 19:05:33.201295 natssync2 user_cache: LogStat totalsucc=4719800 totalfail=0 succ=12968 fail=0 opertkey_num=0
2024/07/16 19:05:34.201825 natssync2 user_cache: LogStat totalsucc=4732200 totalfail=0 succ=12400 fail=0 opertkey_num=0
2024/07/16 19:05:35.202032 natssync2 user_cache: LogStat totalsucc=4744800 totalfail=0 succ=12600 fail=0 opertkey_num=0
2024/07/16 19:05:36.202148 natssync2 user_cache: LogStat totalsucc=4757400 totalfail=0 succ=12600 fail=0 opertkey_num=0
2024/07/16 19:05:37.202182 natssync2 user_cache: LogStat totalsucc=4770290 totalfail=0 succ=12890 fail=0 opertkey_num=0
2024/07/16 19:05:38.202221 natssync2 user_cache: LogStat totalsucc=4782800 totalfail=0 succ=12508 fail=0 opertkey_num=0
2024/07/16 19:05:39.205999 natssync2 user_cache: LogStat totalsucc=4797600 totalfail=0 succ=14800 fail=0 opertkey_num=0
2024/07/16 19:05:40.206041 natssync2 user_cache: LogStat totalsucc=4812277 totalfail=0 succ=14677 fail=0 opertkey_num=0
2024/07/16 19:05:41.207710 natssync2 user_cache: LogStat totalsucc=4827839 totalfail=0 succ=15559 fail=0 opertkey_num=0
2024/07/16 19:05:42.208057 natssync2 user_cache: LogStat totalsucc=4840280 totalfail=0 succ=12439 fail=0 opertkey_num=0
2024/07/16 19:05:43.208128 natssync2 user_cache: LogStat totalsucc=4853222 totalfail=0 succ=12942 fail=0 opertkey_num=0
2024/07/16 19:05:44.208224 natssync2 user_cache: LogStat totalsucc=4865400 totalfail=0 succ=12177 fail=0 opertkey_num=0
2024/07/16 19:05:45.208259 natssync2 user_cache: LogStat totalsucc=4877846 totalfail=0 succ=12446 fail=0 opertkey_num=0
2024/07/16 19:05:46.209216 natssync2 user_cache: LogStat totalsucc=4889798 totalfail=0 succ=11950 fail=0 opertkey_num=0

On nats-server v2.10.16 and v2.10.17, the number of consumed messages fluctuates (see the succ= values from 19:18:50 to 19:18:55).

2024/07/16 19:18:43.280567 natssync2  user_cache: LogStat totalsucc=1175348 totalfail=0 succ=11148 fail=0 opertkey_num=0
2024/07/16 19:18:44.280861 natssync2  user_cache: LogStat totalsucc=1187185 totalfail=0 succ=11837 fail=0 opertkey_num=0
2024/07/16 19:18:45.280901 natssync2  user_cache: LogStat totalsucc=1199124 totalfail=0 succ=11940 fail=0 opertkey_num=0
2024/07/16 19:18:46.281127 natssync2  user_cache: LogStat totalsucc=1211669 totalfail=0 succ=12544 fail=0 opertkey_num=0
2024/07/16 19:18:47.281431 natssync2  user_cache: LogStat totalsucc=1223600 totalfail=0 succ=11929 fail=0 opertkey_num=0
2024/07/16 19:18:48.283433 natssync2  user_cache: LogStat totalsucc=1234801 totalfail=0 succ=11201 fail=0 opertkey_num=0
2024/07/16 19:18:49.283868 natssync2  user_cache: LogStat totalsucc=1239000 totalfail=0 succ=4199 fail=0 opertkey_num=0
2024/07/16 19:18:50.284069 natssync2  user_cache: LogStat totalsucc=1239200 totalfail=0 succ=200 fail=0 opertkey_num=0
2024/07/16 19:18:51.286809 natssync2  user_cache: LogStat totalsucc=1239600 totalfail=0 succ=400 fail=0 opertkey_num=0
2024/07/16 19:18:52.287828 natssync2  user_cache: LogStat totalsucc=1239800 totalfail=0 succ=200 fail=0 opertkey_num=0
2024/07/16 19:18:53.288306 natssync2  user_cache: LogStat totalsucc=1240600 totalfail=0 succ=800 fail=0 opertkey_num=0
2024/07/16 19:18:54.288766 natssync2  user_cache: LogStat totalsucc=1241200 totalfail=0 succ=600 fail=0 opertkey_num=0
2024/07/16 19:18:55.289730 natssync2  user_cache: LogStat totalsucc=1245800 totalfail=0 succ=4600 fail=0 opertkey_num=0
2024/07/16 19:18:56.289902 natssync2  user_cache: LogStat totalsucc=1257400 totalfail=0 succ=11600 fail=0 opertkey_num=0
2024/07/16 19:18:57.289951 natssync2  user_cache: LogStat totalsucc=1269230 totalfail=0 succ=11830 fail=0 opertkey_num=0
2024/07/16 19:18:58.289999 natssync2  user_cache: LogStat totalsucc=1281459 totalfail=0 succ=12227 fail=0 opertkey_num=0

Thank you very much!

yuzhou-nj avatar Jul 16 '24 12:07 yuzhou-nj

Please post the result of nats consumer info user_cache natssync2.

neilalexander avatar Jul 16 '24 12:07 neilalexander

Especially when messages are published and consumed at the same time, the consumer often cannot read messages at all, or reads very few.

I published messages like this:

nats bench "user_cache.default" --pub 3 --size="1024" --msgs=3000000 

The log is:

2024/07/16 20:19:42.761793 user_cache consumer=natssync2 : JsReadMessage get 200 msgs, readwait=21.021926ms
2024/07/16 20:19:42.789626 user_cache consumer=natssync2 : JsReadMessage get 200 msgs, readwait=20.166502ms
2024/07/16 20:19:42.814555 user_cache consumer=natssync2 : JsReadMessage get 200 msgs, readwait=20.083135ms
2024/07/16 20:19:42.839620 user_cache consumer=natssync2 : JsReadMessage get 200 msgs, readwait=20.444267ms
2024/07/16 20:19:42.865367 user_cache consumer=natssync2 : JsReadMessage get 200 msgs, readwait=21.316913ms
2024/07/16 20:19:42.890803 user_cache consumer=natssync2 : JsReadMessage get 200 msgs, readwait=21.080772ms
2024/07/16 20:19:43.117156 user_cache: LogStat totalsucc=3224355 totalfail=7380 succ=6185 fail=0 opertkey_num=0
2024/07/16 20:19:44.118286 user_cache: LogStat totalsucc=3224355 totalfail=7386 succ=0 fail=6 opertkey_num=0  <-- No message is read. In fact, there are many messages.
2024/07/16 20:19:45.118401 user_cache: LogStat totalsucc=3224355 totalfail=7401 succ=0 fail=15 opertkey_num=0
2024/07/16 20:19:46.118867 user_cache: LogStat totalsucc=3224355 totalfail=7401 succ=0 fail=0 opertkey_num=0
2024/07/16 20:19:47.119556 user_cache: LogStat totalsucc=3224355 totalfail=7401 succ=0 fail=0 opertkey_num=0
2024/07/16 20:19:48.119782 user_cache: LogStat totalsucc=3224355 totalfail=7401 succ=0 fail=0 opertkey_num=0
2024/07/16 20:19:49.120147 user_cache: LogStat totalsucc=3224355 totalfail=7401 succ=0 fail=0 opertkey_num=0
2024/07/16 20:19:50.120357 user_cache: LogStat totalsucc=3224355 totalfail=7401 succ=0 fail=0 opertkey_num=0
2024/07/16 20:19:51.120815 user_cache: LogStat totalsucc=3224355 totalfail=7401 succ=0 fail=0 opertkey_num=0
2024/07/16 20:19:52.121587 user_cache: LogStat totalsucc=3224355 totalfail=7401 succ=0 fail=0 opertkey_num=0
2024/07/16 20:19:53.121727 user_cache: LogStat totalsucc=3224355 totalfail=7401 succ=0 fail=0 opertkey_num=0
2024/07/16 20:19:54.122866 user_cache: LogStat totalsucc=3224355 totalfail=7401 succ=0 fail=0 opertkey_num=0
2024/07/16 20:19:54.786908 user_cache consumer=natssync2 : JsReadMessage get 200 msgs, readwait=1.891032717s  <-- wait too long
2024/07/16 20:19:54.805761 user_cache consumer=natssync2 : JsReadMessage get 200 msgs, readwait=11.227657ms
2024/07/16 20:19:55.123079 user_cache: LogStat totalsucc=3224753 totalfail=7401 succ=398 fail=0 opertkey_num=0
2024/07/16 20:19:56.124053 user_cache: LogStat totalsucc=3224753 totalfail=7401 succ=0 fail=0 opertkey_num=0
2024/07/16 20:19:57.125054 user_cache: LogStat totalsucc=3224753 totalfail=7403 succ=0 fail=2 opertkey_num=0
2024/07/16 20:19:58.126125 user_cache: LogStat totalsucc=3224753 totalfail=7403 succ=0 fail=0 opertkey_num=0
2024/07/16 20:19:59.126320 user_cache: LogStat totalsucc=3224753 totalfail=7403 succ=0 fail=0 opertkey_num=0
2024/07/16 20:20:00.126927 user_cache: LogStat totalsucc=3224753 totalfail=7403 succ=0 fail=0 opertkey_num=0
2024/07/16 20:20:01.127135 user_cache: LogStat totalsucc=3224753 totalfail=7403 succ=0 fail=0 opertkey_num=0
2024/07/16 20:20:02.127650 user_cache: LogStat totalsucc=3224753 totalfail=7403 succ=0 fail=0 opertkey_num=0
2024/07/16 20:20:03.127850 user_cache: LogStat totalsucc=3224753 totalfail=7403 succ=0 fail=0 opertkey_num=0
2024/07/16 20:20:04.128093 user_cache: LogStat totalsucc=3224753 totalfail=7403 succ=0 fail=0 opertkey_num=0
2024/07/16 20:20:05.129193 user_cache: LogStat totalsucc=3224753 totalfail=7403 succ=0 fail=0 opertkey_num=0
2024/07/16 20:20:06.130280 user_cache: LogStat totalsucc=3224753 totalfail=7403 succ=0 fail=0 opertkey_num=0
2024/07/16 20:20:07.130882 user_cache: LogStat totalsucc=3224753 totalfail=7403 succ=0 fail=0 opertkey_num=0
2024/07/16 20:20:08.131403 user_cache: LogStat totalsucc=3224753 totalfail=7403 succ=0 fail=0 opertkey_num=0
2024/07/16 20:20:09.131953 user_cache: LogStat totalsucc=3224753 totalfail=7403 succ=0 fail=0 opertkey_num=0
2024/07/16 20:20:10.132468 user_cache: LogStat totalsucc=3224753 totalfail=7403 succ=0 fail=0 opertkey_num=0
2024/07/16 20:20:11.132621 user_cache: LogStat totalsucc=3224753 totalfail=7403 succ=0 fail=0 opertkey_num=0
2024/07/16 20:20:11.487455 user_cache consumer=natssync2 : JsReadMessage get 200 msgs, readwait=6.674830837s  <-- wait too long
2024/07/16 20:20:11.506314 user_cache consumer=natssync2 : JsReadMessage get 200 msgs, readwait=9.784367ms
2024/07/16 20:20:11.526660 user_cache consumer=natssync2 : JsReadMessage get 200 msgs, readwait=13.911119ms
2024/07/16 20:20:11.545752 user_cache consumer=natssync2 : JsReadMessage get 200 msgs, readwait=9.892692ms

yuzhou-nj avatar Jul 16 '24 12:07 yuzhou-nj

nats consumer info user_cache natssync2

# nats c info user_cache natssync2      
Information for Consumer user_cache > natssync2 created 2024-07-16T16:26:43+08:00

Configuration:

                    Name: natssync2
               Pull Mode: true
          Deliver Policy: All
              Ack Policy: Explicit
                Ack Wait: 30.00s
           Replay Policy: Instant
         Max Ack Pending: 1,000
       Max Waiting Pulls: 512

Cluster Information:

                    Name: nats_cluster
                  Leader: nats2
                 Replica: nats0, current, seen 78ms ago
                 Replica: nats1, current, seen 78ms ago

State:

  Last Delivered Message: Consumer sequence: 33,008,777 Stream sequence: 65,277,151
    Acknowledgment Floor: Consumer sequence: 33,008,777 Stream sequence: 65,277,151
        Outstanding Acks: 0 out of maximum 1,000
    Redelivered Messages: 0
    Unprocessed Messages: 0
           Waiting Pulls: 0 of maximum 512

Thank you!

yuzhou-nj avatar Jul 17 '24 01:07 yuzhou-nj

Is the publishing rate more stable on 2.10.18?

neilalexander avatar Jul 19 '24 14:07 neilalexander

@yuzhou-nj I am not dismissing that you observe a difference in consumer rate between versions, but it is no surprise that the consumer rate is affected, especially while publishing. You are adding extra load to the server, particularly since you are using the bench tool, which does not use JetStream and sends core NATS messages as fast as it can.

kozlovic avatar Jul 19 '24 16:07 kozlovic

When there are a large number of messages in the stream, this issue still occurs even when we are only reading and acknowledging messages and no new messages are being inserted. It seems that because the stream was deleting a large number of messages at the time, the client was unable to read messages, or read fewer of them.

yuzhou-nj avatar Jul 20 '24 09:07 yuzhou-nj

Is the publishing rate more stable on 2.10.18?

The speed of reading messages is unstable, not publishing. I also tried nats-server v2.10.18, and the issue still exists; versions v2.10.5 through v2.10.14 do not have this problem.

yuzhou-nj avatar Jul 20 '24 09:07 yuzhou-nj

Note that you should not publish to JetStream with nats bench unless using the --js flag as well (otherwise you are doing Core NATS publications, which could potentially overwhelm the nats-server in charge of persisting the messages for that stream).

jnmoyne avatar Jul 28 '24 23:07 jnmoyne

@yuzhou-nj I wonder if this issue is similar to https://github.com/nats-io/nats-server/issues/5702. I have investigated and reported my findings to the rest of the eng team.

kozlovic avatar Jul 28 '24 23:07 kozlovic

@yuzhou-nj You may want to re-run your tests with a build of the nats-server that contains https://github.com/nats-io/nats-server/pull/5719, which should be available in the nightly tonight (PST).

kozlovic avatar Jul 29 '24 23:07 kozlovic

I have the same problem here. I am consuming with the Java nats client v2.20.5 and nats-server v2.10.22.

I have a single-server setup, no clustering.

This is how I am creating my sub

    public <T> void pullSubscription(String subjectKey,
                                     INatsStreamDefinition definition,
                                     ExecutorService executorService,
                                     IBatchMessageHandler<T> handler) {
        // The batch size can be 1 to 256 inclusive. Values outside that range will throw an exception.
        int batchsize = 200;
        logger.info("Starting pullSubscription for subjectKey %s  with %s partitions and batch size %s".formatted(subjectKey, definition.getPartitions().get(subjectKey), batchsize));
        if (definition.getPartitions().get(subjectKey) > 0) {
            throw new IllegalArgumentException("Partition count must be 0 for " + subjectKey + " in " + definition.getStreamName());
        }

        // TODO: vuru maxAckPending is a fixed value in JS config after the consumer is created
        // Find a way to update the consumer if this value changes
        int maxAckPending = 200;
        try {
            JetStream js = natsConnection.jetStream();
            String streamName = definition.getStreamName() + "_" + subjectKey;
            PullSubscribeOptions subPull = PullSubscribeOptions.builder()
                    .durable("Consumer_" + streamName)
                    .configuration(ConsumerConfiguration.builder()
                            .name("Consumer_" + streamName)
                            .maxAckPending(maxAckPending)
                            .ackWait(Duration.ofSeconds(processTimeSeconds))
                            .ackPolicy(AckPolicy.All)
                            .build()
                    )
                    .build();
            final JetStreamSubscription sub = js.subscribe(definition.getSubjects().get(subjectKey) + ".>",
                    subPull);
            subscriptions.add(sub);
            natsConnection.flush(Duration.ofMillis(100));
            logger.info("""
                        Subject: [{}]
                        StreamName [{}]
                        Stream Info
                        {}
                        """, definition.getSubjects().get(subjectKey) + ".>", streamName, JsonUtils.getFormatted(sub));

            executorService.submit(() -> {
                while (true) {
                    try {
                        List<Message> fetch = sub.fetch(batchsize, Duration.ofMillis(100));
                        if (!fetch.isEmpty()) {
                            List<Message> invalidMessages = new ArrayList<>();
                            List<T> messages = fetch.stream()
                                    .map(message -> {
                                        try {
                                            // Avoid redelivery while the batch is being processed
                                            message.inProgress();
                                            return handler.getMessageConverter().convert(message);
                                        } catch (Exception e) {
                                            logger.error("Can't convert message", e);
                                            invalidMessages.add(message);
                                            return null;
                                        }
                                    }).filter(Objects::nonNull).toList();
                            if (!invalidMessages.isEmpty()) {
                                logger.warn("Invalid messages: {}", invalidMessages.size());
                                invalidMessages.forEach(message -> {
                                    try {
                                        logger.warn("Invalid message: {}", new String(message.getData()));
                                    } catch (Exception e) {
                                        logger.error("Error reading message", e);
                                        message.ack();
                                        fetch.remove(message);
                                    }
                                });
                            }
                            if (!messages.isEmpty()) {
                                try {
                                    handler.onMessages(messages);
                                    fetch.getLast().ackSync(Duration.ofSeconds(ackTimeOutSeconds));
//                                    fetch.getLast().ack();
                                } catch (TimeoutException e) {
                                    logger.error("Failed to acknowledge messages for subject key {} after {} seconds . Size {}", subjectKey, ackTimeOutSeconds, messages.size(), e);
                                    // We nack only the last for nack all the batch
                                    fetch.getLast().nak();
                                } catch (Exception e) {
                                    logger.error("Error on handling messages- ", e);
                                    // We nack only the last for nack all the batch
                                    fetch.getLast().nak();
                                }
                            }

                        }
                    } catch (Exception e) {
                        logger.error("Can't fetch messages", e);
                        exit();
                    }
                }
            });


        } catch (JetStreamApiException | IOException | TimeoutException | InterruptedException e) {
            throw new RuntimeException(e);
        }
    }
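One detail worth noting about the snippet: acking only fetch.getLast() works here because the consumer uses AckPolicy.All, where acking a sequence implicitly acknowledges every earlier pending sequence. A stdlib-only sketch of that semantics (ackUpTo is a hypothetical helper; plain sequence numbers stand in for messages):

```java
import java.util.TreeSet;

// Sketch of AckPolicy.All semantics: acking sequence N implicitly acknowledges
// every pending sequence <= N, which is why the code above acks only the last
// message of each batch. ackUpTo is a hypothetical helper, not a jnats API.
public class CumulativeAck {
    static int ackUpTo(TreeSet<Long> pending, long seq) {
        var acked = pending.headSet(seq, true); // view of all sequences <= seq
        int n = acked.size();
        acked.clear();                          // clearing the view removes them from the set
        return n;
    }

    public static void main(String[] args) {
        TreeSet<Long> pending = new TreeSet<>();
        for (long s = 1; s <= 200; s++) pending.add(s);
        System.out.println(ackUpTo(pending, 200)); // whole batch acked at once: 200
        System.out.println(pending.size());        // 0
    }
}
```

The flip side is that a nak of the last message also redelivers the whole batch, which is exactly how the TimeoutException branch above uses it.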

I have 4 streams (separate JVM processes).

But somehow only one stream slows down after a few days. The fetch then only returns every 7 to 10 seconds.


If I delete the stream, it works fine for a few days again; after that it slows down.

suikast42 avatar Dec 05 '24 09:12 suikast42

@suikast42, couple clarifying questions about the code snippet:

  • In .ackWait(Duration.ofSeconds(processTimeSeconds)) what is the AckWait / processTimeSeconds set to?

Few things that could likely take a longer amount of time as well:

  • handler.getMessageConverter().convert(message), how long does this conversion take, can it take a long time, could you add some logging to check how long this took/exceeds more than expected?
  • same thing ^ for handler.onMessages(messages)
  • fetch.getLast().ackSync(Duration.ofSeconds(ackTimeOutSeconds));, Using ack sync could timeout, what's ackTimeOutSeconds set to?

Another thing you could try is creating a consumer that simply does fetch and ack, without any processing. If the slowdown doesn't happen there, it points to your processing being slow; if it still happens, it's likely the library or the server.

That should help in getting it reproduced/narrowed down.

MauriceVanVeen avatar Dec 05 '24 22:12 MauriceVanVeen

  • In .ackWait(Duration.ofSeconds(processTimeSeconds)) what is the AckWait / processTimeSeconds set to?
    private final int ackTimeOutSeconds = 45;
    private final int processTimeSeconds = 60;

Few things that could likely take a longer amount of time as well:

  • handler.getMessageConverter().convert(message), how long does this conversion take, can it take a long time, could you add some logging to check how long this took/exceeds more than expected?
  • same thing ^ for handler.onMessages(messages)
  • fetch.getLast().ackSync(Duration.ofSeconds(ackTimeOutSeconds));, Using ack sync could timeout, what's ackTimeOutSeconds set to?

The last time the slowdown occurred, I commented out all the processing logic.

But that changed nothing. There was no processing logic, only receiving the batch and acking the last message. There were no ack-timeout errors, but the fetch still only returned every 7 to 10 seconds.

My Stream setup is like that:

ingress.device.<partitionKey> -> process messages, push to egress -> egress.device -> service for persistence (this is where the pull sub hangs).

But I create only one stream. Would it be better to create separate streams for ingress and egress processing?

suikast42 avatar Dec 06 '24 10:12 suikast42

@suikast42

Is it possible that you put together a project in a repo and I can try running your code? I've read the thread and looked at the consume code, here are some initial notes I have...


Especially when messages are published and consumed at the same time, consumers usually cannot read messages or rarely read messages.

Are you publishing and consuming on the same connection? They would share an outbound and inbound queue which would slow things down. Just some tuning to do, nothing major here. If you have multiple consumers, you might even consider having different connections for them. One thing to consider, is that if you have multiple consumers on the same connection, if they share a dispatcher, all incoming messages for all consumers are coming on the same thread and the same internal queue, so have to wait their turn, and you may experience some timeouts or small return lists in the fetch. See next comment.


List<Message> fetch = sub.fetch(batchsize, Duration.ofMillis(100));

This duration is too short for a fetch. This is probably 1.9 seconds less than the connection timeout. What ends up happening is that you send a pull request to the server with a 100ms timeout. To account for latency and network, the fetch call adds 10ms just to cover the cost of publishing the pull request and the server responding. Even at 110ms, you are constantly sending the server lots of pull requests. You are asking for 200 messages every couple hundred millis from a busy server that has lots of publishing, lots of consuming, acking, in-progress work, etc. The server can certainly handle a lot, so there is just some tuning to be done here. You also said this is basically local, so consider that if you are in the cloud and clustered there is network latency. Stuff to plan for.
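To put rough numbers on that (a hypothetical back-of-envelope helper, not part of any client API; the 10 ms overhead figure comes from the paragraph above):

```java
// Back-of-envelope sketch: worst-case (empty-stream) pull request rate for a
// given fetch timeout, assuming ~10ms of request/response overhead per pull.
// requestsPerSecond is a hypothetical helper, not a jnats API.
public class PullRate {
    static double requestsPerSecond(double fetchTimeoutMs, double overheadMs) {
        return 1000.0 / (fetchTimeoutMs + overheadMs);
    }

    public static void main(String[] args) {
        System.out.println(Math.round(requestsPerSecond(100, 10)));  // 100ms timeout -> ~9 pulls/sec
        System.out.println(Math.round(requestsPerSecond(1000, 10))); // 1s timeout   -> ~1 pull/sec
    }
}
```

So a 100ms timeout can mean an order of magnitude more pull traffic than a 1s timeout whenever the consumer is caught up.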


message.inProgress();

This inProgress publishes (fire and forget) a message to the server, so it is additional load on top of all the other publishing and consuming you are doing, and you are doing it for every message you receive. That gives the server extra work, on the same connection you are consuming from. You probably did this because you realized that a fetch is a block of messages and the ack timer is ticking on every message in that block by the time fetch returns the list and you start processing. I would consider an endless (Simplified) Consume instead. I might also put the messages in a linked blocking queue as I get them from the handler and let another thread do the actual processing; I would tune the ack wait to be maybe double the time it takes to process a message, which accounts for all the overhead, and just process and ack each message.
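A stdlib-only sketch of that queue-and-worker idea (the String payloads and the drainInBatches helper are illustrative stand-ins; a real handler would enqueue jnats Message objects and ack after the database commit):

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;

// Sketch: the message handler only enqueues as messages arrive, while a
// separate worker drains the queue in batches of up to BATCH_SIZE for the
// JDBC batch insert. Strings stand in for jnats Message objects.
public class BatchingSketch {
    static final int BATCH_SIZE = 200;

    static List<List<String>> drainInBatches(LinkedBlockingQueue<String> queue) throws InterruptedException {
        List<List<String>> batches = new ArrayList<>();
        while (!queue.isEmpty()) {
            List<String> batch = new ArrayList<>(BATCH_SIZE);
            String first = queue.poll(50, TimeUnit.MILLISECONDS); // block briefly for the first element
            if (first == null) break;
            batch.add(first);
            queue.drainTo(batch, BATCH_SIZE - 1); // grab whatever else is ready, without blocking
            batches.add(batch); // real code would insert into the DB here, then ack
        }
        return batches;
    }

    public static void main(String[] args) throws InterruptedException {
        LinkedBlockingQueue<String> q = new LinkedBlockingQueue<>();
        for (int i = 0; i < 450; i++) q.add("msg-" + i);
        List<List<String>> batches = drainInBatches(q);
        System.out.println(batches.size());        // 3 batches: 200, 200, 50
        System.out.println(batches.get(2).size()); // 50
    }
}
```

The point is that the receiving side only ever feeds the queue, so the ack-wait budget is decoupled from the database insert time.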

scottf avatar Dec 06 '24 17:12 scottf

Is it possible that you put together a project in a repo and I can try running your code? I've read the thread and looked at the consume code, here are some initial notes I have...

Sadly not; that is not possible because of company policy.

Are you publishing and consuming on the same connection? Yes, all the stream push and pull happens on the same connection.

you might even consider having different connections

The ingress subjects use deterministic partitioning to parallelize the processing of device messages. I assumed it is more efficient to do that over the same connection.

This duration is too short for a fetch

I have tried it with 1s too, but nothing changed. The batches come every 7 to 10 seconds.

The slowdown appears after 5 to 7 days.


After the slowdown I commented out all the egress processing logic and only acked the batch, but nothing changed until I deleted the stream and created it again (which of course means losing streamed data).

This is probably 1.9 seconds less than the connection timeout.

That is how I establish the connection to NATS:

    public void connect(String[] natsAddresses, String appname) {
        if ((natsConnection == null) || (natsConnection.getStatus() == Connection.Status.DISCONNECTED)) {

            Options.Builder ncOptions = new Options.Builder()
                    .servers(natsAddresses)
                    .connectionName(appname + "_connection")
                    .connectionTimeout(Duration.ofSeconds(10))
                    .connectionListener((connection, events) ->
                            logger.info("Connection changed " + connection.getConnectedUrl() + " -> " + events))

                    .errorListener(new NatsErrorListener(this));

            try {
                natsConnection = Nats.connect(ncOptions.build());
            } catch (IOException | InterruptedException ex) {
                logger.error("Can't create nats connection", ex);
                exit();
            }
        }

    }

You are asking for 200 messages every couple hundred millis against a busy server that has lots of publishing, lot's of consuming, acking, in process, etc. The server certainly can handle a lot, so there is just some tuning to be done here.

My purpose in doing that is to avoid redelivery while the messages are being processed. Is it unnecessary? Or would it be better to mark only the last message as in progress?

I would consider an endless (Simplified) Consume. I might also put the messages in a linked blocking queue as I get them from the handler and let some other thread do the actual processing, I would tune the ack wait to be maybe double the time it takes to process the message which will account for all the overhead and just process and ack the message.

I don't get it exactly. The logic behind that does a JDBC batch insert, which takes 100 to 200 ms (with a full batch of 200 messages).

suikast42 avatar Dec 07 '24 11:12 suikast42

The ingress subjects have determnistic partioning for parallelizing the process of device messages. I assume that's more efficent to do it over the same connection.

I'd still suggest multiple connections, it will be better for the client. Once you have a cluster, this will also spread the load among servers since multiple connections will likely be to different servers.

You said this whole process takes multiple days to appear. I suppose there is some sort of "leak", but the latest client versions have been tested for leaks and seem to be fine. Have you looked at the memory profile to ensure that your code isn't leaking by holding objects or threads?

My purpose to doing that is to avoid a redelivery during the process of messages. It is unessary ? Or is a better approach to mark only the last message as in progress ?

I understand that. As long as your ack wait is long enough, that won't be a problem. You just have to account for that in your fetch. The whole inProgress call is just extra load on your server and your current ack wait of 45 seconds is more than enough to handle a batch where your fetch timeout is even a couple seconds, especially when you say putting them in the DB takes less than a second.

I don't got it exactly. The logic behind that do a JDBC batch insert. This takes 100 - 200 ms . ( With full batch of 200 messages )

Consuming seems a less complex way to get and then process messages. Currently, you fetch a batch, then map through them and check them. Multiple loops of processing. Instead, consume feeds you a message at a time. You can compose your own batch, maybe by queuing them to be processed on a different thread. That thread can have whatever logic you need to make your storing to a database efficient and you can tune that independent of the process of getting the messages.

Bottom line here, a larger example would be more useful. This feels like it needs an architecture review, Synadia can provide that type of support.

scottf avatar Dec 07 '24 13:12 scottf

You said this whole process takes multiple days to appear. I suppose there is some sort of "leak", but the latest client versions have been tested for leaks and seem to be fine. Have you looked at the memory profile to ensure that your code isn't leaking by holding objects or threads?

I don't think so. I restarted the JetStream app, then the NATS server, and after that the Ubuntu machine, with the same result.

I'd still suggest multiple connections, it will be better for the client. Once you have a cluster, this will also spread the load among servers since multiple connections will likely be to different servers.

I have several JetStream apps. Most of them have 10 ingress partitions and 5 egress partitions. Your suggestion is to have 15 connections, right?

That thread can have whatever logic you need to make your storing to a database efficient and you can tune that independent of the process of getting the messages.

OK, I got it. But there is another concern: avoiding reprocessing of the same message. I am acking the last message only if the DB transaction commits.

Bottom line here, a larger example would be more useful. This feels like it needs an architecture review, Synadia can provide that type of support.

I will try to extract the technical part of the NATS management logic. Does Synadia provide training (online, or somewhere in Germany)? I think a review of the existing application makes more sense.

Thanks for your effort

suikast42 avatar Dec 07 '24 13:12 suikast42