RxGo

Observe - unexpected behaviour in combination with Serialize / withPool

Open mimol91 opened this issue 4 years ago • 1 comments

Hey. I tried to recreate the customer example given in the README.

My assumption is that Filter and Map each return a 'new' observable. So the result of https://play.golang.org/p/vALjdCtH1Sa is expected, since we are observing the original 'observable', without any filters applied.

To make it work, I can just assign the returned observable: https://play.golang.org/p/6Fa-0bhQWut (no result, as all customers have age 0).


So far so good. The part that I do not understand is that if I use rxgo.Serialize and WithPool, it seems to behave strangely: https://play.golang.org/p/6kvKTPL5RId

I would expect it to print all of the customers, as the observable with the filter was not assigned. Moreover, the result I saw is

{{0 0 } <nil>}
{{53 0 } <nil>}

It seems the filter was not applied, but why were only 2 results shown?

mimol91 avatar Nov 26 '20 10:11 mimol91

Able to reproduce error

Confirmed the above test cases. Running the following properly displays all of the customers:


func TestStop(t *testing.T) {
	// Create the input channel
	ch := make(chan Item)
	// Data producer
	go producer(ch)

	// Create an Observable
	observable := FromChannel(ch)

	// observable.
	// 	Filter(func(item interface{}) bool {
	// 		// Filter operation
	// 		customer := item.(Customer)
	// 		return customer.Age > 18
	// 	}).
	// 	Map(func(_ context.Context, item interface{}) (interface{}, error) {
	// 		// Enrich operation
	// 		customer := item.(Customer)

	// 		customer.TaxNumber = "1"
	// 		return customer, nil
	// 	},
	// 		// Create multiple instances of the map operator
	// 		WithPool(2),
	// 		// Serialize the items emitted by their Customer.ID
	// 		Serialize(func(item interface{}) int {
	// 			customer := item.(Customer)
	// 			return customer.ID
	// 		}),
	// 		WithBackPressureStrategy(Block),
	// 	)

	fmt.Printf("Observing...")
	for customer := range observable.Observe() {
		if customer.Error() {
			panic(customer.E)
		}
		fmt.Println(customer)
	}
	fmt.Printf("Done")
}

This is the target behavior, and what is usually expected when adding operators to an observable through pipes: the pipeline shouldn't run unless it has a subscriber, and applying operators should not modify the original source channel.

API server listening at: 127.0.0.1:8857
Observing...{{0 0 } <nil>}
{{1 0 } <nil>}
{{2 0 } <nil>}
{{3 0 } <nil>}
{{4 0 } <nil>}
{{5 0 } <nil>}
{{6 0 } <nil>}
...
{{98 0 } <nil>}
DonePASS

Replicating the error

Running the same script with Filter and Map (with WithPool and Serialize) applied, but still using the original observable variable for the subscription, returns different results.

API server listening at: 127.0.0.1:14416
Observing...{{0 0 } <nil>}
{{20 0 } <nil>}
{{61 0 } <nil>}
{{62 0 } <nil>}
{{63 0 } <nil>}
{{64 0 } <nil>}
{{65 0 } <nil>}
{{66 0 } <nil>}
{{67 0 } <nil>}
DonePASS

Note that no logs were omitted from the above results. This suggests the original observable variable was somehow mutated by one of the operators. After manually commenting out each of the operators and their options, I identified the Map operator's Serialize option as the one that adversely affected it.

Investigation

After digging deeper into the inner workings of the Serialize option and into how observable factories are implemented, I found that using Serialize actually consumes elements from the channel even when there is no subscription.

Root cause

The root cause is the Observe call to the original iterable (the observable that hasn't been modified yet) on line 171 of observable.go:

if serialized, f := option.isSerialized(); serialized {
		firstItemIDCh := make(chan Item, 1)
		fromCh := make(chan Item, 1)
		obs := &ObservableImpl{
			iterable: newFactoryIterable(func(propagatedOptions ...Option) <-chan Item {
				mergedOptions := append(opts, propagatedOptions...)
				option := parseOptions(mergedOptions...)

				next := option.buildChannel()
				ctx := option.buildContext()
// This specific line v
				observe := iterable.Observe(opts...)
// This specific line ^
				go func() {
					select {
					case <-ctx.Done():
						return
					case firstItemID := <-firstItemIDCh:
						if firstItemID.Error() {
							firstItemID.SendContext(ctx, fromCh)
							return
						}
						Of(firstItemID.V.(int)).SendContext(ctx, fromCh)
						runParallel(ctx, next, observe, operatorFactory, bypassGather, option, mergedOptions...)
					}
				}()
				runFirstItem(ctx, f, firstItemIDCh, observe, next, operatorFactory, option, mergedOptions...)
				return next
			}),
		}
		return obs.serialize(fromCh, f)

The Observe method consumes some of the elements of the original channel, so they are lost to the later subscription. So it was not that the operators 'mutated' the original observable, but that they consumed emissions from it (I guess it could be argued that the operators applied changes to the channel when they shouldn't have).
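The effect described above can be reproduced with plain Go channels, without RxGo at all. The following is a minimal sketch, not RxGo code: `drain` and `splitBetweenTwoReaders` are hypothetical names made up for illustration. Two goroutines read from the same channel, and each value is delivered to exactly one of them, so neither reader sees the full sequence unless the other happens to receive nothing.

```go
package main

import (
	"fmt"
	"sync"
)

// drain appends every value received from src to *out until src is closed.
func drain(src <-chan int, out *[]int, wg *sync.WaitGroup) {
	defer wg.Done()
	for v := range src {
		*out = append(*out, v)
	}
}

// splitBetweenTwoReaders sends 0..n-1 into a single channel that two
// goroutines read from concurrently, and returns what each reader received.
// (Hypothetical helper, used only to illustrate competing consumers.)
func splitBetweenTwoReaders(n int) (a, b []int) {
	ch := make(chan int)
	go func() {
		defer close(ch)
		for i := 0; i < n; i++ {
			ch <- i
		}
	}()

	var wg sync.WaitGroup
	wg.Add(2)
	go drain(ch, &a, &wg) // plays the role of the internal Observe call
	go drain(ch, &b, &wg) // plays the role of the user's subscription
	wg.Wait()
	return a, b
}

func main() {
	hidden, visible := splitBetweenTwoReaders(99)
	// The split varies between runs; only the total is fixed at 99.
	fmt.Printf("hidden reader: %d items, visible reader: %d items\n",
		len(hidden), len(visible))
}
```

How the items are divided depends on goroutine scheduling, which would be consistent with the partial and irregular output seen in the reproduction.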

To confirm that the error was caused by emissions being consumed by one channel and therefore missing from another, I applied WithPublishStrategy() to the FromChannel constructor. This solves the issue and returns the desired list (all of the customers from 1 to 98).

	// Create a --> multicast <-- Observable
	observable := FromChannel(ch, WithPublishStrategy())
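To illustrate why a multicast source avoids the loss, here is a rough sketch in plain Go. It is not how RxGo implements WithPublishStrategy; `broadcast` and `countPerSubscriber` are hypothetical helpers written for this example. Every value read from the source is copied to each registered subscriber, so an extra reader no longer takes items away from the others.

```go
package main

import (
	"fmt"
	"sync"
)

// broadcast copies every value from src to each of n output channels.
// All subscribers are fixed up front, and a slow subscriber blocks the rest.
func broadcast(src <-chan int, n int) []<-chan int {
	outs := make([]chan int, n)
	readOnly := make([]<-chan int, n)
	for i := range outs {
		outs[i] = make(chan int)
		readOnly[i] = outs[i]
	}
	go func() {
		for v := range src {
			for _, out := range outs {
				out <- v
			}
		}
		for _, out := range outs {
			close(out)
		}
	}()
	return readOnly
}

// countPerSubscriber feeds 0..items-1 through broadcast and reports how
// many values each subscriber received.
func countPerSubscriber(items, subscribers int) []int {
	src := make(chan int)
	go func() {
		defer close(src)
		for i := 0; i < items; i++ {
			src <- i
		}
	}()

	counts := make([]int, subscribers)
	var wg sync.WaitGroup
	for i, out := range broadcast(src, subscribers) {
		wg.Add(1)
		go func(i int, out <-chan int) {
			defer wg.Done()
			for range out {
				counts[i]++ // each goroutine writes only its own slot
			}
		}(i, out)
	}
	wg.Wait()
	return counts
}

func main() {
	fmt.Println(countPerSubscriber(99, 2)) // prints [99 99]
}
```

The trade-off in this sketch is that the subscribers are coupled: the broadcaster cannot move on to the next value until every subscriber has taken the current one.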

Recommendation

The Serialize implementation needs to be reworked so that it doesn't consume elements from the observable until a subscription has been made. I'm not sure exactly how to do that, so feel free to correct me if something is wrong with my diagnosis.
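One possible shape for "don't consume until subscribed", sketched in plain Go rather than against the RxGo internals: `lazyStage`, `newLazyStage` and `countAfterUnusedStage` are hypothetical names, and this is only an illustration of the idea, not a proposed patch. Constructing the stage merely records the source and the transformation; the goroutine that reads from the source is started inside Observe.

```go
package main

import "fmt"

// lazyStage is a hypothetical stand-in for an operator result. It remembers
// its source and its transformation but starts no goroutine when created.
type lazyStage struct {
	src <-chan int
	fn  func(int) int
}

// newLazyStage only records its arguments; it never reads from src.
func newLazyStage(src <-chan int, fn func(int) int) *lazyStage {
	return &lazyStage{src: src, fn: fn}
}

// Observe is the subscription point: reading from src begins here.
func (s *lazyStage) Observe() <-chan int {
	out := make(chan int)
	go func() {
		defer close(out)
		for v := range s.src {
			out <- s.fn(v)
		}
	}()
	return out
}

// countAfterUnusedStage builds a stage on top of a source, never observes
// it, then drains the source directly and returns how many items arrived.
func countAfterUnusedStage(n int) int {
	src := make(chan int)
	go func() {
		defer close(src)
		for i := 0; i < n; i++ {
			src <- i
		}
	}()

	// Built but never observed, so it must not take anything from src.
	_ = newLazyStage(src, func(v int) int { return v * 2 })

	count := 0
	for range src {
		count++
	}
	return count
}

func main() {
	fmt.Println(countAfterUnusedStage(99)) // prints 99
}
```

With this structure, an unassigned or unobserved pipeline leaves the source untouched, which matches the target behavior described earlier. Once Observe is called on the stage, it would still compete with any other direct reader of the same channel.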

darrensapalo avatar Dec 05 '20 18:12 darrensapalo