nats.go
nats.go copied to clipboard
PullSubscription must match Consumer Filter Subject
Defect
When calling PullSubscribe, with a subject that is a subset of the Consumer filter subject, the call fails with this error message: nats: subject does not match consumer
The code on line 1057 of js.go says that the pullsubscribe subject can be a subset of the consumer filter subject. On line 1058, the test is that the pullsunscribe matches the consumer filter subject. See image of code below.
Make sure that these boxes are checked before submitting your issue -- thank you!
- [x] Included nats.go version
- [x] Included a [Minimal, Complete, and Verifiable example] (https://stackoverflow.com/help/mcve)
Versions of nats.go and the nats-server if one was involved:
nats.go 1.11.0 which is running against NSG leaf server
OS/Container environment:
Linux?
Steps or code to reproduce the issue:
Consumer:
? Select a Consumer bsl-organizations-wildcard
Information for Consumer business-service-layer > bsl-organizations-wildcard created 2021-05-04T15:03:58-07:00
Configuration:
Durable Name: bsl-organizations-wildcard
Pull Mode: true
Filter Subject: bsl.organization.>
Deliver Next: true
Ack Policy: Explicit
Ack Wait: 30s
Replay Policy: Instant
Maximum Deliveries: 1
Max Ack Pending: 20,000
State:
Last Delivered Message: Consumer sequence: 292 Stream sequence: 2750
Acknowledgment floor: Consumer sequence: 292 Stream sequence: 2750
Outstanding Acks: 0 out of maximum 20000
Redelivered Messages: 4
Unprocessed Messages: 0
Pull Sub: Subject: bsl.organization.supplier.> Durable Name: bsl-organization-wildcard
js.go code:

Expected result:
That the consumer with a filter subject of bsl.organization.> would accept a pull subscriber of bsl.organization.supllier.> because it is a sub set of the consumer filter subject.
Actual result:
Error is returned: nats: subject does not match consumer
Thanks for the report @syacko-sote, this looks related to this: https://github.com/nats-io/nats.go/issues/731 Looking at how to improve the support for filter subjects, in the meantime for PullSubscribe calling nats.BindStream("foo") may help being able to make the stream bind to the consumer: https://github.com/nats-io/nats.go/issues/731#issue-895907741
Thanks for the report @syacko-sote, this looks related to this: #731 Looking at how to improve the support for filter subjects, in the meantime for
PullSubscribecallingnats.BindStream("foo")may help being able to make the stream bind to the consumer: #731 (comment)
I really need that functionality too and nats.BindStream() does not help and I would like to know did you implement this?
Why does nats.BindStream() not help?
I have a consumer with these configurations
Information for Consumer stream_0 > consumer_0 created 2022-06-15T20:47:17+04:30
Configuration:
Durable Name: consumer_0
Pull Mode: true
Filter Subject: sub.>
Deliver Policy: All
Ack Policy: Explicit
Ack Wait: 30s
Replay Policy: Instant
Max Ack Pending: 1,000
Max Waiting Pulls: 512
State:
Last Delivered Message: Consumer sequence: 176 Stream sequence: 519 Last delivery: 4.18s ago
Acknowledgment floor: Consumer sequence: 160 Stream sequence: 510 Last Ack: 12m44s ago
Outstanding Acks: 8 out of maximum 1,000
Redelivered Messages: 8
Unprocessed Messages: 0
Waiting Pulls: 0 of maximum 512
and when I'm trying to pull messages from a subset of the filtered subject, for example, "sub.0", nats return this ErrSubjectMismatch
func main() {
nc, _ := nats.Connect("localhost:4222")
js, _ := nc.JetStream()
sub, err := js.PullSubscribe("sub.0", "consumer_0", nats.BindStream("stream_0"))
if err != nil {
panic(err)
}
msgs, err := sub.Fetch(10)
if err != nil {
panic(err)
}
if len(msgs) > 0 {
for _, msg := range msgs {
msg.Ack()
println(string(msg.Data))
}
}
}
Expected result: only messages from "sub.0"
Actual result:
error panic: nats: subject does not match consumer
and here despite the comment, being a subset of the FilterSubject doesn't check, and only checks if the FilterSubject is not empty and that the selected subject is precisely equal to the FilterSubject
Once a consumer is made it gets all that’s in the consumer filter subject always.
You can though add another consumer with a different filter subject.
You can’t pull from an existing consumer using a different subject than it’s filter.
Once a consumer is made it gets all that’s in the consumer filter subject always.
You can though add another consumer with a different filter subject.
You can’t pull from an existing consumer using a different subject than it’s filter.
I want to pull from a subset of the consumer's FilterSubject, not a different subject
I understand that. But a subset is not the same as the whole so a new consumer is needed. We don’t support what you want
Would creating a new, ephemeral R1 consumer with the filter subject you want be an option?
I'm not sure from the conversation as to whether NATS server supports this. If not then the comment on the function should change. If it does, then the following function would fix the behavior in the go client:
func isSubjectSubset(subject, test string) bool {
s := strings.Split(subject, ".")
t := strings.Split(test, ".")
if strings.HasSuffix(subject, ">") {
if len(t) < len(s) {
// Subject is too short to be a subset
return false
}
// Truncate to >
t = append(t[:len(s)-1], ">")
}
// Check they are the same length
if len(t) != len(s) {
return false
}
// Compare except wildcards
for i := 0; i < len(s); i++ {
if s[i] == "*" {
continue
}
if s[i] != t[i] {
return false
}
}
return true
}
Should I make a PR?