gostorm icon indicating copy to clipboard operation
gostorm copied to clipboard

Need better example

Open 0xd3e opened this issue 8 years ago • 3 comments

Hi,

Can anyone explain how to use this library? A complete example would be nice. Your splitsentence example differs completely from the description, which is confusing.

Thanks!

0xd3e avatar Jan 20 '17 15:01 0xd3e

As a newcomer to Storm, I am also confused by the example.

asaarashi avatar Mar 02 '17 03:03 asaarashi

We spent a whole working day fighting through this and came up with the following working example. The Spout emits a timestamp encoded in a []byte, and the Bolt calculates the latency from it.

// spout.go
package main

import (
	"encoding/binary"
	"fmt"
	"log"
	"os"
	"strconv"
	"time"

	"github.com/jsgilmore/gostorm"
	_ "github.com/jsgilmore/gostorm/encodings"
	stormmsg "github.com/jsgilmore/gostorm/messages"
)

// timestampSpout is a spout whose tuples carry the time at which they were
// emitted, encoded at the start of a byte payload.
type timestampSpout struct {
	// context and collector are set by Open.
	context   *stormmsg.Context
	collector gostorm.SpoutOutputCollector

	// stream is the stream name passed to the collector on every emit.
	stream string

	// payloadSize is the length in bytes of the payload allocated per tuple.
	payloadSize int

	// idCounter supplies the id of the next tuple; NextTuple increments it
	// after each emit.
	idCounter uint64
}

// NextTuple emits one tuple whose single field is a byte payload carrying the
// current time. The first 8 bytes hold time.Now().UnixNano() as a
// little-endian uint64; the rest of the payload is zero padding. The tuple id
// is the decimal form of idCounter, which is incremented after the emit.
func (s *timestampSpout) NextTuple() {
	// PutUint64 panics on a buffer shorter than 8 bytes, so never allocate
	// less than the timestamp needs, even if payloadSize is configured
	// smaller (or left at its zero value).
	const timestampLen = 8
	size := s.payloadSize
	if size < timestampLen {
		size = timestampLen
	}

	tb := make([]byte, size)
	binary.LittleEndian.PutUint64(tb, uint64(time.Now().UnixNano()))
	s.collector.Emit(strconv.FormatUint(s.idCounter, 10), s.stream, tb)
	s.idCounter++
}

// Acked writes a log line recording that the tuple with the given id was
// acknowledged.
func (s *timestampSpout) Acked(id string) {
	const event = "Acked"
	log.Println(event, id)
}

// Failed writes a log line recording that the tuple with the given id failed.
// The tuple is not re-emitted.
func (s *timestampSpout) Failed(id string) {
	const event = "Failed"
	log.Println(event, id)
}

// Exit writes a log line noting that the spout is exiting; it releases no
// resources itself.
func (s *timestampSpout) Exit() {
	const msg = "Exiting"
	log.Println(msg)
}

// Open stores the given context and output collector on the spout. NextTuple
// emits through the collector saved here.
func (s *timestampSpout) Open(ctx *stormmsg.Context, collector gostorm.SpoutOutputCollector) {
	s.collector, s.context = collector, ctx
}

// main points the standard logger at a per-process file in the working
// directory and then runs a timestampSpout with the "jsonEncoded" encoding.
func main() {
	// stdout and stderr are captured, so log output has to go to a file.
	logName := fmt.Sprintf("timestampSpout-%d.txt", os.Getpid())
	logFile, err := os.Create(logName)
	if err != nil {
		panic(err)
	}
	defer logFile.Close()
	log.SetOutput(logFile)

	spout := &timestampSpout{payloadSize: 90, stream: ""}
	gostorm.RunSpout(spout, "jsonEncoded")
}
// bolt.go
package main

import (
	"encoding/binary"
	"fmt"
	"log"
	"os"
	"time"

	"github.com/jsgilmore/gostorm"
	_ "github.com/jsgilmore/gostorm/encodings"
	stormmsg "github.com/jsgilmore/gostorm/messages"
)

type latencyBolt struct {
	context   *stormmsg.Context
	collector gostorm.OutputCollector
}

// Fields returns a one-element slice holding a pointer to a freshly allocated,
// zeroed 90-byte slice. Execute expects its first field to have this *[]byte
// type.
func (b *latencyBolt) Fields() []interface{} {
	buf := make([]byte, 90)
	return []interface{}{&buf}
}

// Execute handles one incoming tuple.
//
// Tuples on the "__heartbeat" stream are acked and otherwise ignored. For any
// other tuple the first field must be a *[]byte whose first 8 bytes hold a
// little-endian UnixNano timestamp. Execute then emits the elapsed
// nanoseconds on the "latency" stream anchored to the incoming tuple, acks
// the tuple and logs its id together with the latency.
//
// A tuple with no fields, a first field of another type, a nil payload
// pointer or a payload shorter than 8 bytes is failed instead of being
// processed.
func (b *latencyBolt) Execute(meta stormmsg.BoltMsgMeta, fields ...interface{}) {
	if meta.GetStream() == "__heartbeat" {
		b.collector.SendAck(meta.Id)
		return
	}

	// Uint64 panics on fewer than 8 bytes and fields[0] panics on an empty
	// tuple; a malformed tuple should fail that tuple, not crash the bolt.
	const timestampLen = 8
	if len(fields) == 0 {
		b.collector.SendFail(meta.Id)
		return
	}
	payload, ok := fields[0].(*[]byte)
	if !ok || payload == nil || len(*payload) < timestampLen {
		b.collector.SendFail(meta.Id)
		return
	}

	sent := int64(binary.LittleEndian.Uint64(*payload))
	latency := time.Now().UnixNano() - sent
	b.collector.Emit([]string{meta.Id}, "latency", latency)
	b.collector.SendAck(meta.Id)
	log.Println(meta.Id, latency)
}

// Prepare stores the given context and output collector on the bolt. Execute
// emits, acks and fails tuples through the collector saved here.
func (b *latencyBolt) Prepare(context *stormmsg.Context, collector gostorm.OutputCollector) {
	b.collector, b.context = collector, context
}

// Cleanup is intentionally a no-op: the bolt holds nothing that needs to be
// released.
func (b *latencyBolt) Cleanup() {}

// main points the standard logger at a per-process file under /tmp and then
// runs a latencyBolt with the "jsonEncoded" encoding. A panic that unwinds
// through main is written to that log file before the process dies.
func main() {
	// Logging is done to an output file, since stdout and stderr are captured
	fo, err := os.Create(fmt.Sprintf("/tmp/latencyBolt-%d.txt", os.Getpid()))
	if err != nil {
		panic(err)
	}
	defer fo.Close()
	log.SetOutput(fo)

	// recover only has an effect when called directly by a deferred function
	// on the panicking goroutine, so this must be a defer, not a separate
	// goroutine. log.Panicf records the message in the log file and then
	// panics again, so the process still terminates; the deferred fo.Close
	// above runs afterwards while that second panic unwinds.
	defer func() {
		if r := recover(); r != nil {
			log.Panicf("Recovered panic: %v", r)
		}
	}()

	encoding := "jsonEncoded"
	b := &latencyBolt{}
	gostorm.RunBolt(b, encoding)
}

Just for reference, here is also our Clojure definition of the topology. It expects the binaries to be in the executing user's $HOME/bin directory.

; latency.clj
(ns storm-evaluation.latency
  (:require
    [org.apache.storm.clojure :as storm]
    [org.apache.storm.config :as config]))

;; Path of the bin directory inside the executing user's home directory,
;; including the trailing slash, so a binary name can be appended directly.
(def home-bin-dir
  (let [home (System/getProperty "user.home")]
    (str home "/bin/")))

;; Builds the latency topology from two shell components whose executables
;; are looked up in home-bin-dir:
;;   "1" - the `spout` binary; ["timestamps"] is presumably its output field
;;         list (confirm against shell-spout-spec).
;;   "2" - the `bolt` binary, subscribed to component "1" with :shuffle;
;;         ["latency"] is presumably its output field list.
;; Both specs are given :p 1 (presumably a parallelism of one - confirm).
(defn mk-topology
  []
  (storm/topology
    ;; Spout map, keyed by component id.
    {"1" (storm/shell-spout-spec (str home-bin-dir "spout") "" ["timestamps"] :p 1)}
    ;; Bolt map, keyed by component id; {"1" :shuffle} names the input.
    {"2" (storm/shell-bolt-spec {"1" :shuffle} (str home-bin-dir "bolt") "" ["latency"] :p 1)}))

wilriker avatar Mar 10 '17 14:03 wilriker

I created a minimal example here:

https://github.com/sixgill/gostorm-runner

@dominiek

Not sure if this should be included in the gostorm repo or just linked in the docs.

edrex avatar Oct 06 '17 20:10 edrex