RxGo
Observe - unexpected behaviour in combination with Serialize / WithPool
Hey. I've tried to recreate the customer example given in the README.
My assumption is that Filter and Map return a 'new' observable, so the result in https://play.golang.org/p/vALjdCtH1Sa is expected: we are observing 'observable', which has no filters applied.
To make it work, I can just reassign the filtered result to observable: https://play.golang.org/p/6Fa-0bhQWut (no results, as all customers have age 0).
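The assumption that Filter returns a new observable and leaves the original untouched can be sketched with a toy, stdlib-only model. The names coldObservable, just, collect and this Customer type are hypothetical stand-ins, not RxGo's API; each Observe call re-runs a factory, as a cold observable would.

```go
package main

import "fmt"

// Customer is a hypothetical stand-in shaped like the README example.
type Customer struct {
	ID        int
	Age       int
	TaxNumber string
}

// coldObservable is a toy: every Observe call runs the factory again.
type coldObservable struct {
	factory func() <-chan Customer
}

// just builds a cold observable from a fixed list of customers.
func just(items ...Customer) coldObservable {
	return coldObservable{factory: func() <-chan Customer {
		ch := make(chan Customer)
		go func() {
			defer close(ch)
			for _, it := range items {
				ch <- it
			}
		}()
		return ch
	}}
}

// Filter returns a NEW observable; the receiver is not modified.
func (o coldObservable) Filter(pred func(Customer) bool) coldObservable {
	return coldObservable{factory: func() <-chan Customer {
		out := make(chan Customer)
		go func() {
			defer close(out)
			for c := range o.factory() {
				if pred(c) {
					out <- c
				}
			}
		}()
		return out
	}}
}

// Observe subscribes and returns the emitted items.
func (o coldObservable) Observe() <-chan Customer { return o.factory() }

// collect drains an observable into a slice.
func collect(o coldObservable) []Customer {
	var out []Customer
	for c := range o.Observe() {
		out = append(out, c)
	}
	return out
}

func main() {
	observable := just(Customer{ID: 1, Age: 10}, Customer{ID: 2, Age: 30})
	adults := observable.Filter(func(c Customer) bool { return c.Age > 18 })

	fmt.Println(len(collect(observable))) // 2: the original is unfiltered
	fmt.Println(len(collect(adults)))     // 1: only the returned observable is filtered
}
```

In this model, only reassigning the returned value (observable = observable.Filter(...)) makes later subscriptions filtered.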
So far so good. The part that I do not understand is that if I use rxgo.Serialize and WithPool, it seems to behave strangely: https://play.golang.org/p/6kvKTPL5RId
I would expect it to print all of the customers, since the observable with the filter was not assigned. Instead, the result I saw is:
{{0 0 } <nil>}
{{53 0 } <nil>}
It seems the filter was not applied, but why were only 2 results shown?
Able to reproduce the error
Confirmed the above test cases. Running the following properly displays all of the customers:
// Runs inside the rxgo package tests; assumes producer and Customer are
// defined as in the README example.
func TestStop(t *testing.T) {
	// Create the input channel
	ch := make(chan Item)
	// Data producer
	go producer(ch)

	// Create an Observable
	observable := FromChannel(ch)

	// observable.
	// 	Filter(func(item interface{}) bool {
	// 		// Filter operation
	// 		customer := item.(Customer)
	// 		return customer.Age > 18
	// 	}).
	// 	Map(func(_ context.Context, item interface{}) (interface{}, error) {
	// 		// Enrich operation
	// 		customer := item.(Customer)
	// 		customer.TaxNumber = "1"
	// 		return customer, nil
	// 	},
	// 		// Create multiple instances of the map operator
	// 		WithPool(2),
	// 		// Serialize the items emitted by their Customer.ID
	// 		Serialize(func(item interface{}) int {
	// 			customer := item.(Customer)
	// 			return customer.ID
	// 		}),
	// 		WithBackPressureStrategy(Block),
	// 	)

	fmt.Printf("Observing...")
	for customer := range observable.Observe() {
		if customer.Error() {
			panic(customer.E)
		}
		fmt.Println(customer)
	}
	fmt.Printf("Done")
}
This is the target behavior, i.e. what is usually expected when applying operators to an observable through a pipe: the pipeline shouldn't run if it has no subscribers, and applying operators should not modify the original source channel. Output:
API server listening at: 127.0.0.1:8857
Observing...{{0 0 } <nil>}
{{1 0 } <nil>}
{{2 0 } <nil>}
{{3 0 } <nil>}
{{4 0 } <nil>}
{{5 0 } <nil>}
{{6 0 } <nil>}
...
{{98 0 } <nil>}
DonePASS
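The target behavior described above (nothing runs without a subscriber, and building a pipeline leaves the source channel alone) can be sketched as a toy lazy observable. LazyObservable and fromChannel are hypothetical names, not RxGo types; the design choice that matters is that Filter starts its reading goroutine only inside Observe.

```go
package main

import "fmt"

// LazyObservable is a hypothetical toy: it stores a function that is
// only called on subscription.
type LazyObservable struct {
	observe func() <-chan int
}

// fromChannel wraps a channel without reading from it.
func fromChannel(ch <-chan int) LazyObservable {
	return LazyObservable{observe: func() <-chan int { return ch }}
}

// Filter returns a new LazyObservable; the goroutine that reads upstream
// is started inside the returned function, i.e. only on Observe.
func (o LazyObservable) Filter(pred func(int) bool) LazyObservable {
	return LazyObservable{observe: func() <-chan int {
		out := make(chan int)
		go func() {
			defer close(out)
			for v := range o.observe() {
				if pred(v) {
					out <- v
				}
			}
		}()
		return out
	}}
}

// Observe subscribes, triggering the deferred work.
func (o LazyObservable) Observe() <-chan int { return o.observe() }

func main() {
	ch := make(chan int, 5)
	for i := 0; i < 5; i++ {
		ch <- i
	}
	close(ch)

	observable := fromChannel(ch)
	_ = observable.Filter(func(v int) bool { return v > 2 }) // built, never observed

	fmt.Println(len(ch)) // 5: nothing was consumed without a subscriber

	count := 0
	for range observable.Observe() {
		count++
	}
	fmt.Println(count) // 5: the original subscriber sees every item
}
```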
Replicating the error
Running the same test, but applying Filter and Map (with WithPool and Serialize) while still subscribing through the original observable variable, returns different results:
API server listening at: 127.0.0.1:14416
Observing...{{0 0 } <nil>}
{{20 0 } <nil>}
{{61 0 } <nil>}
{{62 0 } <nil>}
{{63 0 } <nil>}
{{64 0 } <nil>}
{{65 0 } <nil>}
{{66 0 } <nil>}
{{67 0 } <nil>}
DonePASS
Note that no log lines were omitted from the above results. This means the original observable variable was somehow mutated by one of the operators. After commenting out each of the operators and their options one at a time, I identified that the Serialize option of the Map operator was what adversely affected it.
Investigation
After digging deeper into the inner workings of the Serialize option and how observable factories are implemented, I found that using Serialize actually consumes elements from the channel even when there is no subscription.
Root cause
The root cause is the Observe call on the original iterable (the observable that hasn't been modified yet) on line 171 of observable.go:
// Excerpt from observable.go (serialized branch)
if serialized, f := option.isSerialized(); serialized {
	firstItemIDCh := make(chan Item, 1)
	fromCh := make(chan Item, 1)
	obs := &ObservableImpl{
		iterable: newFactoryIterable(func(propagatedOptions ...Option) <-chan Item {
			mergedOptions := append(opts, propagatedOptions...)
			option := parseOptions(mergedOptions...)

			next := option.buildChannel()
			ctx := option.buildContext()
			// This specific line v
			observe := iterable.Observe(opts...)
			// This specific line ^
			go func() {
				select {
				case <-ctx.Done():
					return
				case firstItemID := <-firstItemIDCh:
					if firstItemID.Error() {
						firstItemID.SendContext(ctx, fromCh)
						return
					}
					Of(firstItemID.V.(int)).SendContext(ctx, fromCh)
					runParallel(ctx, next, observe, operatorFactory, bypassGather, option, mergedOptions...)
				}
			}()
			runFirstItem(ctx, f, firstItemIDCh, observe, next, operatorFactory, option, mergedOptions...)
			return next
		}),
	}
	return obs.serialize(fromCh, f)
}
The Observe method consumes some of the elements of the original channel, so those elements never reach the subscriber. So it was not that the operators 'mutated' the original observable; rather, they consumed emissions from it (though it could be argued that the operators changed the channel when they shouldn't have).
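The consumption described above comes down to two readers competing for one channel: each item is delivered to exactly one of them. The toy below models an operator that starts reading its source as soon as it is built, with no subscriber attached; eagerMap is a hypothetical name and this is not RxGo's code.

```go
package main

import "fmt"

// eagerMap is a hypothetical operator that starts draining src as soon as it
// is constructed, whether or not anyone ever reads its output.
func eagerMap(src <-chan int, fn func(int) int) <-chan int {
	out := make(chan int, 16) // small buffer: it can run ahead of any subscriber
	go func() {
		defer close(out)
		for v := range src {
			out <- fn(v)
		}
	}()
	return out
}

func main() {
	src := make(chan int)
	go func() {
		defer close(src)
		for i := 0; i < 100; i++ {
			src <- i
		}
	}()

	// Build the operator but keep subscribing to the original source,
	// as the test in this issue does.
	sideEffect := eagerMap(src, func(v int) int { return v })

	seen := 0
	for range src {
		seen++
	}
	// Drain what the eager stage swallowed so its goroutine can finish.
	swallowed := 0
	for range sideEffect {
		swallowed++
	}
	fmt.Printf("subscriber saw %d, eager stage swallowed %d, total %d\n",
		seen, swallowed, seen+swallowed)
}
```

The split between the two counts varies from run to run, which would be consistent with the irregular subsets in the log above, but in this model every item ends up in exactly one place.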
To confirm that the error was caused by emissions being consumed by one channel and therefore missing from another, I applied WithPublishStrategy() to the FromChannel constructor; this solves the issue and returns the desired list (all of the customers, 0 to 98, as in the first run).
// Create a --> multicast <-- Observable
observable := FromChannel(ch, WithPublishStrategy())
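The WithPublishStrategy() fix is consistent with multicast semantics, where every subscriber receives its own copy of every item instead of competing for it. A toy fan-out over plain channels illustrates the idea; multicast and drainAll are hypothetical helpers and are not how RxGo implements its publish strategy.

```go
package main

import (
	"fmt"
	"sync"
)

// multicast is a hypothetical fan-out helper: a single goroutine reads src
// and forwards every item to every subscriber channel, so items are copied
// to each subscriber instead of being split between them.
func multicast(src <-chan int, subscribers int) []<-chan int {
	outs := make([]chan int, subscribers)
	views := make([]<-chan int, subscribers)
	for i := range outs {
		outs[i] = make(chan int)
		views[i] = outs[i]
	}
	go func() {
		// Close every subscriber channel once the source is exhausted.
		defer func() {
			for _, o := range outs {
				close(o)
			}
		}()
		for v := range src {
			for _, o := range outs {
				o <- v // blocks until this subscriber reads the item
			}
		}
	}()
	return views
}

// drainAll reads all subscriber channels concurrently and returns what each saw.
func drainAll(chs []<-chan int) [][]int {
	results := make([][]int, len(chs))
	var wg sync.WaitGroup
	for i, ch := range chs {
		wg.Add(1)
		go func(i int, ch <-chan int) {
			defer wg.Done()
			for v := range ch {
				results[i] = append(results[i], v)
			}
		}(i, ch)
	}
	wg.Wait()
	return results
}

func main() {
	src := make(chan int)
	go func() {
		defer close(src)
		for i := 0; i < 5; i++ {
			src <- i
		}
	}()
	for i, got := range drainAll(multicast(src, 2)) {
		fmt.Println("subscriber", i, "saw", got) // each prints [0 1 2 3 4]
	}
}
```

Because the forwarding goroutine blocks until each subscriber has read the item, the slowest subscriber sets the pace; that is one possible back-pressure choice for this sketch, not necessarily RxGo's.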
Recommendation
The serialize method needs to be reworked so that it doesn't consume elements from the observable until a subscription has been made. I'm not sure exactly how to do that, but feel free to correct me if anything in my diagnosis is wrong.
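One possible shape for the recommended rework, assuming the fix is simply to move every read of the source (and the goroutine doing it) into the function that runs on subscription. Deferred and lazySerialize are hypothetical names, not RxGo code; for brevity the items are bare IDs, assumed to be consecutive and starting at first (an ID gap would stall the output in this sketch).

```go
package main

import "fmt"

// Deferred is a hypothetical lazy stream: nothing runs until Observe.
type Deferred struct {
	start func() <-chan int
}

// Observe subscribes and starts the deferred work.
func (d Deferred) Observe() <-chan int { return d.start() }

// lazySerialize re-emits IDs in ascending order, buffering early arrivals.
// All reads from src happen inside start, so constructing it consumes nothing.
func lazySerialize(src <-chan int, first int) Deferred {
	return Deferred{start: func() <-chan int {
		out := make(chan int)
		go func() {
			defer close(out)
			pending := map[int]bool{}
			next := first
			for id := range src {
				pending[id] = true
				// Flush every ID that is now contiguous with the last one sent.
				for pending[next] {
					delete(pending, next)
					out <- next
					next++
				}
			}
		}()
		return out
	}}
}

func main() {
	ch := make(chan int, 5)
	for _, id := range []int{2, 0, 1, 4, 3} { // arrive out of order
		ch <- id
	}
	close(ch)

	serialized := lazySerialize(ch, 0)
	fmt.Println(len(ch)) // 5: building the operator consumed nothing

	for id := range serialized.Observe() {
		fmt.Print(id, " ") // prints 0 1 2 3 4
	}
	fmt.Println()
}
```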