nats-server icon indicating copy to clipboard operation
nats-server copied to clipboard

Subscribe parallel multiple consumers with same durable without receiving error (SUB-90012) [v2.10.1]

Open philmu opened this issue 1 year ago • 0 comments

What version were you using?

Server: 2.10.1 Client: jnats 2.16.14

What environment was the server running in?

Xubuntu 22.04.3 6.2.0-33-generic x86_64

Is this defect reproducible?

When making a sleep (eg. 500ms) between subscriptions, i receive the expected error "[SUB-90012] Consumer is already bound to a subscription.". But without sleep, all durable consumers are receiving the same messages. TestCode:

import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import java.util.Optional;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;

import io.nats.client.Connection;
import io.nats.client.Dispatcher;
import io.nats.client.JetStream;
import io.nats.client.JetStreamApiException;
import io.nats.client.JetStreamManagement;
import io.nats.client.JetStreamSubscription;
import io.nats.client.MessageHandler;
import io.nats.client.Nats;
import io.nats.client.Options;
import io.nats.client.PushSubscribeOptions;
import io.nats.client.api.ConsumerConfiguration;
import io.nats.client.api.StorageType;
import io.nats.client.api.StreamConfiguration;
import io.nats.client.api.StreamInfo;
import io.nats.client.impl.NatsMessage;

public class NatsParallelSubscribe {

    private final static String NATS_SERVER = "nats://localhost:4222";
    private final static String NATS_TOKEN = "KxX1FqaJ";
    private final static int SLEEP_BETWEEN_SUBSCRIBE = 0;

    private final static String STREAM_NAME = "NatsTimeout";
    private final static String TEST_SUBJECT = "test.v1.";
    private final static String[] STREAM_SUBJECTS = new String[] { TEST_SUBJECT + ">", "test.v2.>", "test.v3.>" };
    private final Connection nastCon = getConnection();
    private final JetStream jetStream = nastCon.jetStream();
    private final Dispatcher dispatcher = nastCon.createDispatcher();
    private final List<JetStreamSubscription> subscriptions = new ArrayList<>();

    public NatsParallelSubscribe() throws IOException {
    }

    public static void main(String[] args) throws IOException, JetStreamApiException, InterruptedException {
	NatsParallelSubscribe nt = new NatsParallelSubscribe();
	nt.createStream();
	nt.parallelSubscribe();
	nt.sendMessages();
	nt.cleanup();

    }

    private void cleanup() throws InterruptedException {
	subscriptions.forEach(s -> dispatcher.unsubscribe(s));
	subscriptions.clear();
	nastCon.close();
    }

    private void sendMessages() throws IOException, JetStreamApiException, InterruptedException {
	System.out.printf("Start send messages\n");
	for (int i = 0; i < 5; i++) {

	    String msg = "testmsg " + i;
	    NatsMessage natsMessage = NatsMessage.builder().subject(TEST_SUBJECT + "XYZ").data(msg).build();
	    System.out.printf("Publish msg [%s]\n", msg);
	    jetStream.publish(natsMessage);
	    Thread.sleep(500);
	}
    }

    private void parallelSubscribe() throws IOException, JetStreamApiException, InterruptedException {
	final int streams = 3;
	CountDownLatch cdl = new CountDownLatch(streams);

	for (int i = 0; i < streams; i++) {
	    final int c = i;
	    new Thread(() -> {
		try {
		    JetStreamSubscription subscription = jetStream.subscribe(TEST_SUBJECT + "*", dispatcher,
			    genMessage(c), false, PushSubscribeOptions.builder()
				    .configuration(ConsumerConfiguration.builder().durable("dur").build()).build());
		    subscriptions.add(subscription);
		    cdl.countDown();
		    System.out.printf("Stream [%d] established. Durable 'dur'\n", c);
		} catch (Exception e) {
		    // Expect "consumer name already in use [10013]" on second subscribe
		    e.printStackTrace();
		}
	    }).start();
	    Thread.sleep(SLEEP_BETWEEN_SUBSCRIBE);
	}
	if (!cdl.await(3, TimeUnit.SECONDS)) {
	    System.err.printf("Error on subscribe\n");
	    System.exit(1);
	}
    }

    private void createStream() throws IOException, JetStreamApiException {
	JetStreamManagement jsm = nastCon.jetStreamManagement();
	List<StreamInfo> currentStreams = jsm.getStreams();

	Optional<StreamInfo> oldStream = currentStreams.stream()
		.filter(si -> si.getConfiguration().getName().equals(STREAM_NAME)).findFirst();

	if (oldStream.isPresent()) {
	    System.out.printf("use existing stream\n");
	} else {
	    StreamInfo info = jsm.addStream(StreamConfiguration.builder().name(STREAM_NAME)
		    .storageType(StorageType.Memory).subjects(STREAM_SUBJECTS).description("Test Stream").build());
	    System.out.printf("stream created: %s\n", info);
	}
    }

    private Connection getConnection() {

	io.nats.client.Options.Builder builder = new Options.Builder().server(NATS_SERVER)
		.token(NATS_TOKEN.toCharArray());
	try {
	    return Nats.connect(builder.build());
	} catch (IllegalStateException | IOException | InterruptedException e) {
	    throw new RuntimeException(e);
	}
    }

    private MessageHandler genMessage(int streamId) {
	return msg -> {
	    if (msg != null) {
		if (msg.isJetStream()) {
		    System.out.printf("Msg from stream [%d] received [%s]\n", streamId, new String(msg.getData()));
		    msg.ack();
		}
	    }
	};
    }
}

Output:

Stream [0] established. Durable 'dur'
Stream [1] established. Durable 'dur'
Stream [2] established. Durable 'dur'
Start send messages
Publish msg [testmsg 0]
Msg from stream [1] received [testmsg 0]
Msg from stream [2] received [testmsg 0]
Msg from stream [0] received [testmsg 0]
Publish msg [testmsg 1]
Msg from stream [1] received [testmsg 1]
Msg from stream [2] received [testmsg 1]
Msg from stream [0] received [testmsg 1]
Publish msg [testmsg 2]
Msg from stream [1] received [testmsg 2]
Msg from stream [2] received [testmsg 2]
Msg from stream [0] received [testmsg 2]
Publish msg [testmsg 3]
Msg from stream [1] received [testmsg 3]
Msg from stream [2] received [testmsg 3]
Msg from stream [0] received [testmsg 3]
Publish msg [testmsg 4]
Msg from stream [1] received [testmsg 4]
Msg from stream [2] received [testmsg 4]
Msg from stream [0] received [testmsg 4]

Given the capability you are leveraging, describe your expectation?

Expect "consumer name already in use [10013]"-XPT when establish addidtional stream

Given the expectation, what is the defect you are observing?

Expect "consumer name already in use [10013]"-XPT when establish addidtional stream

philmu avatar Sep 29 '23 11:09 philmu