nats.java

maxPingOut effectively limits the number of concurrent flush calls. This is not the case in other clients

Open matthiashanel opened this issue 4 years ago • 5 comments

flush calls sendPing(true), which triggers the limit check. In other clients, maxPingOut only limits the number of outstanding pings (pings still waiting for a pong) that were sent because of the pingInterval. There, pingInterval * maxPingOut defines the duration after which the connection is considered stale. Here, it also limits how many concurrent calls to flush are allowed.

https://github.com/nats-io/nats.java/blob/7f7fe824422b6966c8147bc5f028f228be4ed6ad/src/main/java/io/nats/client/impl/NatsConnection.java#L1189-L1192
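To make the coupling concrete, here is a minimal, self-contained model of the behavior described above. It is not the nats.java code: `PongTracker`, `PingLimitDemo`, `sendPing`, and `onPong` are hypothetical names. The model assumes that every flush and every timer ping pushes one future onto a single shared pong queue, and that a ping is rejected once the queue would exceed `maxPingsOut`.

```java
import java.util.ArrayDeque;
import java.util.Deque;
import java.util.concurrent.CompletableFuture;

// Hypothetical model: flushes and timer pings share one bounded pong queue.
class PongTracker {
    private final int maxPingsOut;
    private final Deque<CompletableFuture<Boolean>> pongQueue = new ArrayDeque<>();

    PongTracker(int maxPingsOut) {
        this.maxPingsOut = maxPingsOut;
    }

    // Called by both flush() and the ping timer in this model.
    // Model assumption: a limit of 0 disables the check.
    synchronized CompletableFuture<Boolean> sendPing() {
        if (maxPingsOut > 0 && pongQueue.size() + 1 > maxPingsOut) {
            // The issue says the client treats this as a communication issue;
            // the model just reports the rejection.
            throw new IllegalStateException("max pings out exceeded");
        }
        CompletableFuture<Boolean> future = new CompletableFuture<>();
        pongQueue.addLast(future);
        return future;
    }

    // A PONG from the server completes the oldest outstanding ping.
    synchronized void onPong() {
        CompletableFuture<Boolean> oldest = pongQueue.pollFirst();
        if (oldest != null) {
            oldest.complete(Boolean.TRUE);
        }
    }

    synchronized int outstanding() {
        return pongQueue.size();
    }
}

class PingLimitDemo {
    // Issues `flushes` pings before any PONG arrives; returns how many were rejected.
    static int rejectedFlushes(int maxPingsOut, int flushes) {
        PongTracker tracker = new PongTracker(maxPingsOut);
        int rejected = 0;
        for (int i = 0; i < flushes; i++) {
            try {
                tracker.sendPing();
            } catch (IllegalStateException e) {
                rejected++;
            }
        }
        return rejected;
    }
}
```

In this model, with `maxPingsOut = 2`, a third concurrent flush issued before any PONG arrives is rejected even though nothing is wrong with the connection.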

matthiashanel, Jan 29 '20 23:01

While the Go client is considered the gold standard, I don't see how this is logically wrong :-) If flush uses a ping, shouldn't flushes be limited the same way as other pings?

sasbury, Jul 26 '20 22:07

The connection timeout is essentially maxPingOut * pingInterval. If more threads call flush concurrently than the maxPingOut value I need for my desired connection timeout, I'm out of luck. Currently you'd have to set maxPingOut to the number of threads that could call flush concurrently. While I wouldn't say it's wrong to limit flush calls, maybe that limit should be a separate value.
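The conflict can be put in numbers. This sketch assumes, as the comment above does, that the stale timeout is roughly `maxPingOut * pingInterval` and that each concurrent flush needs its own ping slot. `PingBudget` and its methods are hypothetical helpers, not client API, and the durations in the example are illustrative values only.

```java
import java.time.Duration;

// Hypothetical helper showing why one setting cannot serve both purposes.
class PingBudget {
    // Approximate time until a silent connection is declared stale.
    static Duration staleTimeout(Duration pingInterval, int maxPingOut) {
        return pingInterval.multipliedBy(maxPingOut);
    }

    // maxPingOut needed for a target stale timeout, rounded up.
    static int maxPingOutForTimeout(Duration target, Duration pingInterval) {
        long interval = pingInterval.toMillis();
        return (int) ((target.toMillis() + interval - 1) / interval);
    }

    // With a shared limit, maxPingOut must cover the timeout AND the flushing threads.
    static int sharedLimitNeeded(Duration target, Duration pingInterval, int concurrentFlushers) {
        return Math.max(maxPingOutForTimeout(target, pingInterval), concurrentFlushers);
    }
}
```

For example, with a 2 minute ping interval and a desired 4 minute stale timeout, `maxPingOut` would be 2. With 8 threads that may flush at once, the shared limit has to be raised to 8, which stretches the stale timeout to 16 minutes.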

matthiashanel, Jul 28 '20 21:07

@sasbury This doesn't seem right: https://github.com/nats-io/nats.java/blob/main/src/main/java/io/nats/client/impl/NatsConnection.java#L1275

handleCommunicationIssue closes the socket. I can definitely see throwing the IllegalStateException here, but closing the socket because we were asked to ping and are over the max does not seem right.

Also, I'm wondering whether, on flush, if there is already something in the pong queue and we are over maxPingsOut, we could just attach to the last future in the pong queue. If there is a real connection issue, it will time out anyway. If the connection is just slow, I'm not sure.
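A minimal sketch of that idea, using the same kind of in-order pong queue. `FlushCoalescer`, `flushPing`, `onPong`, and the `pingsWritten` counter are hypothetical names; the sketch only covers the flush path and assumes timer pings would keep the strict limit check.

```java
import java.util.ArrayDeque;
import java.util.Deque;
import java.util.concurrent.CompletableFuture;

// Hypothetical sketch: when over the limit, a flush piggybacks on the newest pending ping.
class FlushCoalescer {
    private final int maxPingsOut;
    private final Deque<CompletableFuture<Boolean>> pongQueue = new ArrayDeque<>();
    private int pingsWritten = 0; // counts PINGs that would actually go on the wire

    FlushCoalescer(int maxPingsOut) {
        this.maxPingsOut = maxPingsOut;
    }

    synchronized CompletableFuture<Boolean> flushPing() {
        if (maxPingsOut > 0 && pongQueue.size() >= maxPingsOut) {
            // Over the limit: reuse the last outstanding future instead of failing
            // or closing the connection. If the connection is really broken,
            // the caller's flush timeout still fires.
            return pongQueue.peekLast();
        }
        CompletableFuture<Boolean> future = new CompletableFuture<>();
        pongQueue.addLast(future);
        pingsWritten++;
        return future;
    }

    // PONGs complete futures in the order their PINGs were sent.
    synchronized void onPong() {
        CompletableFuture<Boolean> oldest = pongQueue.pollFirst();
        if (oldest != null) {
            oldest.complete(Boolean.TRUE);
        }
    }

    synchronized int pingsWritten() {
        return pingsWritten;
    }
}
```

With `maxPingsOut = 2`, a third concurrent flush receives the second flush's future and no extra PING is written; it completes when that second PONG arrives.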

scottf, Nov 16 '22 13:11

I ran into the same issue, and in my case it is very unpleasant.

Case: my service unsubscribes from multiple subjects concurrently at runtime. Subscription#drain calls NatsConnection#flush, which results in the behavior described in the first message.

I could contribute a fix, but I'd like to discuss the approach first. I'm not sure I understand the use case for a queue of pong futures (or for the max ping limit).

I see the following comment in the code:

```java
// Send a ping request and push a pong future on the queue.
// futures are completed in order, keep this one if a thread wants to wait
// for a specific pong. Note, if no pong returns the wait will not return
// without setting a timeout.
```

But it's not clear to me why anyone would want to wait for a specific pong.
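One possible answer, sketched under the assumption that the server answers PINGs strictly in order on a connection: a PONG for a given PING implies the server has processed everything written before that PING. A thread that publishes and then flushes therefore needs the pong for a PING written after its own messages, not just any pong. `OrderedWire` and its methods are hypothetical and only model this ordering.

```java
import java.util.ArrayDeque;
import java.util.Deque;
import java.util.concurrent.CompletableFuture;

// Hypothetical model of one connection's outgoing stream and its pong queue.
class OrderedWire {
    private long written = 0; // messages written so far
    private final Deque<Long> pingMarks = new ArrayDeque<>(); // write count at each PING
    private final Deque<CompletableFuture<Long>> pongQueue = new ArrayDeque<>();

    synchronized void publish() {
        written++;
    }

    // Write a PING; its future completes with the number of messages it covers.
    synchronized CompletableFuture<Long> ping() {
        CompletableFuture<Long> future = new CompletableFuture<>();
        pingMarks.addLast(written);
        pongQueue.addLast(future);
        return future;
    }

    // PONGs arrive in PING order, so the oldest future is the one being answered.
    synchronized void onPong() {
        Long covered = pingMarks.pollFirst();
        CompletableFuture<Long> future = pongQueue.pollFirst();
        if (future != null) {
            future.complete(covered);
        }
    }
}
```

If one thread publishes a message and pings (`a`), and another thread then publishes two more and pings (`b`), `a` confirms 1 message and `b` confirms 3. A flush that reused `a` instead of sending its own PING would only be confirmed up to that earlier point.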

How about replacing the queue with an AtomicReference and reusing a single CompletableFuture instance?

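```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.atomic.AtomicReference;

// Define the pong reference: one shared future instead of a queue
private final AtomicReference<CompletableFuture<Boolean>> pongReference = new AtomicReference<>();

// Use it as follows: reuse the pending future if there is one, otherwise create it
CompletableFuture<Boolean> pongFuture =
        pongReference.updateAndGet(existing -> existing == null ? new CompletableFuture<>() : existing);
```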
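A slightly fuller sketch of the AtomicReference proposal, showing two pieces the snippet leaves implicit: only the caller that installs a new future should write a PING, and the PONG handler clears the reference before completing the future so later callers start a fresh ping. `SharedPong`, `awaitPong`, `onPong`, and the `pingsWritten` counter (a stand-in for writing PING to the socket) are hypothetical. It uses a `compareAndSet` loop instead of `updateAndGet` so the caller can tell whether it created the future.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;

// Hypothetical sketch: all concurrent flushes share one outstanding ping.
class SharedPong {
    private final AtomicReference<CompletableFuture<Boolean>> pongReference = new AtomicReference<>();
    private final AtomicInteger pingsWritten = new AtomicInteger();

    CompletableFuture<Boolean> awaitPong() {
        while (true) {
            CompletableFuture<Boolean> existing = pongReference.get();
            if (existing != null) {
                return existing; // piggyback on the ping already in flight
            }
            CompletableFuture<Boolean> fresh = new CompletableFuture<>();
            if (pongReference.compareAndSet(null, fresh)) {
                pingsWritten.incrementAndGet(); // only the installer writes the PING
                return fresh;
            }
            // Another thread installed a future first; loop and reuse it.
        }
    }

    // Clear first, then complete, so callers arriving afterwards send a new PING.
    void onPong() {
        CompletableFuture<Boolean> pending = pongReference.getAndSet(null);
        if (pending != null) {
            pending.complete(Boolean.TRUE);
        }
    }

    int pingsWritten() {
        return pingsWritten.get();
    }
}
```

Two flushes that overlap share one PING and one future; a flush that starts after the PONG has been handled writes a new PING.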

In that case, to check connection liveness, we can move the handleCommunicationIssue logic from the ping method to the ping timer scheduler:

Replace: https://github.com/nats-io/nats.java/blob/73c6039faa957405a62da77baa17867ab608ca74/src/main/java/io/nats/client/impl/NatsConnection.java#L471-L480

with

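```java
if (isConnected()) {
    try {
        // The timer always uses the standard queue; wait at most one ping interval for the pong
        softPing().get(pingMillis, TimeUnit.MILLISECONDS);
    } catch (TimeoutException timeoutException) {
        // No pong within the interval: treat the connection as stale
        handleCommunicationIssue(timeoutException);
    } catch (ExecutionException | InterruptedException e) {
        if (e instanceof InterruptedException) {
            // Restore the interrupt flag so the timer thread can still observe it
            Thread.currentThread().interrupt();
        }
        handleCommunicationIssue(new TimeoutException(e.getMessage()));
    }
}
```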
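The timer-side check can be exercised in isolation. The sketch below is a variation on the proposal, not a drop-in replacement: `LivenessMonitor`, `tick`, the `pinger` supplier, and the `onStale` callback are hypothetical. Instead of declaring the connection stale after a single missed pong, it counts consecutive timeouts and gives up after `maxPingsOut` of them, which keeps the pingInterval * maxPingOut stale window described at the top of the issue while placing no limit on flushes.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.function.Supplier;

// Hypothetical timer-side liveness check, meant to run once per ping interval.
class LivenessMonitor {
    private final int maxPingsOut;
    private final Supplier<CompletableFuture<Boolean>> pinger; // sends a PING, returns its pong future
    private final Runnable onStale;                             // e.g. close and reconnect
    private int missed = 0;

    LivenessMonitor(int maxPingsOut, Supplier<CompletableFuture<Boolean>> pinger, Runnable onStale) {
        this.maxPingsOut = maxPingsOut;
        this.pinger = pinger;
        this.onStale = onStale;
    }

    // Returns true if this tick declared the connection stale.
    boolean tick(long waitMillis) {
        try {
            pinger.get().get(waitMillis, TimeUnit.MILLISECONDS);
            missed = 0; // any pong proves the connection is alive
            return false;
        } catch (TimeoutException e) {
            missed++;
        } catch (ExecutionException e) {
            missed = maxPingsOut; // treat a failed ping as fatal in this sketch
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt(); // preserve the interrupt for the caller
            return false;
        }
        if (missed >= maxPingsOut) {
            missed = 0; // start counting again after the stale handler runs
            onStale.run();
            return true;
        }
        return false;
    }
}
```

In a client, `tick` would be driven by the existing ping timer with `waitMillis` equal to the ping interval; with a silent server and `maxPingsOut = 2`, the second tick declares the connection stale.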

@sasbury @scottf, what do you think?

P.S. I'm not sure my proposal is correct. The handleCommunicationIssue method carries the comment `// Called from reader/writer thread`, but the timer thread is neither a reader nor a writer thread.

Sorry if this is completely wrong; I don't have enough context on the code base.

ajax-semenov-y, Apr 06 '23 13:04

For now, you might consider using multiple connections. I'll look into this when I have some time.
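A minimal sketch of the multiple-connections workaround, assuming each connection keeps its own ping limit: spreading work across N connections divides the number of concurrent flushes each one sees. `RoundRobinPool` is a hypothetical helper written against a generic type `T`; with nats.java, `T` would be the client's connection type, and creating and closing those connections is left out. Because a flush only covers messages published on the same connection, a thread should pick a connection once per unit of work and both publish and flush on it.

```java
import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical helper: hands out connections in round-robin order.
class RoundRobinPool<T> {
    private final List<T> connections;
    private final AtomicInteger next = new AtomicInteger();

    RoundRobinPool(List<T> connections) {
        if (connections.isEmpty()) {
            throw new IllegalArgumentException("need at least one connection");
        }
        this.connections = List.copyOf(connections);
    }

    T pick() {
        // floorMod keeps the index non-negative after the counter overflows
        int index = Math.floorMod(next.getAndIncrement(), connections.size());
        return connections.get(index);
    }

    // Concurrent flushes per connection when picks are spread evenly: ceil(threads / connections).
    static int flushesPerConnection(int flushingThreads, int connectionCount) {
        return (flushingThreads + connectionCount - 1) / connectionCount;
    }
}
```

With 8 flushing threads and 3 connections, each connection sees at most 3 concurrent flushes, so maxPingOut could stay closer to the value chosen for the stale timeout.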

scottf, Apr 06 '23 14:04