bench icon indicating copy to clipboard operation
bench copied to clipboard

MQTT

Open oedemis opened this issue 9 years ago • 0 comments

Do you have plans to also implement a requester for MQTT? I have no experience with Go, but I have tested some MQTT. Here is an mqtt_requester, but I'm not sure what I'm doing :)

package requester

import ( "fmt" "os" "github.com/satori/go.uuid" "github.com/tylertreat/bench" MQTT "git.eclipse.org/gitroot/paho/org.eclipse.paho.mqtt.golang.git" )

// MQTTRequesterFactory implements RequesterFactory by creating a Requester
// which publishes messages to the topics of an MQTT broker.
type MQTTRequesterFactory struct {
	// URL is the broker address handed to each requester it creates.
	URL string
	// TOPICs lists the topics each requester subscribes and publishes to.
	TOPICs []string
}

// GetRequester returns a new Requester, called for each Benchmark connection. func (r *MQTTRequesterFactory) GetRequester(num uint64) bench.Requester { return &mqttRequester{ url: r.URL, topics: r.TOPICs, } }

// amqpRequester implements Requester by publishing a message to an MQTT // exhcnage and waiting to consume it????. type mqttRequester struct { url string topics []string client *MQTT.Client }

var f MQTT.MessageHandler = func(client *MQTT.Client, msg MQTT.Message) { fmt.Printf("TOPIC: %s\n", msg.Topic()) fmt.Printf("MSG: %s\n", msg.Payload()) }

// Setup prepares the Requester for benchmarking. func (r *mqttRequester) Setup() error { u1 := uuid.NewV4()

opts := MQTT.NewClientOptions().AddBroker(r.url).SetClientID(u1.String())
//opts.SetDefaultPublishHandler(f)

c := MQTT.NewClient(opts)
r.client = c
if token := c.Connect(); token.Wait() && token.Error() != nil {
    panic(token.Error())
}

for i:= 0; i < len(r.topics); i++ {
    if token := c.Subscribe(r.topics[i], 0, nil); token.Wait() && token.Error() != nil {
        fmt.Println(token.Error())
        os.Exit(1)
    }       
}

return nil

}

// Request performs a synchronous request to the system under test. func (r *mqttRequester) Request() error {

/*if token := r.client.Subscribe("/go-mqtt/sample", 0, nil); token.Wait() && token.Error() != nil {
    fmt.Println(token.Error())
    os.Exit(1)
}*/

text := fmt.Sprintf("this is msg #%d!", 1)
//token := r.client.Publish("/go-mqtt/sample", 0, false, text)
//r.client.Publish("/go-mqtt/sample", 0, false, text)

for i:= 0; i < len(r.topics); i++ {
    r.client.Publish(r.topics[i], 0, false, text)
}

return nil

}

// Teardown is called upon benchmark completion. func (r *mqttRequester) Teardown() error { r.client.Disconnect(250) return nil }

and main.go

package main

import ( "fmt" "time"

"github.com/tylertreat/bench"
"./requester"

)

//"github.com/tylertreat/bench/requester" func main() { /r := &requester.WebRequesterFactory{ URL: "http://localhost:8080/", }/

r := &requester.MQTTRequesterFactory{
    URL:         "tcp://127.0.0.1:1883",
    TOPICs:      []string{"topic1", "topic2"},
}

benchmark := bench.NewBenchmark(r, 10000, 1, 30*time.Second)
summary, err := benchmark.Run()
if err != nil {
    panic(err)
}

fmt.Println(summary)
summary.GenerateLatencyDistribution(bench.Logarithmic, "mqtt.txt")

}

oedemis avatar Jan 13 '16 06:01 oedemis