Sometimes a consumer doesn't get all messages from a stream with limit retention and subject rollup

Open · kino71 opened this issue 2 years ago · 8 comments

Versions of nats-server and affected client libraries used:

nats 2.8.4

OS/Container environment:

Linux RH8 on VxRail / VMware

Steps or code to reproduce the issue:

1. Make a stream with limit retention that allows rollups.
2. Make a durable consumer to keep interest.
3. Publish a few messages with different subjects, each with a subject rollup header.
4. Subscribe to the messages with an ephemeral consumer (natscli will do).

Expected result:

Every time, I get one message for each unique subject.

Actual result:

Sometimes I get all the expected messages; sometimes only one (the one with the highest stream sequence).

For instance, the stream:

{
  "config": {
    "name": "foo",
    "subjects": [
      "foo.\u003e"
    ],
    "retention": "interest",
    "max_consumers": -1,
    "max_msgs_per_subject": -1,
    "max_msgs": -1,
    "max_bytes": -1,
    "max_age": 0,
    "max_msg_size": -1,
    "storage": "file",
    "discard": "old",
    "num_replicas": 3,
    "duplicate_window": 120000000000,
    "sealed": false,
    "deny_delete": false,
    "deny_purge": false,
    "allow_rollup_hdrs": true
  }
}

Define a consumer:

{
    "ack_policy": "explicit",
    "ack_wait": 30000000000,
    "deliver_policy": "all",
    "deliver_subject": "PUSH.foo.bar",
    "durable_name": "bar",
    "filter_subject": "foo.bar.\u003e",
    "max_ack_pending": 5,
    "max_deliver": 10,
    "replay_policy": "instant",
    "num_replicas": 0
}

Publish 3 messages:

nats pub foo.bar.1 "hello" -H "Nats-Rollup: sub"
nats pub foo.bar.2 "hello" -H "Nats-Rollup: sub"
nats pub foo.bar.3 "hello" -H "Nats-Rollup: sub"

Subscribe with an ephemeral consumer using nats-cli:

nats sub "foo.bar.>" --all

21:25:20 Subscribing to JetStream Stream holding messages with subject foo.bar.> starting with the first message received
[#1] Received JetStream message: stream: foo seq 1 / subject: foo.bar.1 / time: 2022-06-22T21:24:43+02:00
Nats-Rollup: sub

hello

[#2] Received JetStream message: stream: foo seq 2 / subject: foo.bar.2 / time: 2022-06-22T21:24:46+02:00
Nats-Rollup: sub

hello

[#3] Received JetStream message: stream: foo seq 3 / subject: foo.bar.3 / time: 2022-06-22T21:24:50+02:00
Nats-Rollup: sub

hello

Try a few more times; eventually you'll get:

nats sub "foo.bar.>" --all

21:25:22 Subscribing to JetStream Stream holding messages with subject foo.bar.> starting with the first message received
[#1] Received JetStream message: stream: foo seq 3 / subject: foo.bar.3 / time: 2022-06-22T21:24:50+02:00
Nats-Rollup: sub

hello

kino71, Jun 22 '22 19:06

Thanks, will take a look and make sure that if there is a bug, we get it addressed.

Curious, though: what is the higher-level goal? I wonder if MaxMsgsPerSubject might be better at accomplishing what you need.

derekcollison, Jun 23 '22 00:06

We are interested in having only the last message published for any given subject (it's the most recent status of a stock exchange segment). MaxMsgsPerSubject could be good for us; is there documentation about its behavior? I'm having trouble finding it.

kino71, Jun 23 '22 06:06

package server

import (
	"encoding/json"
	"fmt"
	"strconv"
	"testing"
	"time"

	"github.com/nats-io/nats.go"
)

func Test3209(t *testing.T) {
	c := createJetStreamClusterExplicit(t, "JSC", 3)
	defer c.shutdown()

	nc, js := jsClientConnect(t, c.randomServer())
	defer nc.Close()

	cfg := `{
    "name": "foo",
    "subjects": [
      "foo.\u003e"
    ],
    "retention": "interest",
    "max_consumers": -1,
    "max_msgs_per_subject": -1,
    "max_msgs": -1,
    "max_bytes": -1,
    "max_age": 0,
    "max_msg_size": -1,
    "storage": "file",
    "discard": "old",
    "num_replicas": 3,
    "duplicate_window": 120000000000,
    "sealed": false,
    "deny_delete": false,
    "deny_purge": false,
    "allow_rollup_hdrs": true
  }`
	scfg := &nats.StreamConfig{}
	err := json.Unmarshal([]byte(cfg), scfg)
	if err != nil {
		t.Fatalf("unmarshal failed: %v", err)
	}

	_, err = js.AddStream(scfg)
	if err != nil {
		t.Fatalf("add stream failed: %v", err)
	}

	// idle durable to prevent interest retention from removing messages
	_, err = js.AddConsumer(scfg.Name, &nats.ConsumerConfig{Durable: "x", AckPolicy: nats.AckExplicitPolicy})
	if err != nil {
		t.Fatalf("durable add failed: %v", err)
	}

	for i := 0; i < 4; i++ {
		msg := nats.NewMsg(fmt.Sprintf("foo.%d", i))
		msg.Data = []byte(strconv.Itoa(i))

		_, err = js.PublishMsg(msg)
		if err != nil {
			t.Fatalf("publish failed: %v", err)
		}

		msg.Header.Add(nats.MsgRollup, nats.MsgRollupSubject)

		_, err = js.PublishMsg(msg)
		if err != nil {
			t.Fatalf("publish failed: %v", err)
		}
	}

	for tc := 0; tc <= 100; tc++ {
		sub, err := js.SubscribeSync("foo.>", nats.DeliverAll())
		if err != nil {
			t.Fatalf("sub %d failed: %v", tc, err)
		}

		msg, err := sub.NextMsg(time.Second)
		if err != nil {
			t.Fatalf("next msg %d failed: %v", tc, err)
		}

		meta, err := msg.Metadata()
		if err != nil {
			t.Fatalf("metadata %d failed: %v", tc, err)
		}

		// here we always expect sequence 2 because we do 4 x pub->rollup; we should always get the first rollup, which
		// would be stream sequence 2 as we do DeliverAll(), but sometimes we do not get the early rollups, only the last one
		if meta.Sequence.Stream != 2 {
			t.Fatalf("Test %d expected sequence 2 got %d: %#v: %q", tc, meta.Sequence.Stream, msg.Header, msg.Data)
		}

		sub.Unsubscribe()
	}
}
=== RUN   Test3209
    3209_test.go:94: Test 9 expected sequence 2 got 8: nats.Header{"Nats-Rollup":[]string{"sub"}}: "3"
--- FAIL: Test3209 (3.29s)

ripienaar, Jun 23 '22 06:06

Set MaxMsgsPerSubject to n and each subject will keep at most n messages. So instead of using rollups, just keep the most recent message (n = 1). In the case where you set n > 1, your consumer can also be set to get only the last message for each subject.
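A toy, stdlib-only model of that per-subject limit (hypothetical types, not the server's code): each subject keeps at most n messages, the oldest message on that subject is dropped first, and a "last per subject" read returns only the newest message of each subject. In nats.go the corresponding settings are, as far as I know, the `MaxMsgsPerSubject` stream field and the `DeliverLastPerSubjectPolicy` deliver policy; the "open"/"closed" payloads below are illustrative only.

```go
package main

import (
	"fmt"
	"sort"
)

// entry is a hypothetical stored message.
type entry struct {
	Seq     uint64
	Subject string
	Data    string
}

// perSubjectStream models a stream with a per-subject message limit.
type perSubjectStream struct {
	maxPerSubject int                // values <= 0 mean unlimited here (the config above uses -1)
	lastSeq       uint64
	bySubject     map[string][]entry // oldest first
}

func newPerSubjectStream(n int) *perSubjectStream {
	return &perSubjectStream{maxPerSubject: n, bySubject: map[string][]entry{}}
}

// publish stores a message and drops the oldest ones on the same subject
// once the per-subject limit is exceeded.
func (s *perSubjectStream) publish(subject, data string) {
	s.lastSeq++
	msgs := append(s.bySubject[subject], entry{Seq: s.lastSeq, Subject: subject, Data: data})
	if s.maxPerSubject > 0 && len(msgs) > s.maxPerSubject {
		msgs = msgs[len(msgs)-s.maxPerSubject:]
	}
	s.bySubject[subject] = msgs
}

// lastPerSubject returns the newest message of each subject, in stream order.
func (s *perSubjectStream) lastPerSubject() []entry {
	var out []entry
	for _, msgs := range s.bySubject {
		out = append(out, msgs[len(msgs)-1])
	}
	sort.Slice(out, func(i, j int) bool { return out[i].Seq < out[j].Seq })
	return out
}

func main() {
	s := newPerSubjectStream(1) // MaxMsgsPerSubject = 1
	s.publish("foo.bar.1", "open")
	s.publish("foo.bar.2", "open")
	s.publish("foo.bar.1", "closed")
	for _, m := range s.lastPerSubject() {
		fmt.Println(m.Seq, m.Subject, m.Data)
	}
}
```

With n = 1 the stream itself holds only the latest status per subject, so no rollup header is needed and a plain DeliverAll consumer already sees one message per subject.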

ripienaar, Jun 23 '22 06:06

Cool, thanks, I'm trying this today.

kino71, Jun 23 '22 07:06

I ran a few tests now, and MaxMsgsPerSubject=1 seems good for us. @ripienaar I think I stumbled upon a very minor nats-cli bug. I wanted to remove the allow-rollup flag from the "foo" stream, but:

nats str edit foo --no-allow-rollup
No difference in configuration

even though "foo" allows rollups.

kino71, Jun 23 '22 07:06

Yeah, I saw this bug yesterday too :) Please open an issue; I will look tomorrow.

ripienaar, Jun 23 '22 08:06

Done, thanks!

kino71, Jun 23 '22 08:06