
JS Consumer lost after disconnect/reconnect (0.21)

bbigras opened this issue Jul 08 '22 · 18 comments

It looks like #224, but I guess with the new API (since the old issue was closed).

  • [x] Included below version and environment information
  • [x] Included a [Minimal, Complete, and Verifiable example](https://stackoverflow.com/help/mcve)

NATS version (grep 'name = "nats"' Cargo.lock -A 1)

name = "nats" version = "0.21.0"

rustc version (rustc --version - we support Rust 1.41 and up)

rustc 1.61.0

OS/Container environment:

NixOS

Steps or code to reproduce the issue:

  • clone https://github.com/bbigras/test-nats-reconnect.git
  • start nats: docker run -it --rm -p 4222:4222 -p 6222:6222 -p 8222:8222 -p 9922:9922 nats --js
  • create the stream: nats stream add --server=127.0.0.1 test_stream --subjects=test_subject --storage=memory --retention=limits --replicas=1 --discard=old --max-msgs=-1 --max-msgs-per-subject=1 --max-bytes=-1 --max-age=-1 --max-msg-size=-1 --dupe-window="2m0s" --no-allow-rollup --no-deny-delete --no-deny-purge
  • start a pub loop:
for index in {1..9999}
do
  echo "${index}"
  nats pub test_subject --server=127.0.0.1 "${index}" > /dev/null 2>&1
  sleep 3
done
  • start the test case: cargo run (a rough sketch of what the test case does follows these steps)
  • restart nats
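
For reference, here is a minimal sketch of roughly what the linked test case does, assuming the sync nats 0.21 JetStream push-subscribe API (nats::Options, nats::jetstream::new, JetStream::subscribe); the code in the linked repository may differ in its details, so treat this as an approximation rather than a copy:

use std::io;

fn main() -> io::Result<()> {
    // Connect with callbacks so disconnects/reconnects show up on stdout.
    let nc = nats::Options::new()
        .disconnect_callback(|| println!("connection has been lost"))
        .reconnect_callback(|| println!("connection has been reestablished"))
        .connect("127.0.0.1")?;
    println!("connected");

    // Build a JetStream context and a push subscription on the test subject.
    let js = nats::jetstream::new(nc);
    println!("subscribe");
    let sub = js.subscribe("test_subject")?;
    println!("subscribed");

    // Print every message; after the server restart, no further messages arrive.
    while let Some(msg) = sub.next() {
        let decoded_msg = String::from_utf8_lossy(&msg.data).to_string();
        dbg!(&decoded_msg);
        msg.ack()?;
    }
    Ok(())
}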

Expected result:

test case would continue to get new messages

Actual result:

no new messages:

connected
subscribe
subscribed
[src/main.rs:21] &decoded_msg = "29"
[src/main.rs:21] &decoded_msg = "1"
[src/main.rs:21] &decoded_msg = "2"
[src/main.rs:21] &decoded_msg = "3"
[src/main.rs:21] &decoded_msg = "4"
[src/main.rs:21] &decoded_msg = "5"
connection has been lost


connection has been reestablished

bbigras avatar Jul 08 '22 15:07 bbigras

Thanks for the report, @bbigras.

I will look into that soon.

Jarema avatar Jul 12 '22 19:07 Jarema

@Jarema friendly ping

bbigras avatar Aug 09 '22 14:08 bbigras

yes, looking into that right now.

Jarema avatar Aug 09 '22 15:08 Jarema

I don't know if this is related, but I am currently losing Consumers with the async_nats crate too (tested 0.18.0 and 0.17.0 so far).

After some time (the timing varies so far) I lose the Consumers on a stream that had been working for some time without any issues. I attached the new callback syntax to the connection like this:

.event_callback(|event| async move {
    match event {
        Event::Reconnect => info!("Reconnected to Nats"),
        Event::Disconnect => warn!("Disconnected from Nats"),
        Event::LameDuckMode => warn!("Nats Server entered LameDuckMode"),
        Event::SlowConsumer(_) => warn!("SlowConsumer message from Nats Server"),
        Event::ServerError(err) => error!("Nats Server Error: {}", err),
        Event::ClientError(err) => error!("Nats Client Error: {}", err),
    }
})

and I have never seen any message from that callback so far. After some time, I just "silently" lose the Consumers. No disconnect, no message; it just stays in the .try_next() on the opened stream, which worked before. When I double-check with the nats CLI tool and run a stream ls, I can see that the Consumers are gone.

I am sorry, but I do not have any better or further information so far; testing is ongoing. What I can say is that I tried a combination of loop {} with select! and a timeout to re-create the consumer.messages().await, which did not help.

If this turns out to be related, I can provide more information here as I gather it. Otherwise it might be better to open a new issue.

edit:

Polling the JetStream account info still works when the Consumers get dropped. I spawned a task to do this regularly, just to see whether it still works or maybe even helps keep the connection open. The account queries keep working (and apparently keep on doing so) while the Consumers get dropped after some time.
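
For illustration, roughly what such a watchdog task could look like, assuming the JetStream Context can be cloned into the task and exposes a query_account() call (this is a sketch, not the commenter's actual code):

// Hypothetical watchdog: periodically query JetStream account info so a
// broken connection would at least surface as an error in the logs.
let js = jetstream.clone();
tokio::spawn(async move {
    loop {
        match js.query_account().await {
            Ok(info) => debug!("JetStream account info: {:?}", info),
            Err(err) => error!("JetStream account query failed: {}", err),
        }
        tokio::time::sleep(std::time::Duration::from_secs(30)).await;
    }
});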

edit 2:

I combined the loop { select! {..} } with fetching the Consumer info before restarting the loop:

info!("Consumer info: {:?}", consumer.clone().info().await);

And when the consumer "silently" goes away, I get an error there:

Consumer info: Err(Custom { kind: Other, error: "nats: error while getting consumer info: 10014, 404, consumer not found" })

After I receive an error there, it stays like this until I fully restart everything. But I would not notice it at all without checking for the Err(_) at that point, because everything carries on as normal, except that I never receive any messages over the Stream anymore.

I hope that information helps a little bit.

sebadob avatar Aug 09 '22 18:08 sebadob

I am also seeing this behavior with async_nats but have not produced anything useful to track it down or report it with.

Simply restarting the service solves the problem and the consumer picks up at the proper location in the stream. I am now doing rolling restarts to just keep things alive / limit the downtime.

The problem occurs after tens of thousands of messages, and not (apparently) at any uniform rate. I don't see memory issues either (but have only just started tracking this).

abalmos avatar Aug 09 '22 20:08 abalmos

I had 2 pods with the test app running last night. What works for now as a not_so_efficient_workaround is the following (a stripped-down version for the example):

async fn listen_stream(stream: stream::Stream) {
    loop {
        let cfg = consumer::pull::Config {
            ..Default::default()
        };
        let consumer = stream
            .get_or_create_consumer("my_consumer", cfg)
            .await
            .map_err(|err| error!("Error creating the consumer: {}", err))
            .unwrap();

        match consumer.messages().await {
            Ok(mut s) => {
                loop {
                    select! {
                        res = s.try_next() => {
                            if let Ok(Some(msg)) = res {
                                // do something useful
                            }
                        },
                        _ = tokio::time::sleep(CONSUMER_TIMEOUT) => {
                            match consumer.clone().info().await {
                                Ok(info) => debug!("Consumer info: {:?}", info),
                                Err(err) => {
                                    error!("Error retrieving Consumer info: {}", err);
                                    break;
                                }
                            }
                        },
                    }
                }
            }
            Err(err) => {
                error!("Error opening messages stream: {}", err);
            }
        };
    }
}

The stream is the stream created from the JetStream Context. This means I can re-use the js ctx as well as the stream which is created outside of this function.

CONSUMER_TIMEOUT is just a Duration which is currently set to 30 seconds.
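
For context, a minimal sketch of how this workaround could be wired up, assuming the async_nats JetStream API of that time (async_nats::connect, jetstream::new, Context::get_stream); the stream name test_stream comes from the first comment, and listen_stream is the function above:

use std::time::Duration;

const CONSUMER_TIMEOUT: Duration = Duration::from_secs(30);

#[tokio::main]
async fn main() -> Result<(), async_nats::Error> {
    // Connect once and build a JetStream context; both can be re-used freely.
    let client = async_nats::connect("127.0.0.1:4222").await?;
    let jetstream = async_nats::jetstream::new(client);

    // Look up the existing stream and hand it to the listener loop above.
    let stream = jetstream.get_stream("test_stream").await?;
    tokio::spawn(listen_stream(stream)).await?;
    Ok(())
}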

Sometimes, the Consumer gets lost after 1 minute, sometimes after 3 hours. I cannot see any pattern here.

edit:

What I forgot to mention:
The problem does not seem to be related to the number of messages. I subscribed to 2 streams with both apps last night. One of them always had some messages, while the other one was idle the whole time without even a single message.

sebadob avatar Aug 10 '22 06:08 sebadob

We did a series of optimisations and fixes to Pull Consumers.

Can you @sebadob retest your scenarios?

Jarema avatar Sep 03 '22 11:09 Jarema

Note that I think my test case from the first comment is invalid, since the stream is lost when I restart nats (as I'm using --storage=memory). I'll check my real use case, but I probably had the same issue or the file storage was lost when I restarted nats. I'll comment again if I still have issues.

bbigras avatar Sep 03 '22 13:09 bbigras

@Jarema I will implement v0.19 in my clients and check whether I still get into the situation where the consumer info error comes up in the select! branch. I will report back as soon as I have an answer.

Thank you very much for the really nice work on this project!

sebadob avatar Sep 03 '22 13:09 sebadob

The first findings from my testing are that it looks very promising. I left the select! branch in to see whether my own error or the new debug logs show up. I saw "my error" a few times on a deliberately unstable network, but I guess these were just overlaps with the internal heartbeat checker.

Because I had too many other things to do today, I could not test everything I wanted, but as I said, it looks good so far. One side note: it would help to add some information to the consumer's debug logging, such as the consumer name, subject, or something else that identifies which consumer the logs are coming from. I am using 3 different ones in parallel, and with the current debug logs I cannot tell which logs belong to which consumer.

Apart from that, there are just a few typos I saw here and there saying hearbeat instead of heartbeat.

As soon as I can do the other tests and maybe let it run overnight or for a few days, I will report back.

sebadob avatar Sep 05 '22 15:09 sebadob

I removed my manual select! branches for error checking, had all leftover test cases running all day long, and did not have any issues. The problem is solved with v0.19 in my case.

Thank you very much!

sebadob avatar Sep 06 '22 16:09 sebadob

Update - Not so good news:

I had all applications running for ~24 hours now, and when I wanted to do another round of quick testing this morning, the consumers were lost again, silently. I waited a bit to see if the internal heartbeat checker was just about to notice it, but it did not happen.

I double-checked the NATS cluster with the CLI and saw that the Consumers were dropped again. Yesterday evening there were 4 Consumers listed, but this morning the NATS CLI tool showed 0 again, since I had removed my consumer checks in the select! branch.

sebadob avatar Sep 07 '22 08:09 sebadob

@sebadob There were fixes recently in nats-server for losing ephemeral consumers in some cases.

Release 2.9 will bring them in. In the meantime, I suggest setting inactive_threshold on the Consumer config to > 60 seconds. That should prevent such situations. I will also increase the default in the library.
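
For example, something along these lines in the consumer config, assuming consumer::pull::Config exposes the inactive_threshold field mentioned above (a sketch, with placeholder values):

// Give the ephemeral consumer a generous inactive threshold so the server
// does not delete it during short gaps in client activity (pre nats-server 2.9).
let cfg = consumer::pull::Config {
    inactive_threshold: Duration::from_secs(120),
    ..Default::default()
};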

Jarema avatar Sep 07 '22 08:09 Jarema

Ahh okay, I did not know about the Nats issues. I will look forward to the new release.

What exactly happens when the inactive_threshold is exceeded? I will give it a try.

Thank you.

sebadob avatar Sep 07 '22 08:09 sebadob

If the inactive threshold is exceeded, the ephemeral consumer is deleted by the server. This issue would never happen if you used a durable consumer.

Though the Rust client code keeps the consumer on the server busy all the time, to be safe, pre-2.9 it's good to have a higher inactive_threshold.
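
For comparison, making the consumer durable is just a matter of naming it in the same config (a sketch; my_durable is a placeholder):

// A durable consumer keeps its state on the server across client
// disconnects instead of being cleaned up as inactive.
let cfg = consumer::pull::Config {
    durable_name: Some("my_durable".to_string()),
    ..Default::default()
};
let consumer = stream.get_or_create_consumer("my_durable", cfg).await?;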

Jarema avatar Sep 07 '22 09:09 Jarema

Perfect, thank you very much.

I am using ephemeral consumers, since I will have quite a few IoT clients which change dynamically. I will try the inactive_threshold.

sebadob avatar Sep 07 '22 09:09 sebadob

Update from my side:

NATS Server 2.9 + inactive_threshold + changing to a durable consumer solved all leftover issues for me. Thank you very much @Jarema

sebadob avatar Sep 17 '22 06:09 sebadob

@bbigras is it ok to close this issue now?

Jarema avatar Sep 20 '22 12:09 Jarema

Closing this out as it's gone stale without any additional comments and seems to have been resolved.

caspervonb avatar Apr 05 '23 18:04 caspervonb