nats.go icon indicating copy to clipboard operation
nats.go copied to clipboard

PullSubscription must match Consumer Filter Subject

Open syacko-sote opened this issue 4 years ago • 9 comments

Defect

When calling PullSubscribe, with a subject that is a subset of the Consumer filter subject, the call fails with this error message: nats: subject does not match consumer

The code on line 1057 of js.go says that the pullsubscribe subject can be a subset of the consumer filter subject. On line 1058, the test is that the pullsunscribe matches the consumer filter subject. See image of code below.

Make sure that these boxes are checked before submitting your issue -- thank you!

  • [x] Included nats.go version
  • [x] Included a [Minimal, Complete, and Verifiable example] (https://stackoverflow.com/help/mcve)

Versions of nats.go and the nats-server if one was involved:

nats.go 1.11.0 which is running against NSG leaf server

OS/Container environment:

Linux?

Steps or code to reproduce the issue:

Consumer:

? Select a Consumer bsl-organizations-wildcard
Information for Consumer business-service-layer > bsl-organizations-wildcard created 2021-05-04T15:03:58-07:00

Configuration:

        Durable Name: bsl-organizations-wildcard
           Pull Mode: true
      Filter Subject: bsl.organization.>
        Deliver Next: true
          Ack Policy: Explicit
            Ack Wait: 30s
       Replay Policy: Instant
  Maximum Deliveries: 1
     Max Ack Pending: 20,000

State:

   Last Delivered Message: Consumer sequence: 292 Stream sequence: 2750
     Acknowledgment floor: Consumer sequence: 292 Stream sequence: 2750
         Outstanding Acks: 0 out of maximum 20000
     Redelivered Messages: 4
     Unprocessed Messages: 0

Pull Sub: Subject: bsl.organization.supplier.> Durable Name: bsl-organization-wildcard

js.go code: Screen Shot 2021-05-26 at 3 49 19 PM

Expected result:

That the consumer with a filter subject of bsl.organization.> would accept a pull subscriber of bsl.organization.supllier.> because it is a sub set of the consumer filter subject.

Actual result:

Error is returned: nats: subject does not match consumer

syacko-sote avatar May 26 '21 23:05 syacko-sote

Thanks for the report @syacko-sote, this looks related to this: https://github.com/nats-io/nats.go/issues/731 Looking at how to improve the support for filter subjects, in the meantime for PullSubscribe calling nats.BindStream("foo") may help being able to make the stream bind to the consumer: https://github.com/nats-io/nats.go/issues/731#issue-895907741

wallyqs avatar May 28 '21 19:05 wallyqs

Thanks for the report @syacko-sote, this looks related to this: #731 Looking at how to improve the support for filter subjects, in the meantime for PullSubscribe calling nats.BindStream("foo") may help being able to make the stream bind to the consumer: #731 (comment)

I really need that functionality too and nats.BindStream() does not help and I would like to know did you implement this?

alikarimi999 avatar Jun 12 '22 20:06 alikarimi999

Why does nats.BindStream() not help?

derekcollison avatar Jun 13 '22 14:06 derekcollison

I have a consumer with these configurations

Information for Consumer stream_0 > consumer_0 created 2022-06-15T20:47:17+04:30

Configuration:

        Durable Name: consumer_0
           Pull Mode: true
      Filter Subject: sub.>
      Deliver Policy: All
          Ack Policy: Explicit
            Ack Wait: 30s
       Replay Policy: Instant
     Max Ack Pending: 1,000
   Max Waiting Pulls: 512

State:

   Last Delivered Message: Consumer sequence: 176 Stream sequence: 519 Last delivery: 4.18s ago
     Acknowledgment floor: Consumer sequence: 160 Stream sequence: 510 Last Ack: 12m44s ago
         Outstanding Acks: 8 out of maximum 1,000
     Redelivered Messages: 8
     Unprocessed Messages: 0
            Waiting Pulls: 0 of maximum 512

and when I'm trying to pull messages from a subset of the filtered subject, for example, "sub.0", nats return this ErrSubjectMismatch


func main() {
	nc, _ := nats.Connect("localhost:4222")

	js, _ := nc.JetStream()

	sub, err := js.PullSubscribe("sub.0", "consumer_0", nats.BindStream("stream_0"))
	if err != nil {
		panic(err)
	}

	msgs, err := sub.Fetch(10)
	if err != nil {
		panic(err)
	}

	if len(msgs) > 0 {
		for _, msg := range msgs {
			msg.Ack()
			println(string(msg.Data))
		}
	}
}

Expected result: only messages from "sub.0"

Actual result: error panic: nats: subject does not match consumer

and here despite the comment, being a subset of the FilterSubject doesn't check, and only checks if the FilterSubject is not empty and that the selected subject is precisely equal to the FilterSubject

alikarimi999 avatar Jun 15 '22 17:06 alikarimi999

Once a consumer is made it gets all that’s in the consumer filter subject always.

You can though add another consumer with a different filter subject.

You can’t pull from an existing consumer using a different subject than it’s filter.

ripienaar avatar Jun 15 '22 17:06 ripienaar

Once a consumer is made it gets all that’s in the consumer filter subject always.

You can though add another consumer with a different filter subject.

You can’t pull from an existing consumer using a different subject than it’s filter.

I want to pull from a subset of the consumer's FilterSubject, not a different subject

alikarimi999 avatar Jun 15 '22 18:06 alikarimi999

I understand that. But a subset is not the same as the whole so a new consumer is needed. We don’t support what you want

ripienaar avatar Jun 15 '22 18:06 ripienaar

Would creating a new, ephemeral R1 consumer with the filter subject you want be an option?

derekcollison avatar Jun 15 '22 19:06 derekcollison

I'm not sure from the conversation as to whether NATS server supports this. If not then the comment on the function should change. If it does, then the following function would fix the behavior in the go client:

func isSubjectSubset(subject, test string) bool {
	s := strings.Split(subject, ".")
	t := strings.Split(test, ".")
	if strings.HasSuffix(subject, ">") {
		if len(t) < len(s) {
			// Subject is too short to be a subset
			return false
		}
		// Truncate to >
		t = append(t[:len(s)-1], ">")
	}
	// Check they are the same length
	if len(t) != len(s) {
		return false
	}
	// Compare except wildcards
	for i := 0; i < len(s); i++ {
		if s[i] == "*" {
			continue
		}
		if s[i] != t[i] {
			return false
		}
	}
	return true
}

Should I make a PR?

andreib1 avatar Sep 09 '22 08:09 andreib1