Add option to trigger handlers in separate goroutines
Subscribe and QueueSubscribe currently invoke handlers serially. This PR adds an option to run them in parallel.
This relates to https://github.com/nats-io/go-nats/issues/263
This addition only applies to the encoded connection.
Coverage increased (+0.02%) to 92.529% when pulling 0f3ff78298183546d186b29d5b854fa6643a889e on pamelin:feature/parallel-enc-subscribe into c528ff487513eec69347b5598eb35d91f0a63820 on nats-io:master.
To me it makes more sense for users to do their own async handling if needed and control their own buffer sizes, easily done by having your callback spawn a goroutine on a case-by-case basis.
It's an easy thing for users to do and quite typical, versus carrying more complexity in this package.
@ripienaar I have to disagree with you. It is not straightforward to do this with an encoded connection (the Go part is easy, of course). It is straightforward with a normal connection, because MsgHandler is a function and can be wrapped in a goroutine when subscribing, e.g.
h := func(msg *nats.Msg) {
	// handle message
}
nc.Subscribe("subj", func(msg *nats.Msg) {
	go h(msg)
})
In the case of an encoded connection, Handler is an interface, and this is where you would have to either push the goroutine logic into the handler or create your own version of the encoded connection:
h := func(subj, reply, s string) {
	go func() {
		// do my work here
	}()
}
Now imagine you have 100 handlers which you pass to the subscribe function. Every single one of them would have to contain this logic, which shouldn't even be there, for various reasons.
On top of this, I still don't understand why the desired behaviour is to process incoming messages serially. To me, both are equally valid use cases.
Hi, sorry, I'm not following which part is not straightforward to do with the encoded connection. You could wrap the callback so that the request is processed in its own goroutine, something like the following, no?
parallelSubscribe := func(ec *nats.EncodedConn, subject string, cb func(b []byte)) {
	ec.Subscribe(subject, func(b []byte) {
		go cb(b)
	})
}
testBytes := []byte("Hello World!")
parallelSubscribe(ec, "enc_bytes", func(b []byte) {
	fmt.Println(time.Now(), "[Received] ", string(b))
	time.Sleep(200 * time.Millisecond) // does not block since in its own goroutine
})
ec.Publish("enc_bytes", testBytes)
@wallyqs This only covers the specific case where the handler has the concrete type func(b []byte).
Handler is an interface. It is not straightforward to do this:
parallelSubscribe := func(ec *nats.EncodedConn, subject string, cb nats.Handler) {
	...
without duplicating the reflection logic from the subscribe function.
Another option would be to extract the logic which creates the MsgHandler into a separate public function, e.g.
func (c *EncodedConn) subscribe(subject, queue string, cb Handler) (*Subscription, error) {
	natsCB, err := ToMsgHandler(c, cb)
	if err != nil {
		return nil, err
	}
	return c.Conn.subscribe(subject, queue, natsCB, nil, false)
}
func ToMsgHandler(c *EncodedConn, cb Handler) (MsgHandler, error) {
	if cb == nil {
		return nil, errors.New("nats: Handler required for EncodedConn Subscription")
	}
	argType, numArgs := argInfo(cb)
	if argType == nil {
		return nil, errors.New("nats: Handler requires at least one argument")
	}
	cbValue := reflect.ValueOf(cb)
	wantsRaw := (argType == emptyMsgType)
	return func(m *Msg) {
		var oV []reflect.Value
		if wantsRaw {
			oV = []reflect.Value{reflect.ValueOf(m)}
		} else {
			var oPtr reflect.Value
			if argType.Kind() != reflect.Ptr {
				oPtr = reflect.New(argType)
			} else {
				oPtr = reflect.New(argType.Elem())
			}
			if err := c.Enc.Decode(m.Subject, m.Data, oPtr.Interface()); err != nil {
				if c.Conn.Opts.AsyncErrorCB != nil {
					c.Conn.ach.push(func() {
						c.Conn.Opts.AsyncErrorCB(c.Conn, m.Sub, errors.New("nats: Got an error trying to unmarshal: "+err.Error()))
					})
				}
				return
			}
			if argType.Kind() != reflect.Ptr {
				oPtr = reflect.Indirect(oPtr)
			}
			// Callback Arity
			switch numArgs {
			case 1:
				oV = []reflect.Value{oPtr}
			case 2:
				subV := reflect.ValueOf(m.Subject)
				oV = []reflect.Value{subV, oPtr}
			case 3:
				subV := reflect.ValueOf(m.Subject)
				replyV := reflect.ValueOf(m.Reply)
				oV = []reflect.Value{subV, replyV, oPtr}
			}
		}
		cbValue.Call(oV)
	}, nil
}
and then users can do e.g.
func parallelSubscribe(ec *nats.EncodedConn, subject string, cb nats.Handler) (*nats.Subscription, error) {
	natsCB, err := ToMsgHandler(ec, cb)
	if err != nil {
		return nil, err
	}
	return ec.Subscribe(subject, func(msg *nats.Msg) {
		go natsCB(msg)
	})
}
This fits nicely with the single responsibility principle.
Another workaround, if you want a generic implementation without adding more APIs, also using reflect, would be to rewrap the function so that it is run as a goroutine... something like:
parallelize := func(f interface{}, fptr interface{}) {
	fn := reflect.ValueOf(fptr).Elem()
	oV := reflect.ValueOf(f)
	handler := func(in []reflect.Value) []reflect.Value {
		go oV.Call(in)
		return nil
	}
	v := reflect.MakeFunc(fn.Type(), handler)
	fn.Set(v)
}
fn1 := func(b []byte) {
	fmt.Println(time.Now(), "FN1: ", b)
	time.Sleep(1 * time.Second) // blocking
}
fn2 := func(subject string, b []byte) {
	fmt.Println(time.Now(), "FN2: ", subject, b)
	time.Sleep(1 * time.Second) // blocking
}
fn3 := func(subject, reply string, b []byte) {
	fmt.Println(time.Now(), "FN3: ", subject, reply, b)
	time.Sleep(1 * time.Second) // blocking
}
parallelize(fn1, &fn1)
parallelize(fn2, &fn2)
parallelize(fn3, &fn3)
ec.Subscribe("enc_bytes", fn1)
ec.Subscribe("enc_bytes", fn2)
ec.Subscribe("enc_bytes", fn3)
testBytes := []byte("Hello World!")
ec.Publish("enc_bytes", testBytes)
an example helper function could be:
ec, err := nats.NewEncodedConn(NewConnection(t, TEST_PORT), nats.JSON_ENCODER)
if err != nil {
t.Fatalf("Failed to create an encoded connection: %v\n", err)
}
defer ec.Close()
type Product struct {
Name string
}
pSubscribe := func(ec *nats.EncodedConn, subject string, cb interface{}) (*nats.Subscription, error) {
parallelize := func(f, fptr interface{}) {
// Original function.
oV := reflect.ValueOf(f)
// Pointer to the function that will be replaced.
ffn := reflect.ValueOf(fptr).Interface()
fn := reflect.ValueOf(ffn).Elem()
// Type of original function used to create new one
// that will replace the original.
typ := reflect.TypeOf(f)
// Rewrap original function.
handler := func(in []reflect.Value) []reflect.Value {
go oV.Call(in)
return nil
}
v := reflect.MakeFunc(typ, handler)
fn.Set(v)
}
parallelize(cb, &cb)
return ec.Subscribe(subject, cb)
}
pSubscribe(ec, ">", func(b []byte) {
fmt.Println(time.Now(), "FN0: ", string(b), b)
time.Sleep(1 * time.Second)
})
pSubscribe(ec, "enc_bytes", func(s string) {
fmt.Println(time.Now(), "FN1: ", s)
time.Sleep(1 * time.Second)
})
pSubscribe(ec, "enc_bytes", func(s *Product) {
fmt.Println(time.Now(), "From JSON:", s)
time.Sleep(1 * time.Second)
})
testBytes := []byte("Hello World!")
ec.Publish("enc_bytes", testBytes)
ec.Publish("enc_bytes", testBytes)
ec.Publish("enc_bytes", testBytes)
ec.Publish("enc_bytes", &Product{Name: string(testBytes)})
ec.Publish("enc_bytes", &Product{Name: string(testBytes)})
ec.Publish("enc_bytes", &Product{Name: string(testBytes)})
ec.PublishRequest("enc_bytes", "!!!!!!!", &Product{Name: string(testBytes)})
Hey @wallyqs, that is quite a clever solution you are proposing. I guess it would require some nil checks, etc., and obviously tests. This was my main reason for having parallel handler processing as part of the go-nats library, so people can just use it without needing to come up with their own solution.
I believe it is quite a valid case: tasks can take a long time to run and we don't want to block incoming messages. Basically a scenario where message ordering is not as important as availability and throughput.
I will write a separate library to provide support for parallel handlers. Do you want me to close this PR? Should I create a ticket for you guys to consider this in the future?
Thanks @pamelin. I think that having an extra library to build this on top of could be useful; it could add more things as well, like a parallelism factor to cap the max number of goroutines, for example. It is a very valid use case, though in this case I think it would be good to keep this outside of the client for now. Another ticket to revisit this in the future also sounds good!
I was under the impression that setting MaxAckPending would allow parallel processing of the callback to QueueSubscribe, but apparently not, after reading this.
The docs make it sound like there would be parallel processing (similar to how GCP Pub/Sub's Receive() with MaxOutstanding works):
https://docs.nats.io/nats-concepts/jetstream/consumers#maxackpending
Any chance that this will be officially supported?
Thanks, Max
We are continually working on docs etc to help clarify.
The issue here is that we have three things in play. The stream, the consumer, and the multiple subscriptions bound to the single consumer.
MaxAckPending is a consumer state. So in this case it is a singleton.
If you want to have that type of behavior, I might suggest using pull consumers with defaults, which means each will get one message. Even though this is also a "single" consumer construct, I believe this will behave the way you want.
A ChanSubscribe with multiple workers picking off jobs (and optionally manual ack), or limitless parallel handling, could be as easy as: for msg := range ch { go msgHandler(msg) }
Or, using the normal callback, just spawn a goroutine with your real handler.