nats-server icon indicating copy to clipboard operation
nats-server copied to clipboard

on JetStream ack timeout, ack_pending is expected to be decreased and num_pending to be increased

Open aricart opened this issue 3 years ago • 5 comments

On ack expiration, the ack_pending is not getting decreased, and num_pending is not getting re-increased.

// Reproduction for the reported behavior: a pull consumer delivers one
// message, the message is never acknowledged within ack_wait, and the test
// then inspects the consumer's num_ack_pending / num_pending counters.
Deno.test("manual - pull inflight acks expired", async () => {
  const sc = StringCodec();
  const ns = await NatsServer.start(serverOpts());
  const nc = await connect({ port: ns.port });
  const jsm = jsmClient(nc);

  // create the ORDERS stream, then a durable consumer NEW on it; the ack
  // timeout is set on the consumer (ack_wait), not on the stream.
  // nanos(500) shows up as ack_wait: 500000000 in the consumer info, i.e. 500ms
  await jsm.streams.create("ORDERS", { subjects: ["orders.*"] });
  await jsm.consumers.create(
    "ORDERS",
    {
      durable_name: "NEW",
      filter_subject: "orders.new",
      ack_wait: nanos(500),
    },
  );
  // add a single message that matches the consumer's filter subject
  nc.publish("orders.new", sc.encode("1"));

  // get info on the consumer - we expect num_pending to be 1 (not delivered yet)
  let ci = await jsm.consumers.info("ORDERS", "NEW");
  assertEquals(ci.num_pending, 1);

  // pull the next message via the raw API subject, but don't ack it
  const m = await nc.request(
    "$JS.API.CONSUMER.MSG.NEXT.ORDERS.NEW",
    undefined,
    { timeout: 1000, noMux: true },
  );

  // we get info, and verify that there's a pending ack
  ci = await jsm.consumers.info("ORDERS", "NEW");
  assertEquals(ci.num_ack_pending, 1);

  // now we wait (2000, well past the 500ms ack_wait) to have the ack expire
  await delay(2000);

  // we get the info again - we expect the num_pending to be 1 as the ack has timed out by now
  // NOTE: the trailing `<---- ZONK ...` text on the next line is the reporter's
  // inline annotation of the observed failure; it is not part of the test code.
  ci = await jsm.consumers.info("ORDERS", "NEW");
  assertEquals(ci.num_pending, 1); <---- ZONK num_ack_pending is still one, and num_pending is 0

  // issue a second pull request; m2 is not inspected or acknowledged afterwards
  const m2 = await nc.request(
    "$JS.API.CONSUMER.MSG.NEXT.ORDERS.NEW",
    undefined,
    { timeout: 1000, noMux: true },
  );
  // reply to the FIRST delivery (m) with a "+ACK" payload
  m.respond(sc.encode("+ACK"));

  // after the ack nothing should be left to deliver
  ci = await jsm.consumers.info("ORDERS", "NEW");
  assertEquals(ci.num_pending, 0);

  // tear down the client connection and the test server
  await nc.close();
  await ns.stop();
});

This is the consumer info output after the ack timeout has elapsed. At this point num_ack_pending is expected to be 0 and num_pending is expected to be 1.

test manual - pull inflight acks expired ... {
  type: "io.nats.jetstream.api.v1.consumer_info_response",
  stream_name: "ORDERS",
  name: "NEW",
  created: "2020-11-17T17:50:50.130592Z",
  config: {
    durable_name: "NEW",
    deliver_policy: "all",
    ack_policy: "explicit",
    ack_wait: 500000000,
    max_deliver: -1,
    filter_subject: "orders.new",
    replay_policy: "instant",
    max_waiting: 512
  },
  delivered: { consumer_seq: 1, stream_seq: 1 },
  ack_floor: { consumer_seq: 0, stream_seq: 0 },
  num_ack_pending: 1,
  num_redelivered: 0,
  num_waiting: 0,
  num_pending: 0
}
ok (2133ms)

aricart avatar Nov 17 '20 17:11 aricart

Here num_ack_pending isn't going down once the AckWait has elapsed. That message is no longer in ack pending, but it's still counting toward the num_ack_pending count, which seems wrong.

{
  "stream_name": "TEST",
  "name": "TEST",
  "config": {
    "durable_name": "TEST",
    "deliver_policy": "all",
    "ack_policy": "explicit",
    "ack_wait": 30000000000,
    "max_deliver": 2,
    "replay_policy": "instant"
  },
  "created": "2020-11-17T17:58:52.150110203Z",
  "delivered": {
    "consumer_seq": 2,
    "stream_seq": 2
  },
  "ack_floor": {
    "consumer_seq": 0,
    "stream_seq": 0
  },
  "num_ack_pending": 2,
  "num_redelivered": 0,
  "num_waiting": 0,
  "num_pending": 7
}

ripienaar avatar Nov 17 '20 18:11 ripienaar

Hi, I just noticed the same behavior with 2.7.0.

kung-foo avatar Jan 20 '22 21:01 kung-foo

I believe that there is no issue. I have created a "server" test (posted below) that tries to cover all the issues described above, and the test passes with v2.9.0 (main, actually).

The thing that first tricked me (and maybe that is the same issue that people in this list faced) is that unlike a push consumer, the server does not attempt to deliver (and therefore detect as reaching max deliver) until it receives a pull request.

So a message that is delivered but not ack'ed is marked as "ack pending", and it will stay that way until a new pull request arrives. If there is no max_deliver, it will remain "ack pending" until it is ack'ed. If there is a max_deliver, it will remain "ack pending" until the pull request that pushes the message over the max_deliver limit arrives; at that point the message is dropped from ack pending and new messages are delivered instead.

Please have a look at the test and let me know if that explains what you are/were seeing. If so, then server behaves correctly and I will close the issue.


// TestJetStreamPullConsumerAckAndNumPendingAfterAckTimeout exercises a durable
// pull consumer ("NEW" on stream "ORDERS") configured with a 100ms AckWait,
// explicit acks and MaxDeliver of 2. It publishes 10 messages, fetches the
// first two without acknowledging them, and checks the consumer's
// NumAckPending / NumPending counters before and after AckWait has elapsed
// and after further pull requests are issued.
func TestJetStreamPullConsumerAckAndNumPendingAfterAckTimeout(t *testing.T) {
	s := RunBasicJetStreamServer()
	if config := s.JetStreamConfig(); config != nil {
		defer removeDir(t, config.StoreDir)
	}
	defer s.Shutdown()

	nc, js := jsClientConnect(t, s)
	defer nc.Close()

	_, err := js.AddStream(&nats.StreamConfig{
		Name:     "ORDERS",
		Subjects: []string{"orders.*"},
	})
	require_NoError(t, err)

	// Short AckWait so the test can outlast it with a sub-second sleep;
	// MaxDeliver of 2 bounds how many times "1" and "2" can be handed out.
	_, err = js.AddConsumer("ORDERS", &nats.ConsumerConfig{
		Durable:       "NEW",
		FilterSubject: "orders.new",
		DeliverPolicy: nats.DeliverAllPolicy,
		AckWait:       100 * time.Millisecond,
		AckPolicy:     nats.AckExplicitPolicy,
		MaxDeliver:    2,
	})
	require_NoError(t, err)

	// Publish 10 messages whose payloads are "1" through "10".
	for i := 0; i < 10; i++ {
		sendStreamMsg(t, nc, "orders.new", fmt.Sprintf("%d", i+1))
	}

	// Named selectors for the first argument of check(), for readability
	// at the call sites.
	const checkAck = true
	const checkPending = false

	// check compares one counter from the consumer info against expected:
	// NumAckPending when ack is true, NumPending otherwise. The comparison
	// runs inside checkFor with 2s and 50ms arguments (presumably an overall
	// timeout and a retry interval; confirm against checkFor's definition).
	check := func(ack bool, expected uint64) {
		t.Helper()
		checkFor(t, 2*time.Second, 50*time.Millisecond, func() error {
			ci, err := js.ConsumerInfo("ORDERS", "NEW")
			if err != nil {
				return err
			}
			if ack {
				if ci.NumAckPending != int(expected) {
					return fmt.Errorf("num_ack_pending should be %v, got %v", expected, ci.NumAckPending)
				}
			} else {
				if ci.NumPending != expected {
					return fmt.Errorf("num_pending should be %v, got %v", expected, ci.NumPending)
				}
			}
			return nil
		})
	}
	// Nothing has been delivered yet, so all 10 messages are pending.
	check(checkPending, 10)

	sub, err := js.PullSubscribe("orders.new", "NEW")
	require_NoError(t, err)

	// getMsg fetches exactly one message (waiting up to 1s), requires its
	// payload to equal expected, and acknowledges it synchronously only when
	// ack is true.
	getMsg := func(expected string, ack bool) {
		t.Helper()
		msgs, err := sub.Fetch(1, nats.MaxWait(time.Second))
		require_NoError(t, err)
		require_True(t, len(msgs) == 1)
		msg := msgs[0]
		require_Equal(t, string(msg.Data), expected)
		if ack {
			require_NoError(t, msg.AckSync())
		}
	}

	// First delivery of "1" and "2", deliberately left unacknowledged.
	getMsg("1", false)
	getMsg("2", false)

	check(checkAck, 2)

	// Wait more than AckWait
	time.Sleep(700 * time.Millisecond)

	// Still pending.
	// No pull request has been issued since AckWait elapsed, and the
	// counters are expected to be unchanged: 2 ack pending, 8 undelivered.
	check(checkAck, 2)
	check(checkPending, 8)

	// Consume them again...
	// This is the second delivery of each, which reaches MaxDeliver (2).
	getMsg("1", false)
	getMsg("2", false)

	// Wait more than AckWait
	time.Sleep(700 * time.Millisecond)

	// Now sending a new request will invalidate "1" and "2",
	// so we would get "3" and "4"
	getMsg("3", true)
	getMsg("4", true)

	// "1" and "2" no longer count as ack pending, "3" and "4" were acked,
	// and 6 messages ("5".."10") remain undelivered.
	check(checkAck, 0)
	check(checkPending, 6)

	// Drain and acknowledge the remaining messages "5" through "10".
	for i := 0; i < 6; i++ {
		getMsg(fmt.Sprintf("%d", 5+i), true)
	}
}

kozlovic avatar Sep 14 '22 21:09 kozlovic

@kozlovic thanks! It's kinda what i figured.

kung-foo avatar Sep 15 '22 07:09 kung-foo

@aricart @ripienaar I understand that this is a very old report, but could you check my explanation above (and the test I provided) to see whether it explains the "unexpected" behavior you observed at the time? If it does, and the server behaves as expected, then I will close this issue.

kozlovic avatar Sep 15 '22 14:09 kozlovic