gostorm copied to clipboard
Need better example
Can anyone explain how to use this library? A complete example would be nice. Your splitsentence example differs completely from the description, which is confusing.
As a newcomer to Storm, I am also confused by the example.
We spent a whole working day fighting our way through this and came up with the following working example. It emits a timestamp encoded in a []byte
from a Spout and calculates the latency inside the Bolt.
// spout.go
package main
import (
_ "github.com/jsgilmore/gostorm/encodings"
stormmsg "github.com/jsgilmore/gostorm/messages"
// timestampSpout is the spout of this example; its NextTuple method emits
// payloads that carry the current time.
type timestampSpout struct {
// payloadSize is the length in bytes of the payload slice built in NextTuple.
payloadSize int
// stream is the stream name passed to collector.Emit.
stream string
// context is the value handed to Open.
context *stormmsg.Context
// collector is the output collector handed to Open; NextTuple emits through it.
collector gostorm.SpoutOutputCollector
// idCounter is formatted in base 10 and used as the tuple id in NextTuple.
idCounter uint64
func (s *timestampSpout) NextTuple() {
tb := make([]byte, s.payloadSize)
binary.LittleEndian.PutUint64(tb, uint64(time.Now().UnixNano()))
s.collector.Emit(strconv.FormatUint(s.idCounter, 10), s.stream, tb)
// Acked logs the id it is called with, prefixed by "Acked".
func (s *timestampSpout) Acked(id string) {
log.Println("Acked", id)
// Failed logs the id it is called with, prefixed by "Failed".
func (s *timestampSpout) Failed(id string) {
log.Println("Failed", id)
// Exit shows no statements in this example.
func (s *timestampSpout) Exit() {
// Open stores the given context and output collector on the spout so that
// NextTuple can emit through the collector later.
func (s *timestampSpout) Open(ctx *stormmsg.Context, collector gostorm.SpoutOutputCollector) {
s.context = ctx
s.collector = collector
func main() {
// Logging is done to an output file, since stdout and stderr are captured
fo, err := os.Create(fmt.Sprintf("timestampSpout-%d.txt", os.Getpid()))
if err != nil {
defer fo.Close()
encoding := "jsonEncoded"
s := ×tampSpout{
payloadSize: 90,
stream: "",
gostorm.RunSpout(s, encoding)
// bolt.go
package main
import (
_ "github.com/jsgilmore/gostorm/encodings"
stormmsg "github.com/jsgilmore/gostorm/messages"
// latencyBolt is the bolt of this example; its Execute method computes the
// time elapsed since the timestamp carried in an incoming payload.
type latencyBolt struct {
// context is the value handed to Prepare.
context *stormmsg.Context
// collector is the output collector handed to Prepare; Execute emits through it.
collector gostorm.OutputCollector
// Fields returns a one-element slice holding a pointer to a freshly allocated
// 90-byte []byte (the same size as the payloadSize the spout's main uses).
// NOTE(review): presumably gostorm decodes the incoming tuple's field into this
// destination before calling Execute — confirm against the gostorm docs.
func (s *latencyBolt) Fields() []interface{} {
ret := make([]interface{}, 1)
payload := make([]byte, 90)
ret[0] = &payload
return ret
// Execute processes one incoming tuple. For a first field of type *[]byte it
// reads the leading 8 bytes as a little-endian UnixNano timestamp, subtracts
// it from the current time, emits the difference on the "latency" stream and
// logs it together with meta.Id.
// NOTE(review): the closing braces of this snippet are missing, so the extent
// of the "__heartbeat" branch cannot be determined from here; presumably
// heartbeat tuples are skipped before the type switch — confirm against the
// original post.
func (s *latencyBolt) Execute(meta stormmsg.BoltMsgMeta, fields ...interface{}) {
if meta.GetStream() == "__heartbeat" {
// NOTE(review): fields[0] is indexed without a length check.
switch payload := fields[0].(type) {
case *[]byte:
now := time.Now()
// Uint64 panics if the payload holds fewer than 8 bytes.
sent := int64(binary.LittleEndian.Uint64(*payload))
nowNano := now.UnixNano()
// Difference of two UnixNano values, i.e. nanoseconds.
latency := nowNano - sent
// meta.Id is passed as the first argument (presumably the anchor list — confirm).
s.collector.Emit([]string{meta.Id}, "latency", latency)
log.Println(meta.Id, latency)
// Prepare stores the given context and output collector on the bolt so that
// Execute can emit through the collector later.
func (s *latencyBolt) Prepare(context *stormmsg.Context, collector gostorm.OutputCollector) {
s.context = context
s.collector = collector
// Cleanup shows no statements in this example.
func (s *latencyBolt) Cleanup() {
func main() {
// Logging is done to an output file, since stdout and stderr are captured
fo, err := os.Create(fmt.Sprintf("/tmp/latencyBolt-%d.txt", os.Getpid()))
if err != nil {
defer fo.Close()
go func() {
if r := recover(); r != nil {
log.Panicf("Recovered panic: %v", r)
encoding := "jsonEncoded"
b := &latencyBolt{}
gostorm.RunBolt(b, encoding)
For reference, here is also our Clojure definition of the topology. It expects the binaries to be in the executing user's $HOME/bin directory.
; latency.clj
(ns storm-evaluation.latency
[org.apache.storm.clojure :as storm]
[org.apache.storm.config :as config]))
; Directory prefix for the spout and bolt binaries: the "user.home" system
; property followed by "/bin/".
(def home-bin-dir (str (System/getProperty "user.home") "/bin/"))
; Builds the spout and bolt specs of the latency example.
; NOTE(review): the parameter vector and the enclosing topology form are not
; visible in this snippet.
(defn mk-topology
; Component "1": shell spout running home-bin-dir + "spout", with an empty
; second argument and the single output field "timestamps".
{"1" (storm/shell-spout-spec (str home-bin-dir "spout") "" ["timestamps"] :p 1)}
; Component "2": shell bolt running home-bin-dir + "bolt", fed from component
; "1" via :shuffle, with the single output field "latency".
{"2" (storm/shell-bolt-spec {"1" :shuffle} (str home-bin-dir "bolt") "" ["latency"] :p 1)}))
I created a minimal example here:
I am not sure whether this should be included in the gostorm repo or just linked from the docs.