mangos
mangos copied to clipboard
Routing job load across some agents [ req/rep - pull/push]
Hello! everybody working in this great library.
I'm evaluating mangos as base library to spread load from "master" (who sends jobs ) to a lot of "agents".
Agents right now are all in the same category and any of the agents, will run any of the messages in the master queue.
master && agents Execution
without routing
- Master
go run pipeline.go master tcp://127.0.0.1:40899
- Agent 1
go run pipeline.go agent tcp://127.0.0.1:40899 AGENT1
- Agent 2
go run pipeline.go agent tcp://127.0.0.1:40899 AGENT2
- Agent 3
go run pipeline.go agent tcp://127.0.0.1:40899 AGENT3
- Agent 4
go run pipeline.go agent tcp://127.0.0.1:40899 AGENT4
I would like to add labels to agents in any way to route messages by example by environment=production, testing and master will send messages only to the agents with its proper label. ( agents will only dequeue messages routed with its own label )
This new feature will apply on req/rep and pull/push protocols
Execution examples...
with routing
- Master
go run pipeline.go master tcp://127.0.0.1:40899
- Agent 1
go run pipeline.go agent tcp://127.0.0.1:40899 AGENT1 env=production
- Agent 2
go run pipeline.go agent tcp://127.0.0.1:40899 AGENT2 env=production
- Agent 3
go run pipeline.go agent tcp://127.0.0.1:40899 AGENT3 env=testing
- Agent 4
go run pipeline.go agent tcp://127.0.0.1:40899 AGENT4 env=testing
PoC Code
Without Routing ( as currently is working)
I did the PoC with this code.
package main
import (
"fmt"
"os"
"time"
"go.nanomsg.org/mangos/v3"
// register transports
"go.nanomsg.org/mangos/v3/protocol/rep"
"go.nanomsg.org/mangos/v3/protocol/req"
_ "go.nanomsg.org/mangos/v3/transport/all"
)
func die(format string, v ...interface{}) {
fmt.Fprintln(os.Stderr, fmt.Sprintf(format, v...))
os.Exit(1)
}
func agent(url string, agentName string) {
var sock mangos.Socket
var err error
var msg []byte
if sock, err = rep.NewSocket(); err != nil {
die("can't get new pull socket: %s", err)
}
if err = sock.Dial(url); err != nil {
die("can't dial on push socket: %s", err.Error())
}
for {
// Could also use sock.RecvMsg to get header
msg, err = sock.Recv()
fmt.Printf("AGENT [%s]: RECEIVED \"%s\"\n", agentName, msg)
err = sock.Send([]byte(agentName))
if err != nil {
die("can't send reply: %s", err.Error())
}
}
}
func master(url string) {
var sock mangos.Socket
var err error
if sock, err = req.NewSocket(); err != nil {
die("can't get new push socket: %s", err.Error())
}
defer sock.Close()
if err = sock.Listen(url); err != nil {
die("can't listen on pull socket: %s", err.Error())
}
for {
var resp []byte
time.Sleep(time.Second / 2)
msg := "MSG: " + time.Now().String()
if err = sock.Send([]byte(msg)); err != nil {
die("can't send message on push socket: %s", err.Error())
}
if resp, err = sock.Recv(); err != nil {
die("can't receive resp: %s", err.Error())
}
fmt.Printf("MASTER: SENDED [%s] RECEIVED [%s] \n", msg, string(resp))
}
}
func main() {
if len(os.Args) > 2 && os.Args[1] == "agent" {
agent(os.Args[2], os.Args[3])
os.Exit(0)
}
if len(os.Args) > 3 && os.Args[1] == "master" {
master(os.Args[2])
os.Exit(0)
}
fmt.Fprintf(os.Stderr,
"Usage: pipeline agent|master <URL> <ARG> ...\n")
os.Exit(1)
}
With Routing in Agent (pseudocode)
func agent(url string, agentName string,environment string) {
var sock mangos.Socket
var err error
var msg []byte
if sock, err = rep.NewSocket(); err != nil {
die("can't get new pull socket: %s", err)
}
if err = sock.Dial(url); err != nil {
die("can't dial on push socket: %s", err.Error())
}
for {
// Will add labels to Recv to dequeue only messages with this label.
msg, err = sock.Recv(environment) // <-------------CHANGE HERE
fmt.Printf("AGENT [%s]: RECEIVED \"%s\"\n", agentName, msg)
err = sock.Send([]byte(agentName))
if err != nil {
die("can't send reply: %s", err.Error())
}
}
}
With Routing in Master(pseudocode)
var sock mangos.Socket
func procesJobs(env string) {
var err error
for {
var resp []byte
time.Sleep(time.Second / 2)
msg := "MSG: " + time.Now().String()
if err = sock.Send(env,[]byte(msg)); err != nil { //<-----------------CHANGED HERE
die("can't send message on push socket: %s", err.Error())
}
if resp, err = sock.Recv(); err != nil {
die("can't receive resp: %s", err.Error())
}
fmt.Printf("MASTER: SENDED [%s] RECEIVED [%s] \n", msg, string(resp))
}
}
func master(url string) {
var err error
if sock, err = req.NewSocket(); err != nil {
die("can't get new push socket: %s", err.Error())
}
defer sock.Close()
if err = sock.Listen(url); err != nil {
die("can't listen on pull socket: %s", err.Error())
}
//jobs for production
go procesJobs("production")
go procesJobs("testing")
}
The best way to achieve what you want is to use separate service addresses (URLs) for production, development, etc. Basically use a completely disjoint topology, I think.
The sockets otherwise have no idea how to route your traffic, and really there is nothing currently in the protocol that would allow them to do that.
Hello @gdamore , Separate URL's implies restart the "master" and reconfigure "agents" on each new "route" I think is not a good solution for me.
The real use case is a web site performance measuring system , right now agents only over AWS "eu-west-1" , but perhaps tomorrow I will add a new agent on AWS "ca-central-1", I would like control measuring jobs from each AWS zone. the job will have a location parameter, and each agent will be started with location label. The master should be transparent to the number of different locations, this is, jobs should be routed thought the same "socket".
There is much work to add this feature? perhaps we can help to add if somebody can help us.
Hello @gdamore I'm still interested in this feature, and we would like to help implement it if not very difficult and you could assist us.
Its not trivial to do this -- because as I said it, there is nothing in the protocol itself to guide routing decisions.
You could probably build a router, starting from the code implementing nng_device(), but you'd need to have it make informed routing decisions (presumably by looking at your application specific payload.)
There might be another way if you're willing to have a separate service address locally, where you use different local routers, which could be uninformed leaving the decision about where to send the message to client.