nats.java

Race condition when using pull subscription and draining connection

Open ewirch opened this issue 2 years ago • 3 comments

Defect

When using pull subscriptions (io.nats.client.Subscription#nextMessage(java.time.Duration)) together with io.nats.client.Connection#drain to close subscriptions and the connection, a race condition occurs that leads to an exception:

Exception in thread "Thread-1" java.lang.IllegalStateException: This subscription became inactive.
	at io.nats.client.impl.NatsSubscription.nextMessageInternal(NatsSubscription.java:181)
	at io.nats.client.impl.NatsJetStreamSubscription.nextMessageWithEndTime(NatsJetStreamSubscription.java:118)
	at io.nats.client.impl.NatsJetStreamSubscription.nextMessage(NatsJetStreamSubscription.java:89)

Versions of io.nats:jnats and nats-server:

io.nats:jnats:2.13.2 Server version: 2.7.1

OS/Container environment:

Docker using https://hub.docker.com/_/nats image.

Steps or code to reproduce the issue:


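import io.nats.client.Connection;
import io.nats.client.JetStreamApiException;
import io.nats.client.JetStreamManagement;
import io.nats.client.Nats;
import io.nats.client.Options;
import io.nats.client.PullSubscribeOptions;
import io.nats.client.api.ConsumerConfiguration;
import io.nats.client.api.RetentionPolicy;
import io.nats.client.api.StorageType;
import io.nats.client.api.StreamConfiguration;
import io.nats.client.impl.ErrorListenerLoggerImpl;

import java.io.IOException;
import java.time.Duration;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeoutException;

public class App {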
    public static void main(String[] args) throws IOException, InterruptedException, JetStreamApiException, TimeoutException, ExecutionException {
        var connectionOptions = new Options.Builder()
            .server("nats://localhost:4222")
            .errorListener(new ErrorListenerLoggerImpl())
            .noNoResponders()
            .build();
        var connection = Nats.connect(connectionOptions);
        createStreamIfNotExists(connection, "test");
        System.out.println("Server version: " + connection.getServerInfo().getVersion());

        var consumerConfiguration = ConsumerConfiguration.builder()
            .build();
        var subscriptionOptions = PullSubscribeOptions.builder()
            .configuration(consumerConfiguration)
            .durable("test")
            .stream("test")
            .build();
        var subscription = connection.jetStream().subscribe("test", subscriptionOptions);

        var latch = new CountDownLatch(1);
        subscription.pull(10);
        var t = new Thread(() -> {
            try {
                latch.countDown();
                subscription.nextMessage(Duration.ofSeconds(10));
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        });
        t.start();

        latch.await();
        // give worker thread a chance to enter nextMessage() and block.
        Thread.sleep(100);

        connection.drain(Duration.ofSeconds(1))
            .get();
        t.join();
    }


    private static void createStreamIfNotExists(Connection connection, String stream) throws JetStreamApiException, IOException {
        var jetStreamManagement = connection.jetStreamManagement();

        if (!streamExists(jetStreamManagement, stream)) {
            createNewStream(jetStreamManagement, stream);
        }
    }

    private static boolean streamExists(JetStreamManagement jetStreamManagement, String streamName) throws IOException, JetStreamApiException {
        try {
            return jetStreamManagement.getStreamInfo(streamName) != null;
        } catch (JetStreamApiException ex) {
            if (ex.getErrorCode() == 404) {
                return false;
            }
            throw ex;
        }
    }

    private static void createNewStream(JetStreamManagement jetStreamManagement, String stream) throws JetStreamApiException, IOException {
        var streamConfiguration = StreamConfiguration.builder()
            .name(stream)
            .storageType(StorageType.File)
            .retentionPolicy(RetentionPolicy.WorkQueue)
            .build();
        jetStreamManagement.addStream(streamConfiguration);
    }
}

Expected result:

Shuts down gracefully.

Actual result:

The exception mentioned above is printed.

Analysis

When io.nats.client.impl.NatsConnection#drain executes, it eventually runs:

consumers.forEach(NatsConsumer::markUnsubedForDrain);

which starts the draining process for the subscribers. This is done by adding a poison pill to the queue. NatsConnection then waits until the subscribers have finished draining:

while (timeout == null || timeout.equals(Duration.ZERO)
       || Duration.between(start, now).compareTo(timeout) < 0) {
  consumers.removeIf(NatsConsumer::isDrained);
  if (consumers.size() == 0) {
    break;

and then, once consumers is empty, closes the connection:

this.close(false); // close the connection after the last flush
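
The loop condition quoted above can be read in isolation. The following is a minimal sketch, not jnats code: the class name DrainWaitSketch and the helper keepWaiting are made up for illustration, and only the boolean expression is taken from the excerpt. It shows that a null or zero timeout means "no deadline", while any other timeout bounds the wait.

```java
import java.time.Duration;
import java.time.Instant;

// Illustrative only: DrainWaitSketch and keepWaiting are hypothetical names,
// not part of jnats. Only the boolean expression comes from the excerpt above.
public class DrainWaitSketch {

    // True while the drain loop would keep polling the consumers.
    static boolean keepWaiting(Duration timeout, Instant start, Instant now) {
        return timeout == null
            || timeout.equals(Duration.ZERO)
            || Duration.between(start, now).compareTo(timeout) < 0;
    }

    public static void main(String[] args) {
        Instant start = Instant.parse("2022-02-24T17:00:00Z");
        Duration oneSecond = Duration.ofSeconds(1);

        // Inside the deadline: keep waiting.
        System.out.println(keepWaiting(oneSecond, start, start.plusMillis(500)));
        // Past the deadline: stop waiting.
        System.out.println(keepWaiting(oneSecond, start, start.plusMillis(1500)));
        // Zero and null both mean "no deadline".
        System.out.println(keepWaiting(Duration.ZERO, start, start.plusSeconds(3600)));
        System.out.println(keepWaiting(null, start, start.plusSeconds(3600)));
    }
}
```

With the 1 second timeout used in the reproduction, the deadline is never the limiting factor; the loop exits early because the consumer list is already empty.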

The problem is that NatsConsumer::isDrained immediately returns true, because isDrained() is implemented as getPendingMessageCount() == 0 and the poison pill does not count toward the queue length. Execution therefore proceeds straight to closing the connection, which invalidates the subscribers. When the subscriber thread eventually wakes up on the queue, the subscription has already been invalidated, and this code throws:

if (this.incoming == null || !this.incoming.isRunning()) { // We were unsubscribed while waiting
     throw new IllegalStateException("This subscription became inactive.");
}
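
To make the analysis concrete, here is a toy model of the queue behavior described above. It is a sketch under stated assumptions and does not use or reproduce jnats internals: ToyQueue, markForDrain, naiveIsDrained and strictIsDrained are hypothetical names. The "strict" check is only one conceivable alternative, shown for contrast, not a fix adopted by the library.

```java
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.atomic.AtomicInteger;

// Toy model of the race; none of these names are jnats classes or methods.
public class DrainRaceSketch {
    static final String POISON_PILL = "<poison>";

    // A queue whose pending counter ignores the poison pill, as described above.
    static class ToyQueue {
        private final LinkedBlockingQueue<String> queue = new LinkedBlockingQueue<>();
        private final AtomicInteger pending = new AtomicInteger();
        private volatile boolean pillConsumed = false;

        void push(String msg) {
            pending.incrementAndGet();
            queue.add(msg);
        }

        // Enqueue the pill without touching the pending counter.
        void markForDrain() {
            queue.add(POISON_PILL);
        }

        // Non-blocking read: returns a message, or null if empty or the pill was reached.
        String poll() {
            String m = queue.poll();
            if (m == null) {
                return null;
            }
            if (POISON_PILL.equals(m)) {
                pillConsumed = true;
                return null;
            }
            pending.decrementAndGet();
            return m;
        }

        // The check described in the analysis: true as soon as no real message is pending.
        boolean naiveIsDrained() {
            return pending.get() == 0;
        }

        // Hypothetical alternative: also require that the consumer has seen the pill.
        boolean strictIsDrained() {
            return pending.get() == 0 && pillConsumed;
        }
    }

    public static void main(String[] args) {
        ToyQueue q = new ToyQueue();
        q.markForDrain();
        // The consumer has not woken up yet, but the naive check already reports "drained".
        System.out.println("naive=" + q.naiveIsDrained() + " strict=" + q.strictIsDrained());
        q.poll(); // the consumer finally reads the pill
        System.out.println("after consumer wakes: strict=" + q.strictIsDrained());
    }
}
```

In this model the naive check reports "drained" before the consumer has read anything, which corresponds to the window in which the connection is closed while the worker is still blocked in nextMessage().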

ewirch avatar Feb 24 '22 17:02 ewirch

First of all, thanks for the detailed bug report, it's very much appreciated. I've got the code running and I cannot reproduce the problem. Can you describe your server / cluster? As with any timing issue, I'm probably going to need a server setup closer to what you have. And anything else you can think of.

scottf avatar Feb 24 '22 20:02 scottf

I run a single NATS node locally using Docker on Linux. Here is the Docker compose snippet for that:

version: "3.9"

services:
  nats:
    image: nats:2.7.2-alpine
    hostname: nats
    restart: on-failure
    ports:
      - "8222:8222"
      - "4222:4222"
    command: "--js -m 8222 --sd /data --name nats"
    healthcheck:
      test: ["CMD", "wget", "-S", "--spider", "http://0.0.0.0:8222/varz"]
      start_period: 30s
    volumes:
      - ${DATA_DIR}/message/nats:/data
    deploy:
      resources:
        limits:
          cpus: '1.0'
          memory: '2G'

networks:
  default:
    name: sh-proxy-network

The data dir is on an SSD, if this is of any relevance. I use Gradle for the project, but it makes no difference whether I run it using gradle run or plain java; the exception occurs either way, every time.

ewirch avatar Feb 25 '22 06:02 ewirch

BTW, the same happens for io.nats.client.impl.NatsSubscription#drain, because it contains the same logic:

  • wait for isDrained()
  • immediately invalidate itself in cleanUpAfterDrain(): this.connection.invalidate(this);

ewirch avatar Feb 25 '22 09:02 ewirch

@ewirch Couple things. NatsJetStreamSubscription just extends NatsSubscription so yes, it's the same exact code.

And even though I can't reproduce it, I 100% believe it happens. Maybe there is enough lag or whatever in docker that I can't get directly on my pc.

So I got to thinking about this exception, and it is actually just reporting the state: you tried to get the next message when the sub is in a state where it's not allowed to ask for messages. This seems intentional.

I also just wanted to note that I don't think it's specific to the pull subscription.

I'm closing this. We could reopen as a discussion if you want to go through your use case. And for pull, I don't think you would even care to drain. You might do better using some expiration on the pull so you can check for a quit condition regularly. And if you catch the exception and handle it, you know exactly the state you are in, and you probably meant to quit the pull anyway.

scottf avatar Jan 30 '23 22:01 scottf
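
The suggestion in the last comment (poll with a short timeout, check a quit flag regularly, and treat the IllegalStateException as a shutdown signal) could look roughly like the sketch below. It is deliberately self-contained: MessageSource is a hypothetical stand-in for the part of a subscription that exposes nextMessage(Duration), and the 50 ms timeout is an arbitrary assumption. It is not the maintainers' recommended implementation.

```java
import java.time.Duration;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.atomic.AtomicBoolean;

// Sketch only: MessageSource is a hypothetical stand-in, not a jnats interface.
public class PullLoopSketch {

    interface MessageSource {
        // Returns null on timeout; may throw IllegalStateException once inactive.
        String nextMessage(Duration timeout) throws InterruptedException;
    }

    // Polls with a short timeout until asked to quit or the source becomes inactive.
    static List<String> consume(MessageSource source, AtomicBoolean quit)
            throws InterruptedException {
        List<String> received = new ArrayList<>();
        while (!quit.get()) {
            try {
                String msg = source.nextMessage(Duration.ofMillis(50));
                if (msg != null) {
                    received.add(msg);
                }
            } catch (IllegalStateException e) {
                // The subscription became inactive while we were waiting:
                // treat this as a normal end of the loop.
                break;
            }
        }
        return received;
    }

    public static void main(String[] args) throws InterruptedException {
        // Fake source: delivers two messages, then behaves like an invalidated subscription.
        int[] calls = {0};
        MessageSource fake = timeout -> {
            calls[0]++;
            if (calls[0] <= 2) {
                return "msg-" + calls[0];
            }
            throw new IllegalStateException("This subscription became inactive.");
        };
        System.out.println(consume(fake, new AtomicBoolean(false)));
    }
}
```

With a real subscription, the quit flag would be set by the shutdown path before the connection is closed, so the worker normally leaves the loop on its own and the catch block only covers the remaining race window.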