go-observer

Stream operators

Open botchris opened this issue 4 years ago • 0 comments

Hi,

I'm trying to implement Rx-like stream operators. For instance, I would like to be able to "filter" or "map" the messages on a Stream. Any idea how this could be achieved with this library?

For example, I would like to keep only the messages that implement a concrete interface:

stream := p.Observe()
stream = Filter(stream, func(m interface{}) bool {
    _, ok := m.(MyCustomInterface)

    return ok
})

for {
    select {
    case <-stream.Changes():
        // Advance to the next value before reading it.
        stream.Next()

        val := stream.Value()
        // ... val always implements `MyCustomInterface`
    }
}
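
To make the idea concrete, here is a minimal sketch of the kind of operators I have in mind. It uses plain Go channels instead of the library's Stream type, so it is not go-observer's API. `Filter`, `Map`, the `order` type and the `ID` method on `MyCustomInterface` are all hypothetical names made up for this illustration.

```go
package main

import "fmt"

// MyCustomInterface stands in for the interface from the example above.
// The ID method is an assumption made only for this sketch.
type MyCustomInterface interface {
	ID() string
}

// order is a hypothetical message type that implements MyCustomInterface.
type order struct{ id string }

func (o order) ID() string { return o.id }

// Filter forwards only the values for which keep returns true.
// The returned channel is closed once in is closed and drained.
func Filter(in <-chan interface{}, keep func(interface{}) bool) <-chan interface{} {
	out := make(chan interface{})
	go func() {
		defer close(out)
		for v := range in {
			if keep(v) {
				out <- v
			}
		}
	}()
	return out
}

// Map applies fn to every value and forwards the result.
// The returned channel is closed once in is closed and drained.
func Map(in <-chan interface{}, fn func(interface{}) interface{}) <-chan interface{} {
	out := make(chan interface{})
	go func() {
		defer close(out)
		for v := range in {
			out <- fn(v)
		}
	}()
	return out
}

func main() {
	// src plays the role of the observed stream in this sketch.
	src := make(chan interface{})
	go func() {
		defer close(src)
		for _, m := range []interface{}{order{"a"}, 42, "noise", order{"b"}} {
			src <- m
		}
	}()

	// Keep only the messages that implement MyCustomInterface.
	matching := Filter(src, func(m interface{}) bool {
		_, ok := m.(MyCustomInterface)
		return ok
	})

	// The type assertion is safe here because Filter already checked it.
	ids := Map(matching, func(m interface{}) interface{} {
		return m.(MyCustomInterface).ID()
	})

	for id := range ids {
		fmt.Println(id)
	}
}
```

With a real stream, I assume a small goroutine would have to wait on `stream.Changes()`, call `stream.Next()`, and push each value into the source channel. What I'm really asking is whether the same thing can be done while keeping the Stream interface itself.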

botchris, Aug 12 '20 18:08