nats.go icon indicating copy to clipboard operation
nats.go copied to clipboard

AddConsumer and QueueSubscribe

Open biskit opened this issue 4 years ago • 16 comments

nats.go v1.12.1 nats-server v2.5.0

  1. Using AddConsumer with QueueSubscribeSync fails.
  2. Similarly AddConsumer without AckPolicy fails.
package main

import (
        "fmt"
        "os"
        "time"

        "github.com/nats-io/nats.go"
)

func main() {
        timeout := 5 * time.Second
        // Connect to NATS
        nc, err := nats.Connect("nats://localhost:4222")
        if err != nil {
                fmt.Println("--->>>error", err)
                os.Exit(-1)
        }

        // Create JetStream Context
        js, err := nc.JetStream(nats.PublishAsyncMaxPending(256))
        if err != nil {
                fmt.Println("--->>>1error", err)
                os.Exit(-1)
        }

        err = js.DeleteStream("ORDERS")
        if err != nil {
                fmt.Println("--->>>10aerror", err)
        }

        _, err = js.AddStream(&nats.StreamConfig{
                Name:     "ORDERS",
                Subjects: []string{"ORDERS.*"},
        })
        if err != nil {
                fmt.Println("--->>>11aerror", err)
                os.Exit(-1)
        }

        sinfo, err := js.StreamInfo("ORDERS")
        if err != nil {
                fmt.Println("--->>>11aerror", err)
                os.Exit(-1)
        }
        fmt.Println("--->>stream info", sinfo.Config.Name, sinfo.Config.Subjects)

        if cinfo, _ := js.ConsumerInfo("ORDERS", "MONITOR"); cinfo == nil {
                _, err := js.AddConsumer("ORDERS", &nats.ConsumerConfig {
                        Durable: "MONITOR",
                        DeliverGroup: "group",
                        AckPolicy: nats.AckExplicitPolicy,
                });
                if err != nil {
                        fmt.Println("--->>>11baerror", err)
                        os.Exit(-1)
                }
        }


        // Simple Stream Publisher
        _, err = js.Publish("ORDERS.bar", []byte("hellobar"))
        if err != nil {
                fmt.Println("--->>>1aerror", err)
                os.Exit(-1)
        }

        // Simple Async Stream Publisher
        for i := 0; i < 500; i++ {
                _, err := js.PublishAsync("ORDERS.scratch", []byte(fmt.Sprintf("%s-%d", "hello", i)))
                if err != nil {
                        fmt.Println("--->>>1berror", err)
                        os.Exit(-1)
                }
        }
        select {
        case <-js.PublishAsyncComplete():
                fmt.Println("Publish complete")
        case <-time.After(5 * time.Second):
                fmt.Println("Did not resolve in time")
        }

        // Simple Sync Durable Consumer (optional SubOpts at the end)
        sub, err := js.QueueSubscribeSync("ORDERS.scratch", "group", nats.Durable("MONITOR"), nats.MaxDeliver(3), nats.AckExplicit())
        m, err := sub.NextMsg(timeout)
        if err != nil {
                fmt.Println("--->>>3error", err)
                os.Exit(-1)
        }
        fmt.Println("---->>>m3", string(m.Data))
        m.Ack()
        sub.Unsubscribe()

        // Simple Sync Durable Consumer (optional SubOpts at the end)
        sub, err = js.QueueSubscribeSync("ORDERS.scratch", "group", nats.Durable("MONITOR"), nats.MaxDeliver(3), nats.AckExplicit())
        m, err = sub.NextMsg(timeout)
        if err != nil {
                fmt.Println("--->>>3aerror", err)
                os.Exit(-1)
        }
        fmt.Println("---->>>m4", string(m.Data))
        m.Ack()
        sub.Unsubscribe()

        // Drain
        sub.Drain()
}

output:

--->>stream info ORDERS [ORDERS.*]
Publish complete
--->>>3error nats: invalid subscription

not sure what's wrong.

thanks bal

biskit avatar Sep 14 '21 14:09 biskit

You need to also check the error at sub, err := js.QueueSubscribeSync("ORDERS.scratch", "group", nats.Durable("MONITOR"), nats.MaxDeliver(3), nats.AckExplicit()) and the other one

ripienaar avatar Sep 14 '21 14:09 ripienaar

Looks like you ignore the error when creating the subscription:

sub, err := js.QueueSubscribeSync("ORDERS.scratch", "group", nats.Durable("MONITOR"), nats.MaxDeliver(3), nats.AckExplicit())
m, err := sub.NextMsg(timeout)

Could you check for error after the call to js.QueueSubscribeSync() and report if there is any error?

kozlovic avatar Sep 14 '21 14:09 kozlovic

And if you do you will see that you get an error, this is because when you call js.QueueSubscribeSync() you provide MaxDeliver and explicit mode while you have created the consumer without those values. A JS Consumer can not be changed after it has been created, so that is the reason why. But also, since you created the consumer without the deliver subject, it is considered a pull consumer, so even without the MaxDeliver/etc.. it would not work.

kozlovic avatar Sep 14 '21 14:09 kozlovic

updated it:

package main

import (
        "fmt"
        "os"
        "time"

        "github.com/nats-io/nats.go"
)

func main() {
        timeout := 5 * time.Second
        // Connect to NATS
        nc, err := nats.Connect("nats://localhost:4223")
        if err != nil {
                fmt.Println("--->>>error", err)
                os.Exit(-1)
        }

        // Create JetStream Context
        js, err := nc.JetStream(nats.PublishAsyncMaxPending(256))
        if err != nil {
                fmt.Println("--->>>1error", err)
                os.Exit(-1)
        }

        err = js.DeleteStream("ORDERS")
        if err != nil {
                fmt.Println("--->>>10aerror", err)
        }

        _, err = js.AddStream(&nats.StreamConfig{
                Name:     "ORDERS",
                Subjects: []string{"ORDERS.*"},
        })
        if err != nil {
                fmt.Println("--->>>11aerror", err)
                os.Exit(-1)
        }

        sinfo, err := js.StreamInfo("ORDERS")
        if err != nil {
                fmt.Println("--->>>11aerror", err)
                os.Exit(-1)
        }
        fmt.Println("--->>stream info", sinfo.Config.Name, sinfo.Config.Subjects)

        if cinfo, _ := js.ConsumerInfo("ORDERS", "MONITOR"); cinfo == nil {
                _, err := js.AddConsumer("ORDERS", &nats.ConsumerConfig {
                        Durable: "MONITOR",
                        DeliverGroup: "group",
                        AckPolicy: nats.AckExplicitPolicy,
                });
                if err != nil {
                        fmt.Println("--->>>11baerror", err)
                        os.Exit(-1)
                }
        }


        // Simple Stream Publisher
        _, err = js.Publish("ORDERS.bar", []byte("hellobar"))
        if err != nil {
                fmt.Println("--->>>1aerror", err)
                os.Exit(-1)
        }

        // Simple Async Stream Publisher
        for i := 0; i < 500; i++ {
                _, err := js.PublishAsync("ORDERS.scratch", []byte(fmt.Sprintf("%s-%d", "hello", i)))
                if err != nil {
                        fmt.Println("--->>>1berror", err)
                        os.Exit(-1)
                }
        }
        select {
        case <-js.PublishAsyncComplete():
                fmt.Println("Publish complete")
        case <-time.After(5 * time.Second):
                fmt.Println("Did not resolve in time")
        }

        // Simple Sync Durable Consumer (optional SubOpts at the end)
        sub, err := js.QueueSubscribeSync("ORDERS.scratch", "group", nats.Durable("MONITOR"), nats.MaxDeliver(3), nats.AckExplicit())
        if err != nil {
                fmt.Println("--->>>3subserror", err)
                os.Exit(-1)
        }
        m, err := sub.NextMsg(timeout)
        if err != nil {
                fmt.Println("--->>>3error", err)
                os.Exit(-1)
        }
        fmt.Println("---->>>m3", string(m.Data))
        m.Ack()
        sub.Unsubscribe()

        // Simple Sync Durable Consumer (optional SubOpts at the end)
        sub, err = js.QueueSubscribeSync("ORDERS.scratch", "group", nats.Durable("MONITOR"), nats.MaxDeliver(3), nats.AckExplicit())
        if err != nil {
                fmt.Println("--->>>3asubserror", err)
                os.Exit(-1)
        }
        m, err = sub.NextMsg(timeout)
        if err != nil {
                fmt.Println("--->>>3aerror", err)
                os.Exit(-1)
        }
        fmt.Println("---->>>m4", string(m.Data))
        m.Ack()
        sub.Unsubscribe()

        // Drain
        sub.Drain()
}

this is what i get.

--->>stream info ORDERS [ORDERS.*]
Publish complete
--->>>3subserror nats: must use pull subscribe to bind to pull based consumer

biskit avatar Sep 14 '21 14:09 biskit

Yeah, see my previous comment. I will write here something that should do what you wanted.

kozlovic avatar Sep 14 '21 14:09 kozlovic

js.AddConsumer("ORDERS", &nats.ConsumerConfig{
    Durable: "MONITOR",
    DeliverSubject: nats.NewInbox(),
    DeliverGroup: "group",
    AckPolicy: nats.AckExplicitPolicy,
    MaxDeliver:     3,
})
...
sub, err := js.QueueSubscribeSync("ORDERS.scratch", "group", nats.Durable("MONITOR"))
if err != nil {
...

kozlovic avatar Sep 14 '21 14:09 kozlovic

that seems to work. i would have never guessed nats.NewInbox() as DeliverSubject and was scratching my head on how to handle wildcard subjects, like ORDERS.*.

now, with it compiling and not erroring, let me put the new code here again and the outputs:

package main

import (
        "fmt"
        "os"
        "time"

        "github.com/nats-io/nats.go"
)

func main() {
        timeout := 5 * time.Second
        // Connect to NATS
        nc, err := nats.Connect("nats://localhost:4222")
        if err != nil {
                fmt.Println("--->>>error", err)
                os.Exit(-1)
        }

        // Create JetStream Context
        js, err := nc.JetStream(nats.PublishAsyncMaxPending(256))
        if err != nil {
                fmt.Println("--->>>1error", err)
                os.Exit(-1)
        }

        err = js.DeleteStream("ORDERS")
        if err != nil {
                fmt.Println("--->>>10aerror", err)
        }

        _, err = js.AddStream(&nats.StreamConfig{
                Name:     "ORDERS",
                Subjects: []string{"ORDERS.*"},
        })
        if err != nil {
                fmt.Println("--->>>11aerror", err)
                os.Exit(-1)
        }

        sinfo, err := js.StreamInfo("ORDERS")
        if err != nil {
                fmt.Println("--->>>11aerror", err)
                os.Exit(-1)
        }
        fmt.Println("--->>stream info", sinfo.Config.Name, sinfo.Config.Subjects)

        if cinfo, _ := js.ConsumerInfo("ORDERS", "MONITOR"); cinfo == nil {
                _, err := js.AddConsumer("ORDERS", &nats.ConsumerConfig {
                        Durable: "MONITOR",
                        DeliverSubject: nats.NewInbox(),
                        DeliverGroup: "group",
                        AckPolicy: nats.AckExplicitPolicy,
                        MaxDeliver: 3,
                });
                if err != nil {
                        fmt.Println("--->>>11baerror", err)
                        os.Exit(-1)
                }
        }


        // Simple Stream Publisher
        _, err = js.Publish("ORDERS.bar", []byte("hellobar"))
        if err != nil {
                fmt.Println("--->>>1aerror", err)
                os.Exit(-1)
        }

        // Simple Async Stream Publisher
        for i := 0; i < 500; i++ {
                _, err := js.PublishAsync("ORDERS.scratch", []byte(fmt.Sprintf("%s-%d", "hello", i)))
                if err != nil {
                        fmt.Println("--->>>1berror", err)
                        os.Exit(-1)
                }
        }
        select {
        case <-js.PublishAsyncComplete():
                fmt.Println("Publish complete")
        case <-time.After(5 * time.Second):
                fmt.Println("Did not resolve in time")
        }

        // Simple Sync Durable Consumer (optional SubOpts at the end)
        sub, err := js.QueueSubscribeSync("ORDERS.scratch", "group", nats.Durable("MONITOR"), nats.MaxDeliver(3), nats.AckExplicit())
        if err != nil {
                fmt.Println("--->>>3subserror", err)
                os.Exit(-1)
        }
        m, err := sub.NextMsg(timeout)
        if err != nil {
                fmt.Println("--->>>3error", err)
                os.Exit(-1)
        }
        fmt.Println("---->>>m3", string(m.Data))
        m.Ack()
        sub.Unsubscribe()

        // Simple Sync Durable Consumer (optional SubOpts at the end)
        sub, err = js.QueueSubscribeSync("ORDERS.scratch", "group", nats.Durable("MONITOR"), nats.MaxDeliver(3), nats.AckExplicit())
        if err != nil {
                fmt.Println("--->>>3asubserror", err)
                os.Exit(-1)
        }
        m, err = sub.NextMsg(timeout)
        if err != nil {
                fmt.Println("--->>>3aerror", err)
                os.Exit(-1)
        }
        fmt.Println("---->>>m4", string(m.Data))
        m.Ack()
        sub.Unsubscribe()

        // Drain
        sub.Drain()
}

output:

--->>stream info ORDERS [ORDERS.*]
Publish complete
---->>>m3 hellobar
---->>>m4 hello-262
  1. as you see m3 got hellobar from ORDERS.bar subject that it did not request from
  2. m4 got the right subject data, but I would have expected the output to be hello-1 (assuming m3 got hello-0)

biskit avatar Sep 14 '21 14:09 biskit

Sorry, we missed FilterSubject: "ORDERS.bar", when creating the consumer, can you change that and see if that works?

kozlovic avatar Sep 14 '21 15:09 kozlovic

I meant ORDERS.scratch.. whatever value you would want to consume.

kozlovic avatar Sep 14 '21 15:09 kozlovic

Since your stream captures many different subject, when creating the consumer, you want to filter the one your want. If you don't specify, you get them all. The subject is js.QueueSubscribeSync() is in that case only use to figure out the name of the stream, but the JS consumer is the one that gets the message and send to internal NATS subscription created by js.QueueSubscribeSync()... yes, this is confusing, but that's the way it is unfortunately.

kozlovic avatar Sep 14 '21 15:09 kozlovic

thanks @kozlovic for the response. yes, I did add it and it filters.

output:

--->>stream info ORDERS [ORDERS.*]
Publish complete
---->>>m3 hello-0
---->>>m4 hello-287

the first one gets first message (and is ack'd). then unsubscribed and the next one gets something out of sequence.

biskit avatar Sep 14 '21 15:09 biskit

yes, this is confusing, but that's the way it is unfortunately

in the stan world, i just did not unsubscribe and closed connections to restore them later either with a sequence number or start from beginning. guess that makes code simpler here too? like your feedback on it.

biskit avatar Sep 14 '21 15:09 biskit

the first one gets first message (and is ack'd). then unsubscribed and the next one gets something out of sequence.

Ok, I used MaxDeliver because that is what you specified in the example, but I understand now that this was probably not what you expect it to be: MaxDeliver is the number of times a server will (re)deliver a message, not the max number of messages to be sent to a sub (like MaxInflight in streaming).

Here, when you start the first queue member, it may have received many messages from the consumer, since at that time this member is the only one to exist. You call NextMsg() once, ack and then unsubscribe, so all other messages that were sent to this member will be "un-acked" and will be redelivered later.

I think you are just experimenting, but you should let us know what exactly you are trying to do so that we can better explain and you have a better experience.

in the stan world, i just did not unsubscribe and closed connections to restore them later either with a sequence number or start from beginning. guess that makes code simpler here too? like your feedback on it.

No, in NATS Streaming, once a queue group was created (same for a durable), the "start" sequence when restarting a member (or the durable) was ignored, because the server knew where it was in the channel sequence for this group (or durable). The "start" meant something only when the group (or durable) was created for the first time.

kozlovic avatar Sep 14 '21 15:09 kozlovic

I think you are just experimenting, but you should let us know what exactly you are trying to do so that we can better explain and you have a better experience.

Yes, I'm. And the explanation on MaxDeliver makes sense.

The "start" meant something only when the group (or durable) was created for the first time.

Understood and I mixed my answer for both durable QueueSubscribe and durable Subscribe. With Subscribe I use a start sequence number, whereas with QueueSubscribe the durable name takes care of what comes next. This is with stan.

Coming back to the experiment, I want to see will I be able to continue where I left off in case of a durable queue. (push or pull). if Msg-0, Msg-1, Msg-2 went in, do I get Msg-0, Msg-1, Msg-2 out in QueueSubscribeSync/Ack or PullSubscribe/Fetch(1)/Ack sequence is what I'm trying...

  • It seems to work fine with PullSubscribe where if I don't Unsubscribe a new PullSubscribe call to the same subject and durable name continues with the next message.
  • In the case of QueueSubscribeSync as you say above MaxDeliver may play a part in it, but with an AckExplicit() and Unsubscribe(), the next invocation should be able to get the next sequential message. With or without Unsubscribe() I don't get the next message (for what the AddConsumer came to play and I can't get that to work to get the desired effect)

Hope the above explanation makes sense...

biskit avatar Sep 14 '21 16:09 biskit

but with an AckExplicit() and Unsubscribe(), the next invocation should be able to get the next sequential message. With or without Unsubscribe() I don't get the next message

That is because, I think, the server has marked the messages as delivered, and they will be redelivered based on the AckWait, which is default of 30 seconds.

kozlovic avatar Sep 14 '21 17:09 kozlovic

Then an application using QueueSubscribe that crashes and restarts immediately should be ready to handle out of sequence messages, no matter what the Ack model is. The idea behind using this call is to load balance heavy volume data ingestion so turning off MaxWaiting (guess thats' the ConsumerConfig equivalent of stan.SetPendingLimits) is not advisable either.

biskit avatar Sep 14 '21 17:09 biskit