Counting items of a Connectable Observable

Open matejpavlovic opened this issue 3 years ago • 0 comments

Hello, I am trying to compute two counts over the events produced by a Connectable Observable:

  • the total number of events, and
  • the number of events that pass a filter.

Here is some example code:

package main

import (
	"context"
	"fmt"

	"github.com/reactivex/rxgo/v2"
)

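func main() {
	// WithPublishStrategy makes the Observable connectable:
	// the producer does not run until Connect is called.
	events := rxgo.Create([]rxgo.Producer{func(_ context.Context, next chan<- rxgo.Item) {
		next <- rxgo.Of(expensiveReadFromDisk(0))
		next <- rxgo.Of(expensiveReadFromDisk(1))
		next <- rxgo.Of(expensiveReadFromDisk(2))
	}}, rxgo.WithPublishStrategy())

	// Register both observers before connecting, so that each should see every item.
	total := events.Count()
	filtered := events.Filter(func(i interface{}) bool {
		return i.(int) > 0
	}).Count()

	// Start producing items.
	events.Connect(context.Background())

	t, _ := total.Get()
	fmt.Printf("   Total: %d\n", t.V)
	// This call never returns (see the output below).
	f, _ := filtered.Get()
	fmt.Printf("Filtered: %d\n", f.V)
}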

func expensiveReadFromDisk(e int) int {
	fmt.Printf("Reading event: %d\n", e)
	return e
}

I expected the code to output

Reading event: 0
Reading event: 1
Reading event: 2
   Total: 3
Filtered: 2

Instead, however, the code outputs only this:

Reading event: 0
Reading event: 1
Reading event: 2
   Total: 3

It then blocks forever on the following line:

	f, _ := filtered.Get()

Is this the intended behavior? If so, what is the correct way to obtain both counts? Thank you very much!

matejpavlovic, Nov 07 '22 04:11