gostream icon indicating copy to clipboard operation
gostream copied to clipboard

Stream Processing library for Go

gostream

PkgGoDev Go Report Card tests

Stream Processing Library for Go

TODO

  • [x] Window
    • [x] LengthWindow
    • [x] LengthBatchWindow
    • [x] TimeWindow
    • [x] TimeBatchWindow
  • [x] Select
  • [ ] Where
    • [x] Equals, NotEquals
    • [x] Larger, Less
    • [ ] AND, OR
  • [x] OrderBy
  • [x] Limit, Offset
  • [x] Aggregate Function
    • [x] Avg, Sum, Count
    • [x] Max, Min

Example

type LogEvent struct {
  Time    time.Time
  Level   int
  Message string
}

q := "select * from LogEvent.length(10)"
s, err := gostream.New().
    Add(LogEvent{}).
    Query(q)
if err != nil {
  fmt.Printf("new gostream: %v", err)
  return
}
defer s.Close()

go func() {
  for {
    fmt.Printf("%v\n", <-s.Output())
  }
}()

s.Input() <- LogEvent{
  Time: time.Now()
  Level: 1
  Message: "something happened"
}
type LogEvent struct {
  Time    time.Time
  Level   int
  Message string
}

s := stream.New().
  SelectAll().
  From(LogEvent{}).
  Length(10).
  OrderBy("Level", stream.DESC).
  Limit(10, 5)
defer s.Close()
go s.Run()

go func() {
  for {
    fmt.Printf("%v\n", <-s.Output())
  }
}()

s.Input() <- LogEvent{
  Time: time.Now()
  Level: 1
  Message: "something happened"
}