RxGo
Counting items of a Connectable Observable
Hello, I am trying to count the events produced by a Connectable Observable in two ways:
- The total number of events and
- The number of events that pass a filter.
Here is some example code:
package main

import (
	"context"
	"fmt"

	"github.com/reactivex/rxgo/v2"
)

func main() {
	events := rxgo.Create([]rxgo.Producer{func(_ context.Context, next chan<- rxgo.Item) {
		next <- rxgo.Of(expensiveReadFromDisk(0))
		next <- rxgo.Of(expensiveReadFromDisk(1))
		next <- rxgo.Of(expensiveReadFromDisk(2))
	}}, rxgo.WithPublishStrategy())

	total := events.Count()
	filtered := events.Filter(func(i interface{}) bool {
		return i.(int) > 0
	}).Count()

	events.Connect(context.Background())

	t, _ := total.Get()
	fmt.Printf(" Total: %d\n", t.V)
	f, _ := filtered.Get()
	fmt.Printf("Filtered: %d\n", f.V)
}

func expensiveReadFromDisk(e int) int {
	fmt.Printf("Reading event: %d\n", e)
	return e
}
I expected the code to output:
Reading event: 0
Reading event: 1
Reading event: 2
Total: 3
Filtered: 2
Instead, however, the code outputs only this:
Reading event: 0
Reading event: 1
Reading event: 2
Total: 3
It then blocks forever on the following line:
f, _ := filtered.Get()
Is this the intended behavior? If yes, what would be the correct way of achieving the intended result? Thank you very much!