nats-server
nats-server copied to clipboard
Subscribe parallel multiple consumers with same durable without receiving error (SUB-90012) [v2.10.1]
What version were you using?
Server: 2.10.1 Client: jnats 2.16.14
What environment was the server running in?
Xubuntu 22.04.3 6.2.0-33-generic x86_64
Is this defect reproducible?
When I sleep (e.g. 500 ms) between subscriptions, I receive the expected error "[SUB-90012] Consumer is already bound to a subscription.". Without the sleep, however, no error is raised and all durable consumers receive the same messages. Test code:
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;
import java.util.Optional;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import io.nats.client.Connection;
import io.nats.client.Dispatcher;
import io.nats.client.JetStream;
import io.nats.client.JetStreamApiException;
import io.nats.client.JetStreamManagement;
import io.nats.client.JetStreamSubscription;
import io.nats.client.MessageHandler;
import io.nats.client.Nats;
import io.nats.client.Options;
import io.nats.client.PushSubscribeOptions;
import io.nats.client.api.ConsumerConfiguration;
import io.nats.client.api.StorageType;
import io.nats.client.api.StreamConfiguration;
import io.nats.client.api.StreamInfo;
import io.nats.client.impl.NatsMessage;
/**
 * Reproduction program: subscribes several push consumers in parallel, all using the same
 * durable name, and then publishes a handful of messages to see which subscriptions receive them.
 *
 * <p>Flow of {@link #main(String[])}: ensure the stream exists, start the parallel subscriptions,
 * publish five messages, then unsubscribe and close the connection.
 */
public class NatsParallelSubscribe {
    private final static String NATS_SERVER = "nats://localhost:4222";
    private final static String NATS_TOKEN = "KxX1FqaJ";
    /** Pause in milliseconds between starting two subscriber threads; 0 starts them back to back. */
    private final static int SLEEP_BETWEEN_SUBSCRIBE = 0;
    private final static String STREAM_NAME = "NatsTimeout";
    private final static String TEST_SUBJECT = "test.v1.";
    private final static String[] STREAM_SUBJECTS = new String[] { TEST_SUBJECT + ">", "test.v2.>", "test.v3.>" };

    private final Connection natsCon = getConnection();
    private final JetStream jetStream = natsCon.jetStream();
    private final Dispatcher dispatcher = natsCon.createDispatcher();
    // Filled concurrently by the subscriber threads, so it must be a thread-safe list.
    private final List<JetStreamSubscription> subscriptions = new CopyOnWriteArrayList<>();

    /**
     * Creates the test instance; the connection, JetStream context and dispatcher are set up by
     * the field initializers.
     *
     * @throws IOException if the JetStream context cannot be obtained
     */
    public NatsParallelSubscribe() throws IOException {
    }

    /**
     * Runs the reproduction end to end.
     *
     * @param args unused
     * @throws IOException           on communication problems with the server
     * @throws JetStreamApiException if the server rejects a JetStream request
     * @throws InterruptedException  if the main thread is interrupted while waiting
     */
    public static void main(String[] args) throws IOException, JetStreamApiException, InterruptedException {
        NatsParallelSubscribe nt = new NatsParallelSubscribe();
        try {
            nt.createStream();
            nt.parallelSubscribe();
            nt.sendMessages();
        } finally {
            // Release subscriptions and the connection even when one of the steps above throws.
            nt.cleanup();
        }
    }

    /**
     * Unsubscribes every subscription that was established and closes the connection.
     *
     * @throws InterruptedException if closing the connection is interrupted
     */
    private void cleanup() throws InterruptedException {
        subscriptions.forEach(s -> dispatcher.unsubscribe(s));
        subscriptions.clear();
        natsCon.close();
    }

    /**
     * Publishes five messages to {@code test.v1.XYZ}, pausing 500 ms after each one.
     *
     * @throws IOException           on communication problems with the server
     * @throws JetStreamApiException if the server rejects the publish
     * @throws InterruptedException  if the pause between messages is interrupted
     */
    private void sendMessages() throws IOException, JetStreamApiException, InterruptedException {
        System.out.printf("Start send messages\n");
        for (int i = 0; i < 5; i++) {
            String msg = "testmsg " + i;
            NatsMessage natsMessage = NatsMessage.builder().subject(TEST_SUBJECT + "XYZ").data(msg).build();
            System.out.printf("Publish msg [%s]\n", msg);
            jetStream.publish(natsMessage);
            Thread.sleep(500);
        }
    }

    /**
     * Starts three threads that each subscribe to {@code test.v1.*} with the durable name
     * {@code dur}, then waits up to three seconds for all of them to succeed. If any
     * subscription has not been established by then, the process exits with status 1.
     *
     * @throws IOException           declared for callers; not thrown by the visible code path
     * @throws JetStreamApiException declared for callers; subscribe failures are caught per thread
     * @throws InterruptedException  if the thread is interrupted while sleeping or waiting
     */
    private void parallelSubscribe() throws IOException, JetStreamApiException, InterruptedException {
        final int streams = 3;
        CountDownLatch cdl = new CountDownLatch(streams);
        for (int i = 0; i < streams; i++) {
            final int c = i;
            new Thread(() -> {
                try {
                    PushSubscribeOptions options = PushSubscribeOptions.builder()
                            .configuration(ConsumerConfiguration.builder().durable("dur").build())
                            .build();
                    JetStreamSubscription subscription = jetStream.subscribe(TEST_SUBJECT + "*", dispatcher,
                            genMessage(c), false, options);
                    subscriptions.add(subscription);
                    cdl.countDown();
                    System.out.printf("Stream [%d] established. Durable 'dur'\n", c);
                } catch (Exception e) {
                    // Expect "consumer name already in use [10013]" on second subscribe.
                    // The latch is deliberately not counted down here, so a failed subscribe
                    // makes the await below time out.
                    e.printStackTrace();
                }
            }).start();
            Thread.sleep(SLEEP_BETWEEN_SUBSCRIBE);
        }
        if (!cdl.await(3, TimeUnit.SECONDS)) {
            System.err.printf("Error on subscribe\n");
            System.exit(1);
        }
    }

    /**
     * Creates the in-memory test stream unless a stream with the same name already exists.
     *
     * @throws IOException           on communication problems with the server
     * @throws JetStreamApiException if the server rejects a management request
     */
    private void createStream() throws IOException, JetStreamApiException {
        JetStreamManagement jsm = natsCon.jetStreamManagement();
        boolean streamExists = jsm.getStreams().stream()
                .anyMatch(si -> si.getConfiguration().getName().equals(STREAM_NAME));
        if (streamExists) {
            System.out.printf("use existing stream\n");
        } else {
            StreamInfo info = jsm.addStream(StreamConfiguration.builder().name(STREAM_NAME)
                    .storageType(StorageType.Memory).subjects(STREAM_SUBJECTS).description("Test Stream").build());
            System.out.printf("stream created: %s\n", info);
        }
    }

    /**
     * Connects to the configured server using token authentication.
     *
     * @return the established connection
     * @throws RuntimeException wrapping the underlying failure if the connection cannot be made
     */
    private Connection getConnection() {
        io.nats.client.Options.Builder builder = new Options.Builder().server(NATS_SERVER)
                .token(NATS_TOKEN.toCharArray());
        try {
            return Nats.connect(builder.build());
        } catch (InterruptedException e) {
            // Restore the interrupt flag so callers further up can still observe it.
            Thread.currentThread().interrupt();
            throw new RuntimeException(e);
        } catch (IllegalStateException | IOException e) {
            throw new RuntimeException(e);
        }
    }

    /**
     * Builds a handler that prints and acknowledges each JetStream message it receives.
     *
     * @param streamId index of the subscription, used only to label the output
     * @return the message handler
     */
    private MessageHandler genMessage(int streamId) {
        return msg -> {
            if (msg != null && msg.isJetStream()) {
                // Decode explicitly as UTF-8 instead of relying on the platform default charset.
                System.out.printf("Msg from stream [%d] received [%s]\n", streamId,
                        new String(msg.getData(), StandardCharsets.UTF_8));
                msg.ack();
            }
        };
    }
}
Output:
Stream [0] established. Durable 'dur'
Stream [1] established. Durable 'dur'
Stream [2] established. Durable 'dur'
Start send messages
Publish msg [testmsg 0]
Msg from stream [1] received [testmsg 0]
Msg from stream [2] received [testmsg 0]
Msg from stream [0] received [testmsg 0]
Publish msg [testmsg 1]
Msg from stream [1] received [testmsg 1]
Msg from stream [2] received [testmsg 1]
Msg from stream [0] received [testmsg 1]
Publish msg [testmsg 2]
Msg from stream [1] received [testmsg 2]
Msg from stream [2] received [testmsg 2]
Msg from stream [0] received [testmsg 2]
Publish msg [testmsg 3]
Msg from stream [1] received [testmsg 3]
Msg from stream [2] received [testmsg 3]
Msg from stream [0] received [testmsg 3]
Publish msg [testmsg 4]
Msg from stream [1] received [testmsg 4]
Msg from stream [2] received [testmsg 4]
Msg from stream [0] received [testmsg 4]
Given the capability you are leveraging, describe your expectation?
Expect a "consumer name already in use [10013]" exception when establishing an additional stream.
Given the expectation, what is the defect you are observing?
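Expected a "consumer name already in use [10013]" exception when establishing an additional stream, but none is raised: all three subscriptions are established on durable 'dur' and each one receives every published message.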