gostorm
gostorm copied to clipboard
Need better example
Hi,
can anyone explain how I can use this library? A complete example would be nice. Your splitsentence example differs completely from the description. This is confusing.
Thanks!
As a newbie of Storm, also confused by the example
We just fought a whole working day through this and we came up with the following working example. It emits a timestamp encoded in a []byte
from a Spout and calculates the latency inside the Bolt.
// spout.go
package main
import (
"encoding/binary"
"fmt"
"log"
"os"
"strconv"
"time"
"github.com/jsgilmore/gostorm"
_ "github.com/jsgilmore/gostorm/encodings"
stormmsg "github.com/jsgilmore/gostorm/messages"
)
type timestampSpout struct {
payloadSize int
stream string
context *stormmsg.Context
collector gostorm.SpoutOutputCollector
idCounter uint64
}
func (s *timestampSpout) NextTuple() {
tb := make([]byte, s.payloadSize)
binary.LittleEndian.PutUint64(tb, uint64(time.Now().UnixNano()))
s.collector.Emit(strconv.FormatUint(s.idCounter, 10), s.stream, tb)
s.idCounter++
}
func (s *timestampSpout) Acked(id string) {
log.Println("Acked", id)
}
func (s *timestampSpout) Failed(id string) {
log.Println("Failed", id)
}
func (s *timestampSpout) Exit() {
log.Println("Exiting")
}
func (s *timestampSpout) Open(ctx *stormmsg.Context, collector gostorm.SpoutOutputCollector) {
s.context = ctx
s.collector = collector
}
func main() {
// Logging is done to an output file, since stdout and stderr are captured
fo, err := os.Create(fmt.Sprintf("timestampSpout-%d.txt", os.Getpid()))
if err != nil {
panic(err)
}
defer fo.Close()
log.SetOutput(fo)
encoding := "jsonEncoded"
s := ×tampSpout{
payloadSize: 90,
stream: "",
}
gostorm.RunSpout(s, encoding)
}
// bolt.go
package main
import (
"encoding/binary"
"fmt"
"log"
"os"
"time"
"github.com/jsgilmore/gostorm"
_ "github.com/jsgilmore/gostorm/encodings"
stormmsg "github.com/jsgilmore/gostorm/messages"
)
type latencyBolt struct {
context *stormmsg.Context
collector gostorm.OutputCollector
}
func (s *latencyBolt) Fields() []interface{} {
ret := make([]interface{}, 1)
payload := make([]byte, 90)
ret[0] = &payload
return ret
}
func (s *latencyBolt) Execute(meta stormmsg.BoltMsgMeta, fields ...interface{}) {
if meta.GetStream() == "__heartbeat" {
s.collector.SendAck(meta.Id)
return
}
switch payload := fields[0].(type) {
case *[]byte:
now := time.Now()
sent := int64(binary.LittleEndian.Uint64(*payload))
nowNano := now.UnixNano()
latency := nowNano - sent
s.collector.Emit([]string{meta.Id}, "latency", latency)
s.collector.SendAck(meta.Id)
log.Println(meta.Id, latency)
default:
s.collector.SendFail(meta.Id)
}
}
func (s *latencyBolt) Prepare(context *stormmsg.Context, collector gostorm.OutputCollector) {
s.context = context
s.collector = collector
}
func (s *latencyBolt) Cleanup() {
}
func main() {
// Logging is done to an output file, since stdout and stderr are captured
fo, err := os.Create(fmt.Sprintf("/tmp/latencyBolt-%d.txt", os.Getpid()))
if err != nil {
panic(err)
}
defer fo.Close()
log.SetOutput(fo)
go func() {
if r := recover(); r != nil {
log.Panicf("Recovered panic: %v", r)
}
}()
encoding := "jsonEncoded"
b := &latencyBolt{}
gostorm.RunBolt(b, encoding)
}
Just for reference here's also our clojure definition for the Topology. It expects that the binaries are in the executing user's $HOME/bin
dir.
; latency.clj
(ns storm-evaluation.latency
(:require
[org.apache.storm.clojure :as storm]
[org.apache.storm.config :as config]))
(def home-bin-dir (str (System/getProperty "user.home") "/bin/"))
(defn mk-topology
[]
(storm/topology
{"1" (storm/shell-spout-spec (str home-bin-dir "spout") "" ["timestamps"] :p 1)}
{"2" (storm/shell-bolt-spec {"1" :shuffle} (str home-bin-dir "bolt") "" ["latency"] :p 1)}))
I created a minimal example here:
https://github.com/sixgill/gostorm-runner
@dominiek
Not sure if this should be included in the gostorm repo or just linked in the docs.