AddConsumer and QueueSubscribe
nats.go v1.12.1 nats-server v2.5.0
- Using AddConsumer with QueueSubscribeSync fails.
- Similarly AddConsumer without AckPolicy fails.
package main
import (
"fmt"
"os"
"time"
"github.com/nats-io/nats.go"
)
// main is the first reproduction program: it recreates the ORDERS stream,
// makes sure a durable consumer named MONITOR exists, publishes test
// messages, and then performs two queue subscriptions in a row, reading and
// acknowledging one message from each. Fatal steps print a tagged error and
// exit with status -1.
func main() {
// Wait limit passed to each sub.NextMsg call below.
timeout := 5 * time.Second
// Connect to NATS
nc, err := nats.Connect("nats://localhost:4222")
if err != nil {
fmt.Println("--->>>error", err)
os.Exit(-1)
}
// Create JetStream Context
js, err := nc.JetStream(nats.PublishAsyncMaxPending(256))
if err != nil {
fmt.Println("--->>>1error", err)
os.Exit(-1)
}
// Delete the ORDERS stream before recreating it. A failure here is only
// printed; the program keeps going.
err = js.DeleteStream("ORDERS")
if err != nil {
fmt.Println("--->>>10aerror", err)
}
// Create the ORDERS stream bound to the wildcard subject ORDERS.*.
_, err = js.AddStream(&nats.StreamConfig{
Name: "ORDERS",
Subjects: []string{"ORDERS.*"},
})
if err != nil {
fmt.Println("--->>>11aerror", err)
os.Exit(-1)
}
// Read the stream back and print its name and subjects.
// NOTE(review): this failure path reuses the "11aerror" tag of the
// AddStream check above, so the two cannot be told apart in the output.
sinfo, err := js.StreamInfo("ORDERS")
if err != nil {
fmt.Println("--->>>11aerror", err)
os.Exit(-1)
}
fmt.Println("--->>stream info", sinfo.Config.Name, sinfo.Config.Subjects)
// Create the MONITOR durable only when ConsumerInfo returns no info; the
// error from ConsumerInfo is discarded. The config sets only Durable,
// DeliverGroup and AckPolicy (no DeliverSubject, MaxDeliver or
// FilterSubject).
if cinfo, _ := js.ConsumerInfo("ORDERS", "MONITOR"); cinfo == nil {
_, err := js.AddConsumer("ORDERS", &nats.ConsumerConfig {
Durable: "MONITOR",
DeliverGroup: "group",
AckPolicy: nats.AckExplicitPolicy,
});
if err != nil {
fmt.Println("--->>>11baerror", err)
os.Exit(-1)
}
}
// Simple Stream Publisher: one message on ORDERS.bar, which is a different
// subject from the ORDERS.scratch used by the subscriptions below.
_, err = js.Publish("ORDERS.bar", []byte("hellobar"))
if err != nil {
fmt.Println("--->>>1aerror", err)
os.Exit(-1)
}
// Simple Async Stream Publisher: 500 messages, "hello-0" through
// "hello-499", on ORDERS.scratch.
for i := 0; i < 500; i++ {
_, err := js.PublishAsync("ORDERS.scratch", []byte(fmt.Sprintf("%s-%d", "hello", i)))
if err != nil {
fmt.Println("--->>>1berror", err)
os.Exit(-1)
}
}
// Wait for the async publishes to be reported complete, or give up after
// 5 seconds. Either way the program continues.
select {
case <-js.PublishAsyncComplete():
fmt.Println("Publish complete")
case <-time.After(5 * time.Second):
fmt.Println("Did not resolve in time")
}
// Simple Sync Durable Consumer (optional SubOpts at the end)
// NOTE(review): the error returned by QueueSubscribeSync is never checked;
// err is overwritten by the NextMsg call on the next line, so a failed
// subscription only shows up as a NextMsg error.
sub, err := js.QueueSubscribeSync("ORDERS.scratch", "group", nats.Durable("MONITOR"), nats.MaxDeliver(3), nats.AckExplicit())
m, err := sub.NextMsg(timeout)
if err != nil {
fmt.Println("--->>>3error", err)
os.Exit(-1)
}
fmt.Println("---->>>m3", string(m.Data))
// The return values of Ack and Unsubscribe are ignored.
m.Ack()
sub.Unsubscribe()
// Simple Sync Durable Consumer (optional SubOpts at the end)
// Second subscription with the same arguments; as above, the subscribe
// error is overwritten before it is checked.
sub, err = js.QueueSubscribeSync("ORDERS.scratch", "group", nats.Durable("MONITOR"), nats.MaxDeliver(3), nats.AckExplicit())
m, err = sub.NextMsg(timeout)
if err != nil {
fmt.Println("--->>>3aerror", err)
os.Exit(-1)
}
fmt.Println("---->>>m4", string(m.Data))
m.Ack()
sub.Unsubscribe()
// Drain is called on the subscription that was unsubscribed on the previous
// line; its return value is ignored.
sub.Drain()
}
output:
--->>stream info ORDERS [ORDERS.*]
Publish complete
--->>>3error nats: invalid subscription
not sure what's wrong.
thanks bal
You need to also check the error at sub, err := js.QueueSubscribeSync("ORDERS.scratch", "group", nats.Durable("MONITOR"), nats.MaxDeliver(3), nats.AckExplicit()) and the other one
Looks like you ignore the error when creating the subscription:
sub, err := js.QueueSubscribeSync("ORDERS.scratch", "group", nats.Durable("MONITOR"), nats.MaxDeliver(3), nats.AckExplicit())
m, err := sub.NextMsg(timeout)
Could you check for error after the call to js.QueueSubscribeSync() and report if there is any error?
If you do, you will see an error. That is because the call to js.QueueSubscribeSync() specifies MaxDeliver and explicit ack mode, while the consumer was created without those values. A JS consumer cannot be changed after it has been created, which is why the call fails. Also, since you created the consumer without a deliver subject, it is considered a pull consumer, so even without MaxDeliver etc. it would not work.
updated it:
package main
import (
"fmt"
"os"
"time"
"github.com/nats-io/nats.go"
)
// main is the second reproduction program. It follows the same steps as the
// first one (recreate the ORDERS stream, ensure the MONITOR durable exists,
// publish test messages, subscribe twice and read one message each time),
// but it connects to port 4223 and checks the error returned by each
// QueueSubscribeSync call. Fatal steps print a tagged error and exit with
// status -1.
func main() {
// Wait limit passed to each sub.NextMsg call below.
timeout := 5 * time.Second
// Connect to NATS
nc, err := nats.Connect("nats://localhost:4223")
if err != nil {
fmt.Println("--->>>error", err)
os.Exit(-1)
}
// Create JetStream Context
js, err := nc.JetStream(nats.PublishAsyncMaxPending(256))
if err != nil {
fmt.Println("--->>>1error", err)
os.Exit(-1)
}
// Delete the ORDERS stream before recreating it. A failure here is only
// printed; the program keeps going.
err = js.DeleteStream("ORDERS")
if err != nil {
fmt.Println("--->>>10aerror", err)
}
// Create the ORDERS stream bound to the wildcard subject ORDERS.*.
_, err = js.AddStream(&nats.StreamConfig{
Name: "ORDERS",
Subjects: []string{"ORDERS.*"},
})
if err != nil {
fmt.Println("--->>>11aerror", err)
os.Exit(-1)
}
// Read the stream back and print its name and subjects.
// NOTE(review): this failure path reuses the "11aerror" tag of the
// AddStream check above, so the two cannot be told apart in the output.
sinfo, err := js.StreamInfo("ORDERS")
if err != nil {
fmt.Println("--->>>11aerror", err)
os.Exit(-1)
}
fmt.Println("--->>stream info", sinfo.Config.Name, sinfo.Config.Subjects)
// Create the MONITOR durable only when ConsumerInfo returns no info; the
// error from ConsumerInfo is discarded. The config sets only Durable,
// DeliverGroup and AckPolicy (no DeliverSubject, MaxDeliver or
// FilterSubject).
if cinfo, _ := js.ConsumerInfo("ORDERS", "MONITOR"); cinfo == nil {
_, err := js.AddConsumer("ORDERS", &nats.ConsumerConfig {
Durable: "MONITOR",
DeliverGroup: "group",
AckPolicy: nats.AckExplicitPolicy,
});
if err != nil {
fmt.Println("--->>>11baerror", err)
os.Exit(-1)
}
}
// Simple Stream Publisher: one message on ORDERS.bar, which is a different
// subject from the ORDERS.scratch used by the subscriptions below.
_, err = js.Publish("ORDERS.bar", []byte("hellobar"))
if err != nil {
fmt.Println("--->>>1aerror", err)
os.Exit(-1)
}
// Simple Async Stream Publisher: 500 messages, "hello-0" through
// "hello-499", on ORDERS.scratch.
for i := 0; i < 500; i++ {
_, err := js.PublishAsync("ORDERS.scratch", []byte(fmt.Sprintf("%s-%d", "hello", i)))
if err != nil {
fmt.Println("--->>>1berror", err)
os.Exit(-1)
}
}
// Wait for the async publishes to be reported complete, or give up after
// 5 seconds. Either way the program continues.
select {
case <-js.PublishAsyncComplete():
fmt.Println("Publish complete")
case <-time.After(5 * time.Second):
fmt.Println("Did not resolve in time")
}
// Simple Sync Durable Consumer (optional SubOpts at the end)
// The subscribe error is now checked before the subscription is used.
sub, err := js.QueueSubscribeSync("ORDERS.scratch", "group", nats.Durable("MONITOR"), nats.MaxDeliver(3), nats.AckExplicit())
if err != nil {
fmt.Println("--->>>3subserror", err)
os.Exit(-1)
}
m, err := sub.NextMsg(timeout)
if err != nil {
fmt.Println("--->>>3error", err)
os.Exit(-1)
}
fmt.Println("---->>>m3", string(m.Data))
// The return values of Ack and Unsubscribe are ignored.
m.Ack()
sub.Unsubscribe()
// Simple Sync Durable Consumer (optional SubOpts at the end)
// Second subscription with the same arguments as the first.
sub, err = js.QueueSubscribeSync("ORDERS.scratch", "group", nats.Durable("MONITOR"), nats.MaxDeliver(3), nats.AckExplicit())
if err != nil {
fmt.Println("--->>>3asubserror", err)
os.Exit(-1)
}
m, err = sub.NextMsg(timeout)
if err != nil {
fmt.Println("--->>>3aerror", err)
os.Exit(-1)
}
fmt.Println("---->>>m4", string(m.Data))
m.Ack()
sub.Unsubscribe()
// Drain is called on the subscription that was unsubscribed on the previous
// line; its return value is ignored.
sub.Drain()
}
this is what i get.
--->>stream info ORDERS [ORDERS.*]
Publish complete
--->>>3subserror nats: must use pull subscribe to bind to pull based consumer
Yeah, see my previous comment. I will write here something that should do what you wanted.
js.AddConsumer("ORDERS", &nats.ConsumerConfig{
Durable: "MONITOR",
DeliverSubject: nats.NewInbox(),
DeliverGroup: "group",
AckPolicy: nats.AckExplicitPolicy,
MaxDeliver: 3,
})
...
sub, err := js.QueueSubscribeSync("ORDERS.scratch", "group", nats.Durable("MONITOR"))
if err != nil {
...
that seems to work. i would have never guessed nats.NewInbox() as DeliverSubject and was scratching my head on how to handle wildcard subjects, like ORDERS.*.
now, with it compiling and not erroring, let me put the new code here again and the outputs:
package main
import (
"fmt"
"os"
"time"
"github.com/nats-io/nats.go"
)
// main is the third reproduction program. Compared with the previous
// versions, the MONITOR consumer is created with DeliverSubject set to a
// fresh inbox and MaxDeliver set to 3. The rest is unchanged: recreate the
// ORDERS stream, publish test messages, subscribe twice and read one message
// each time. Fatal steps print a tagged error and exit with status -1.
func main() {
// Wait limit passed to each sub.NextMsg call below.
timeout := 5 * time.Second
// Connect to NATS
nc, err := nats.Connect("nats://localhost:4222")
if err != nil {
fmt.Println("--->>>error", err)
os.Exit(-1)
}
// Create JetStream Context
js, err := nc.JetStream(nats.PublishAsyncMaxPending(256))
if err != nil {
fmt.Println("--->>>1error", err)
os.Exit(-1)
}
// Delete the ORDERS stream before recreating it. A failure here is only
// printed; the program keeps going.
err = js.DeleteStream("ORDERS")
if err != nil {
fmt.Println("--->>>10aerror", err)
}
// Create the ORDERS stream bound to the wildcard subject ORDERS.*.
_, err = js.AddStream(&nats.StreamConfig{
Name: "ORDERS",
Subjects: []string{"ORDERS.*"},
})
if err != nil {
fmt.Println("--->>>11aerror", err)
os.Exit(-1)
}
// Read the stream back and print its name and subjects.
// NOTE(review): this failure path reuses the "11aerror" tag of the
// AddStream check above, so the two cannot be told apart in the output.
sinfo, err := js.StreamInfo("ORDERS")
if err != nil {
fmt.Println("--->>>11aerror", err)
os.Exit(-1)
}
fmt.Println("--->>stream info", sinfo.Config.Name, sinfo.Config.Subjects)
// Create the MONITOR durable only when ConsumerInfo returns no info; the
// error from ConsumerInfo is discarded. The config now carries a
// DeliverSubject (a new inbox) and MaxDeliver, but no FilterSubject.
if cinfo, _ := js.ConsumerInfo("ORDERS", "MONITOR"); cinfo == nil {
_, err := js.AddConsumer("ORDERS", &nats.ConsumerConfig {
Durable: "MONITOR",
DeliverSubject: nats.NewInbox(),
DeliverGroup: "group",
AckPolicy: nats.AckExplicitPolicy,
MaxDeliver: 3,
});
if err != nil {
fmt.Println("--->>>11baerror", err)
os.Exit(-1)
}
}
// Simple Stream Publisher: one message on ORDERS.bar, which is a different
// subject from the ORDERS.scratch used by the subscriptions below.
_, err = js.Publish("ORDERS.bar", []byte("hellobar"))
if err != nil {
fmt.Println("--->>>1aerror", err)
os.Exit(-1)
}
// Simple Async Stream Publisher: 500 messages, "hello-0" through
// "hello-499", on ORDERS.scratch.
for i := 0; i < 500; i++ {
_, err := js.PublishAsync("ORDERS.scratch", []byte(fmt.Sprintf("%s-%d", "hello", i)))
if err != nil {
fmt.Println("--->>>1berror", err)
os.Exit(-1)
}
}
// Wait for the async publishes to be reported complete, or give up after
// 5 seconds. Either way the program continues.
select {
case <-js.PublishAsyncComplete():
fmt.Println("Publish complete")
case <-time.After(5 * time.Second):
fmt.Println("Did not resolve in time")
}
// Simple Sync Durable Consumer (optional SubOpts at the end)
// The subscribe call still passes nats.MaxDeliver(3) and nats.AckExplicit()
// in addition to the durable name.
sub, err := js.QueueSubscribeSync("ORDERS.scratch", "group", nats.Durable("MONITOR"), nats.MaxDeliver(3), nats.AckExplicit())
if err != nil {
fmt.Println("--->>>3subserror", err)
os.Exit(-1)
}
m, err := sub.NextMsg(timeout)
if err != nil {
fmt.Println("--->>>3error", err)
os.Exit(-1)
}
fmt.Println("---->>>m3", string(m.Data))
// Only this one message is read and acknowledged before unsubscribing. The
// return values of Ack and Unsubscribe are ignored.
m.Ack()
sub.Unsubscribe()
// Simple Sync Durable Consumer (optional SubOpts at the end)
// Second subscription with the same arguments as the first.
sub, err = js.QueueSubscribeSync("ORDERS.scratch", "group", nats.Durable("MONITOR"), nats.MaxDeliver(3), nats.AckExplicit())
if err != nil {
fmt.Println("--->>>3asubserror", err)
os.Exit(-1)
}
m, err = sub.NextMsg(timeout)
if err != nil {
fmt.Println("--->>>3aerror", err)
os.Exit(-1)
}
fmt.Println("---->>>m4", string(m.Data))
m.Ack()
sub.Unsubscribe()
// Drain is called on the subscription that was unsubscribed on the previous
// line; its return value is ignored.
sub.Drain()
}
output:
--->>stream info ORDERS [ORDERS.*]
Publish complete
---->>>m3 hellobar
---->>>m4 hello-262
- As you see, `m3` got `hellobar` from the `ORDERS.bar` subject, which it did not request.
- `m4` got the right subject data, but I would have expected the output to be `hello-1` (assuming `m3` got `hello-0`).
Sorry, we missed FilterSubject: "ORDERS.bar", when creating the consumer, can you change that and see if that works?
I meant ORDERS.scratch.. whatever value you would want to consume.
Since your stream captures many different subjects, you need to filter for the one you want when creating the consumer. If you don't specify a filter, you get them all. In that case the subject passed to js.QueueSubscribeSync() is only used to figure out the name of the stream; the JS consumer is the one that gets the messages and sends them to the internal NATS subscription created by js.QueueSubscribeSync()... yes, this is confusing, but that's the way it is unfortunately.
thanks @kozlovic for the response. yes, I did add it and it filters.
output:
--->>stream info ORDERS [ORDERS.*]
Publish complete
---->>>m3 hello-0
---->>>m4 hello-287
the first one gets first message (and is ack'd). then unsubscribed and the next one gets something out of sequence.
> yes, this is confusing, but that's the way it is unfortunately
in the stan world, i just did not unsubscribe and closed connections to restore them later either with a sequence number or start from beginning. guess that makes code simpler here too? like your feedback on it.
> the first one gets first message (and is ack'd). then unsubscribed and the next one gets something out of sequence.
Ok, I used MaxDeliver because that is what you specified in the example, but I understand now that this was probably not what you expect it to be: MaxDeliver is the number of times a server will (re)deliver a message, not the max number of messages to be sent to a sub (like MaxInflight in streaming).
Here, when you start the first queue member, it may have received many messages from the consumer, since at that time this member is the only one to exist. You call NextMsg() once, ack and then unsubscribe, so all other messages that were sent to this member will be "un-acked" and will be redelivered later.
I think you are just experimenting, but you should let us know what exactly you are trying to do so that we can better explain and you have a better experience.
> in the stan world, i just did not unsubscribe and closed connections to restore them later either with a sequence number or start from beginning. guess that makes code simpler here too? like your feedback on it.
No, in NATS Streaming, once a queue group was created (same for a durable), the "start" sequence when restarting a member (or the durable) was ignored, because the server knew where it was in the channel sequence for this group (or durable). The "start" meant something only when the group (or durable) was created for the first time.
> I think you are just experimenting, but you should let us know what exactly you are trying to do so that we can better explain and you have a better experience.
Yes, I am. And the explanation of MaxDeliver makes sense.
> The "start" meant something only when the group (or durable) was created for the first time.
Understood; I mixed up my answer for durable QueueSubscribe and durable Subscribe. With Subscribe I use a start sequence number, whereas with QueueSubscribe the durable name takes care of what comes next. This is with stan.
Coming back to the experiment: I want to see whether I will be able to continue where I left off in the case of a durable queue (push or pull). If Msg-0, Msg-1, Msg-2 went in, do I get Msg-0, Msg-1, Msg-2 out with a QueueSubscribeSync/Ack or PullSubscribe/Fetch(1)/Ack sequence? That is what I'm trying to find out.
- It seems to work fine with PullSubscribe where if I don't Unsubscribe a new PullSubscribe call to the same subject and durable name continues with the next message.
- In the case of QueueSubscribeSync, as you say above, MaxDeliver may play a part in it, but with AckExplicit() and Unsubscribe(), the next invocation should be able to get the next sequential message. With or without Unsubscribe() I don't get the next message (this is why AddConsumer came into play, and I can't get that to work to produce the desired effect).
Hope the above explanation makes sense...
> but with an AckExplicit() and Unsubscribe(), the next invocation should be able to get the next sequential message. With or without Unsubscribe() I don't get the next message
That is because, I think, the server has marked the messages as delivered, and they will be redelivered based on the AckWait, which is default of 30 seconds.
Then an application using QueueSubscribe that crashes and restarts immediately should be ready to handle out of sequence messages, no matter what the Ack model is.
The idea behind using this call is to load-balance heavy-volume data ingestion, so turning off MaxWaiting (I guess that's the ConsumerConfig equivalent of stan.SetPendingLimits) is not advisable either.