nats.java

Timeout when subscribing two consumers in parallel

Open philmu opened this issue 1 year ago • 3 comments

Observed behavior

When two consumers subscribe in parallel from different threads against a local, idle NATS server, I sometimes receive an IOException (timeout) after two seconds.

Expected behavior

No timeout

Server and client version

Server: 2.10.1 Client: jnats 2.16.14

Host environment

Xubuntu 22.04.3 6.2.0-33-generic x86_64

Steps to reproduce

After a few iterations with my test code:

Iteration: 0
Stream [1] established.
Stream [0] established.
Iteration: 1
Stream [0] established.
Stream [1] established.
Iteration: 2
Stream [0] established.
Stream [1] established.
Iteration: 3
Stream [1] established.
Stream [0] not established.
java.io.IOException: Timeout or no response waiting for NATS JetStream server
	at io.nats.client.impl.NatsJetStreamImpl.responseRequired(NatsJetStreamImpl.java:216)
	at io.nats.client.impl.NatsJetStreamImpl.makeRequestResponseRequired(NatsJetStreamImpl.java:200)
	at io.nats.client.impl.NatsJetStreamImpl._getStreamNames(NatsJetStreamImpl.java:151)
	at io.nats.client.impl.NatsJetStreamImpl.lookupStreamBySubject(NatsJetStreamImpl.java:191)
	at io.nats.client.impl.NatsJetStream.createSubscription(NatsJetStream.java:301)
	at io.nats.client.impl.NatsJetStream.subscribe(NatsJetStream.java:563)
	at NatsTimeoutOnParallelSubscribe.lambda$0(NatsTimeoutOnParallelSubscribe.java:54)
	at java.base/java.lang.Thread.run(Thread.java:833)
Error on subscribe

After some debugging, the problem appears to be a timeout in https://github.com/nats-io/nats.java/blob/2.16.14/src/main/java/io/nats/client/impl/NatsConnection.java#L1098

I think there is a multithreading problem in the block at https://github.com/nats-io/nats.java/blob/2.16.14/src/main/java/io/nats/client/impl/NatsConnection.java#L1161

Pragmatic workaround: after synchronizing this part, the timeout exception is gone:

synchronized (this) {
    if (inboxDispatcher.get() == null) {
        NatsDispatcher d = new NatsDispatcher(this, this::deliverReply);

        if (inboxDispatcher.compareAndSet(null, d)) {
            String id = this.nuid.next();
            this.dispatchers.put(id, d);
            d.start(id);
            d.subscribe(this.mainInbox);
        }
    }
}
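
One possible reading of why the synchronized block helps, which is an assumption and not confirmed anywhere in this thread: the thread that wins the compareAndSet still has to start the dispatcher and subscribe to the inbox, while a second thread already sees a non-null dispatcher and carries on before that subscription exists. The sketch below reproduces only that shape without any NATS dependency. LazyInboxDemo, slowSetup and the 200 ms delay are hypothetical stand-ins, not jnats code.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.atomic.AtomicReference;

// Hypothetical stand-in for the lazy inbox setup; this is not jnats code.
class LazyInboxDemo {
    private final AtomicReference<Object> dispatcher = new AtomicReference<>();
    private final CountDownLatch setupStarted = new CountDownLatch(1);
    // Stands in for "the inbox subscription has been sent".
    private volatile boolean subscribed = false;

    // Same shape as the unsynchronized block: check, compareAndSet, then slow setup.
    void ensureUnsafe() throws InterruptedException {
        if (dispatcher.get() == null) {
            Object d = new Object();
            if (dispatcher.compareAndSet(null, d)) {
                slowSetup();
            }
        }
    }

    // Same logic wrapped in synchronized, as in the workaround.
    void ensureSynchronized() throws InterruptedException {
        synchronized (this) {
            ensureUnsafe();
        }
    }

    private void slowSetup() throws InterruptedException {
        setupStarted.countDown(); // the winner is now inside the setup
        Thread.sleep(200);        // simulated delay before the subscription exists
        subscribed = true;
    }

    // Returns what a second caller observes right after its own ensure call returns.
    static boolean secondCallerSeesSubscription(boolean useLock) throws InterruptedException {
        LazyInboxDemo demo = new LazyInboxDemo();
        Thread winner = new Thread(() -> {
            try {
                if (useLock) demo.ensureSynchronized(); else demo.ensureUnsafe();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });
        winner.start();
        demo.setupStarted.await(); // wait until the winner has won the compareAndSet
        if (useLock) demo.ensureSynchronized(); else demo.ensureUnsafe();
        boolean seen = demo.subscribed;
        winner.join();
        return seen;
    }

    public static void main(String[] args) throws InterruptedException {
        System.out.println("unsynchronized: " + secondCallerSeesSubscription(false));
        System.out.println("synchronized:   " + secondCallerSeesSubscription(true));
    }
}
```

In the synchronized variant the second caller blocks until the setup has finished, so it always sees the subscription. In the unsynchronized variant it typically returns while the setup is still in progress. If the real client behaves this way, a request sent in that window could get its reply before the inbox subscription exists, which would look like the timeout above.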

Test code:

import java.io.IOException;
import java.util.List;
import java.util.Optional;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;

import io.nats.client.Connection;
import io.nats.client.JetStream;
import io.nats.client.JetStreamApiException;
import io.nats.client.JetStreamManagement;
import io.nats.client.Nats;
import io.nats.client.Options;
import io.nats.client.api.StorageType;
import io.nats.client.api.StreamConfiguration;
import io.nats.client.api.StreamInfo;

public class NatsTimeoutOnParallelSubscribe {

    private final static String NATS_SERVER = "nats://localhost:4222";
    private final static String NATS_TOKEN = "KxX1FqaJ";

    private final static String STREAM_NAME = "NatsTimeout";
    private final static String STREAM_SUBJECT = "test";

    public static void main(String[] args) throws IOException, JetStreamApiException, InterruptedException {
	NatsTimeoutOnParallelSubscribe nt = new NatsTimeoutOnParallelSubscribe();
	nt.createStream();
	int i = 0;
	while (true) {
	    System.out.printf("Iteration: %d\n", i++);
	    nt.parallelSubscribe();
	    Thread.sleep(1000);
	}
    }

    private void parallelSubscribe() throws IOException, JetStreamApiException, InterruptedException {
	final Connection nastCon = getConnection();
	final JetStream jetStream = nastCon.jetStream();
	final int streams = 2;
	CountDownLatch cdl = new CountDownLatch(streams);

	for (int i = 0; i < streams; i++) {
	    final int c = i;
	    new Thread(() -> {
		try {
		    jetStream.subscribe(STREAM_SUBJECT);
		    cdl.countDown();
		    System.out.printf("Stream [%d] established.\n", c);
		} catch (Exception e) {
		    System.err.printf("Stream [%d] not established.\n", c);
		    e.printStackTrace();
		}
	    }).start();
	}
	if (!cdl.await(3, TimeUnit.SECONDS)) {
	    System.err.printf("Error on subscribe\n");
	    System.exit(1);
	}

	nastCon.close();
    }

    private void createStream() throws IOException, JetStreamApiException, InterruptedException {
	final Connection nastCon = getConnection();
	JetStreamManagement jsm = nastCon.jetStreamManagement();
	List<StreamInfo> currentStreams = jsm.getStreams();

	Optional<StreamInfo> oldStream = currentStreams.stream()
		.filter(si -> si.getConfiguration().getName().equals(STREAM_NAME)).findFirst();

	if (oldStream.isPresent()) {
	    System.out.printf("use existing stream\n");
	} else {
	    StreamInfo info = jsm.addStream(StreamConfiguration.builder().name(STREAM_NAME)
		    .storageType(StorageType.Memory).subjects(STREAM_SUBJECT).description("Test Stream").build());
	    System.out.printf("stream created: %s\n", info);
	}

	nastCon.close();
    }

    private Connection getConnection() {

	io.nats.client.Options.Builder builder = new Options.Builder().server(NATS_SERVER)
		.token(NATS_TOKEN.toCharArray());
	try {
	    return Nats.connect(builder.build());
	} catch (IllegalStateException | IOException | InterruptedException e) {
	    throw new RuntimeException(e);
	}
    }
}

philmu avatar Sep 29 '23 15:09 philmu

Is this repeatable against a local, non-cluster server? This seems unusual: the subscribe request/reply should take milliseconds, not accounting for things like network latency and server load.

scottf avatar Sep 29 '23 18:09 scottf

Yes, this issue occurs on my local (non-cluster) machine. I think this is not a server-side problem, but rather a multithreading problem on the client side. With the synchronized block the issue is gone.

philmu avatar Sep 30 '23 08:09 philmu

I'm very concerned about putting a synchronized block around that code, since it does not explain why there is a timeout, even if the problem appears to go away when you sync it. If there is an issue, I think there is another reason to be figured out. Will have to look into this.

scottf avatar Sep 30 '23 13:09 scottf

@philmu I cannot reproduce this. I've been running your example for about half an hour. Is this still occurring? I wonder if this could be related to the operating system the program is running on; I'm running on Windows.

scottf avatar Jul 08 '24 20:07 scottf

@scottf I have tested again, and since 2.17.5 the problem is gone, because somebody implemented nearly my suggested solution (a ReentrantLock instead of synchronized): https://github.com/nats-io/nats.java/compare/2.17.4...2.17.5#diff-bbcf614ee7ee4ccd15b22d1f95fa40e4e642098386b6d1d14127dab5968d89feR1191

The problem is solved and you can close the issue.
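
For readers comparing the two approaches, here is a minimal sketch of the same lazy initialization guarded by a ReentrantLock instead of synchronized. It is an illustration under assumptions, not the code from the linked diff: LockedLazyInit, ensureInitialized and the counters are hypothetical names chosen for this example.

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.locks.ReentrantLock;

// Hypothetical illustration; not the code from the linked jnats diff.
class LockedLazyInit {
    private final ReentrantLock lock = new ReentrantLock();
    private final AtomicInteger setupRuns = new AtomicInteger();
    private Object dispatcher;           // guarded by lock
    private volatile boolean subscribed; // stands in for "inbox subscription sent"

    // Every caller returns only after the one-time setup has completed.
    void ensureInitialized() {
        lock.lock();
        try {
            if (dispatcher == null) {
                dispatcher = new Object();
                setupRuns.incrementAndGet();
                subscribed = true;
            }
        } finally {
            lock.unlock(); // always release, even if the setup throws
        }
    }

    // Runs concurrent callers; returns {number of setup runs, callers that saw the finished setup}.
    static int[] run(int threads) throws InterruptedException {
        LockedLazyInit init = new LockedLazyInit();
        AtomicInteger sawSubscribed = new AtomicInteger();
        ExecutorService pool = Executors.newFixedThreadPool(threads);
        for (int i = 0; i < threads; i++) {
            pool.execute(() -> {
                init.ensureInitialized();
                if (init.subscribed) sawSubscribed.incrementAndGet();
            });
        }
        pool.shutdown();
        pool.awaitTermination(5, TimeUnit.SECONDS);
        return new int[] { init.setupRuns.get(), sawSubscribed.get() };
    }

    public static void main(String[] args) throws InterruptedException {
        int[] r = run(8);
        System.out.println("setup runs: " + r[0] + ", callers that saw setup: " + r[1]);
    }
}
```

Both variants give the same guarantee here: the setup runs once, and no caller gets past the guard before it has finished. The unlock in a finally block is the usual idiom, since a ReentrantLock is not released automatically the way a synchronized block is.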

philmu avatar Aug 07 '24 12:08 philmu