nats.java
Timeout when subscribing two consumers in parallel
Observed behavior
When I subscribe two consumers in parallel from different threads against an idle local NATS server, I sometimes receive an IOException (timeout) after two seconds.
Expected behavior
Both subscriptions succeed without a timeout.
Server and client version
Server: 2.10.1 Client: jnats 2.16.14
Host environment
Xubuntu 22.04.3 6.2.0-33-generic x86_64
Steps to reproduce
After a few iterations of my test code, one of the two subscriptions times out:
Iteration: 0
Stream [1] established.
Stream [0] established.
Iteration: 1
Stream [0] established.
Stream [1] established.
Iteration: 2
Stream [0] established.
Stream [1] established.
Iteration: 3
Stream [1] established.
Stream [0] not established.
java.io.IOException: Timeout or no response waiting for NATS JetStream server
at io.nats.client.impl.NatsJetStreamImpl.responseRequired(NatsJetStreamImpl.java:216)
at io.nats.client.impl.NatsJetStreamImpl.makeRequestResponseRequired(NatsJetStreamImpl.java:200)
at io.nats.client.impl.NatsJetStreamImpl._getStreamNames(NatsJetStreamImpl.java:151)
at io.nats.client.impl.NatsJetStreamImpl.lookupStreamBySubject(NatsJetStreamImpl.java:191)
at io.nats.client.impl.NatsJetStream.createSubscription(NatsJetStream.java:301)
at io.nats.client.impl.NatsJetStream.subscribe(NatsJetStream.java:563)
at NatsTimeoutOnParallelSubscribe.lambda$0(NatsTimeoutOnParallelSubscribe.java:54)
at java.base/java.lang.Thread.run(Thread.java:833)
Error on subscribe
After some debugging, I found that the timeout occurs in https://github.com/nats-io/nats.java/blob/2.16.14/src/main/java/io/nats/client/impl/NatsConnection.java#L1098
I suspect a multithreading problem in the block at https://github.com/nats-io/nats.java/blob/2.16.14/src/main/java/io/nats/client/impl/NatsConnection.java#L1161
As a pragmatic workaround, synchronizing this part makes the timeout exception go away:
// The whole check-create-subscribe sequence now runs under one monitor.
synchronized (this) {
    if (inboxDispatcher.get() == null) {
        NatsDispatcher d = new NatsDispatcher(this, this::deliverReply);
        if (inboxDispatcher.compareAndSet(null, d)) {
            String id = this.nuid.next();
            this.dispatchers.put(id, d);
            d.start(id);
            d.subscribe(this.mainInbox);
        }
    }
}
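A minimal, self-contained model of the workaround above, for experimentation only. `SynchronizedInbox`, `FakeDispatcher`, `ensureInbox` and the latch-based pause are hypothetical stand-ins, not jnats classes; setting the `subscribed` flag stands in for `d.subscribe(this.mainInbox)`. Because the whole check-and-initialize block holds one monitor, a second caller that arrives while the first is paused between `compareAndSet` and the subscribe step cannot return until that step has finished.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.atomic.AtomicReference;

class SynchronizedInbox {
    // Hypothetical stand-in for NatsDispatcher (not a jnats class).
    static class FakeDispatcher {
        volatile boolean subscribed = false;
    }

    final AtomicReference<FakeDispatcher> inboxDispatcher = new AtomicReference<>();

    // Same shape as the synchronized block above; the latches let a test pause
    // the initializing thread between compareAndSet and the subscribe step.
    FakeDispatcher ensureInbox(CountDownLatch entered, CountDownLatch resume)
            throws InterruptedException {
        synchronized (this) {
            if (inboxDispatcher.get() == null) {
                FakeDispatcher d = new FakeDispatcher();
                if (inboxDispatcher.compareAndSet(null, d)) {
                    entered.countDown(); // reference is set, subscription not yet made
                    resume.await();      // pause while still holding the monitor
                    d.subscribed = true; // stands in for d.subscribe(this.mainInbox)
                }
            }
        }
        return inboxDispatcher.get();
    }

    // Returns true if a second caller, arriving while the first one is paused
    // inside the block, only obtains the dispatcher after it is subscribed.
    static boolean secondCallerSeesSubscribed() {
        SynchronizedInbox conn = new SynchronizedInbox();
        CountDownLatch entered = new CountDownLatch(1);
        CountDownLatch resume = new CountDownLatch(1);
        AtomicReference<FakeDispatcher> seen = new AtomicReference<>();
        try {
            Thread first = new Thread(() -> call(conn, entered, resume, null));
            first.start();
            entered.await(); // first thread holds the monitor with the reference set
            Thread second = new Thread(
                    () -> call(conn, new CountDownLatch(1), new CountDownLatch(0), seen));
            second.start();  // blocks on the monitor until the first thread leaves
            resume.countDown();
            first.join();
            second.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
        return seen.get().subscribed;
    }

    private static void call(SynchronizedInbox conn, CountDownLatch entered,
                             CountDownLatch resume, AtomicReference<FakeDispatcher> out) {
        try {
            FakeDispatcher d = conn.ensureInbox(entered, resume);
            if (out != null) {
                out.set(d);
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }

    public static void main(String[] args) {
        System.out.println("second caller saw subscribed dispatcher: "
                + secondCallerSeesSubscribed());
    }
}
```

The model only shows ordering between threads; it says nothing about how the real client queues SUB and PUB protocol messages.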
Test code:
import java.io.IOException;
import java.util.List;
import java.util.Optional;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;

import io.nats.client.Connection;
import io.nats.client.JetStream;
import io.nats.client.JetStreamApiException;
import io.nats.client.JetStreamManagement;
import io.nats.client.Nats;
import io.nats.client.Options;
import io.nats.client.api.StorageType;
import io.nats.client.api.StreamConfiguration;
import io.nats.client.api.StreamInfo;

public class NatsTimeoutOnParallelSubscribe {
    private static final String NATS_SERVER = "nats://localhost:4222";
    private static final String NATS_TOKEN = "KxX1FqaJ";
    private static final String STREAM_NAME = "NatsTimeout";
    private static final String STREAM_SUBJECT = "test";

    public static void main(String[] args) throws IOException, JetStreamApiException, InterruptedException {
        NatsTimeoutOnParallelSubscribe nt = new NatsTimeoutOnParallelSubscribe();
        nt.createStream();
        int i = 0;
        // Repeat until a subscribe attempt fails; the process then exits with status 1.
        while (true) {
            System.out.printf("Iteration: %d\n", i++);
            nt.parallelSubscribe();
            Thread.sleep(1000);
        }
    }

    // Opens a fresh connection and subscribes from two threads at the same time.
    private void parallelSubscribe() throws IOException, JetStreamApiException, InterruptedException {
        final Connection natsCon = getConnection();
        final JetStream jetStream = natsCon.jetStream();
        final int streams = 2;
        CountDownLatch cdl = new CountDownLatch(streams);
        for (int i = 0; i < streams; i++) {
            final int c = i;
            new Thread(() -> {
                try {
                    // No stream name is given, so the client looks up the stream by
                    // subject, which is a JetStream API request (see the stack trace).
                    jetStream.subscribe(STREAM_SUBJECT);
                    cdl.countDown();
                    System.out.printf("Stream [%d] established.\n", c);
                } catch (Exception e) {
                    System.err.printf("Stream [%d] not established.\n", c);
                    e.printStackTrace();
                }
            }).start();
        }
        // A failed subscribe never counts down, so the wait times out.
        if (!cdl.await(3, TimeUnit.SECONDS)) {
            System.err.printf("Error on subscribe\n");
            System.exit(1);
        }
        natsCon.close();
    }

    // Creates the in-memory test stream unless it already exists.
    private void createStream() throws IOException, JetStreamApiException, InterruptedException {
        final Connection natsCon = getConnection();
        JetStreamManagement jsm = natsCon.jetStreamManagement();
        List<StreamInfo> currentStreams = jsm.getStreams();
        Optional<StreamInfo> oldStream = currentStreams.stream()
                .filter(si -> si.getConfiguration().getName().equals(STREAM_NAME)).findFirst();
        if (oldStream.isPresent()) {
            System.out.printf("use existing stream\n");
        } else {
            StreamInfo info = jsm.addStream(StreamConfiguration.builder().name(STREAM_NAME)
                    .storageType(StorageType.Memory).subjects(STREAM_SUBJECT).description("Test Stream").build());
            System.out.printf("stream created: %s\n", info);
        }
        natsCon.close();
    }

    private Connection getConnection() {
        Options.Builder builder = new Options.Builder().server(NATS_SERVER)
                .token(NATS_TOKEN.toCharArray());
        try {
            return Nats.connect(builder.build());
        } catch (IllegalStateException | IOException | InterruptedException e) {
            throw new RuntimeException(e);
        }
    }
}
Is this repeatable against a local, non-cluster server? This seems unusual; the subscribe request/reply should take milliseconds, not counting things like network latency or server load.
Yes, this issue occurs on my local (non-cluster) machine. I don't think this is a server-side problem, but rather a multithreading problem on the client side. With the synchronized block, the issue is gone.
I'm very concerned about putting a synchronized block around that code, since it does not explain why there is a timeout, even if the problem appears to go away when you synchronize it. If there is an issue, I suspect there is another underlying cause that still needs to be found. I will have to look into this.
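One possible explanation, offered as an assumption rather than a confirmed diagnosis: without the lock, `compareAndSet` makes the new dispatcher visible to other threads before `d.subscribe(this.mainInbox)` has run. A second thread that sends a request in that window sees a non-null `inboxDispatcher`, skips initialization, and may publish its JetStream API request before the reply-inbox subscription reaches the server; since NATS does not deliver messages to subjects with no subscribers, the reply would be lost and the caller would wait until its timeout. The sketch below forces that interleaving in a simplified model; `UnsyncInboxRace`, `FakeDispatcher` and the latches are hypothetical and do not come from jnats.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.atomic.AtomicReference;

class UnsyncInboxRace {
    // Hypothetical stand-in for NatsDispatcher (not a jnats class).
    static class FakeDispatcher {
        volatile boolean subscribed = false;
    }

    final AtomicReference<FakeDispatcher> inboxDispatcher = new AtomicReference<>();

    // Same shape as the block quoted in the issue, without the synchronized
    // wrapper; the latches pause the initializing thread after compareAndSet.
    FakeDispatcher ensureInbox(CountDownLatch published, CountDownLatch resume)
            throws InterruptedException {
        if (inboxDispatcher.get() == null) {
            FakeDispatcher d = new FakeDispatcher();
            if (inboxDispatcher.compareAndSet(null, d)) {
                published.countDown(); // the dispatcher is now visible to other threads
                resume.await();        // simulated delay before subscribing
                d.subscribed = true;   // stands in for d.subscribe(this.mainInbox)
            }
        }
        return inboxDispatcher.get();
    }

    // Returns true if a second caller obtains a dispatcher whose inbox
    // subscription does not exist yet (the window in which a reply could be lost).
    static boolean secondCallerSeesUnsubscribed() {
        UnsyncInboxRace conn = new UnsyncInboxRace();
        CountDownLatch published = new CountDownLatch(1);
        CountDownLatch resume = new CountDownLatch(1);
        Thread first = new Thread(() -> {
            try {
                conn.ensureInbox(published, resume);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });
        try {
            first.start();
            published.await(); // first thread has done the CAS but not subscribed
            // The second caller sees a non-null reference and skips initialization.
            FakeDispatcher seen = conn.ensureInbox(new CountDownLatch(1), new CountDownLatch(0));
            boolean unsubscribed = !seen.subscribed;
            resume.countDown(); // let the first thread finish subscribing
            first.join();
            return unsubscribed;
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
    }

    public static void main(String[] args) {
        System.out.println("second caller saw unsubscribed dispatcher: "
                + secondCallerSeesUnsubscribed());
    }
}
```

In the real client the window is only as long as the gap between the CAS and the subscribe call, which would fit a failure that shows up only after a few iterations.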
@philmu I cannot reproduce this. I've been running your example for about half an hour. Is this still occurring? I wonder whether this could be related to the operating system the program is running on; I'm running on Windows.
@scottf I have tested again, and since 2.17.5 the problem is gone, because my suggested solution was implemented almost as proposed (with a ReentrantLock instead of synchronized): https://github.com/nats-io/nats.java/compare/2.17.4...2.17.5#diff-bbcf614ee7ee4ccd15b22d1f95fa40e4e642098386b6d1d14127dab5968d89feR1191
The problem is solved and you can close the issue.
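The 2.17.5 change is described in the thread only as using a ReentrantLock instead of synchronized. The sketch below is one way to structure such a lock and is not the code in the linked diff: a `ReentrantLock` guards initialization, the reference is re-checked after the lock is acquired, and the dispatcher is published only after its subscription step has completed, so callers on the lock-free fast path never see an unsubscribed dispatcher. `LockedInbox`, `FakeDispatcher` and `run` are hypothetical names.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;
import java.util.concurrent.locks.ReentrantLock;

class LockedInbox {
    // Hypothetical stand-in for NatsDispatcher (not a jnats class).
    static class FakeDispatcher {
        volatile boolean subscribed = false;
    }

    private final AtomicReference<FakeDispatcher> inboxDispatcher = new AtomicReference<>();
    private final ReentrantLock inboxLock = new ReentrantLock();
    final AtomicInteger created = new AtomicInteger();

    FakeDispatcher ensureInbox() {
        FakeDispatcher d = inboxDispatcher.get();
        if (d != null) {
            return d; // fast path: only fully initialized dispatchers are ever published
        }
        inboxLock.lock();
        try {
            d = inboxDispatcher.get(); // re-check: another thread may have initialized it
            if (d == null) {
                d = new FakeDispatcher();
                created.incrementAndGet();
                d.subscribed = true;    // stands in for start(id) + subscribe(mainInbox)
                inboxDispatcher.set(d); // publish only after the subscription step
            }
            return d;
        } finally {
            inboxLock.unlock();
        }
    }

    // Starts `threads` callers together; returns {unready observations, dispatchers created}.
    static int[] run(int threads) {
        LockedInbox inbox = new LockedInbox();
        CountDownLatch start = new CountDownLatch(1);
        AtomicInteger unready = new AtomicInteger();
        List<Thread> workers = new ArrayList<>();
        for (int i = 0; i < threads; i++) {
            Thread t = new Thread(() -> {
                try {
                    start.await();
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                    return;
                }
                if (!inbox.ensureInbox().subscribed) {
                    unready.incrementAndGet();
                }
            });
            workers.add(t);
            t.start();
        }
        start.countDown();
        for (Thread t : workers) {
            try {
                t.join();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                throw new IllegalStateException(e);
            }
        }
        return new int[] {unready.get(), inbox.created.get()};
    }

    public static void main(String[] args) {
        int[] result = run(8);
        System.out.println("unready=" + result[0] + ", created=" + result[1]);
    }
}
```

Compared with wrapping the whole block in synchronized, this variant only takes the lock while the dispatcher does not exist yet; after that, callers read the AtomicReference without locking.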