RxGo
FlatMap does not seem to run in parallel with WithPool or WithCPUPool
FlatMap does not run in parallel when WithPool or WithCPUPool is passed.
See:
package main

import (
	"fmt"
	"time"

	"github.com/reactivex/rxgo/v2"
)

func main() {
	obs := rxgo.Just(1, 2, 3)().FlatMap(
		func(i rxgo.Item) rxgo.Observable {
			time.Sleep(time.Second)
			return rxgo.Just(i.V.(int)*10, i.V.(int)*100)()
		},
		rxgo.WithCPUPool(),
	)
	start := time.Now().Unix()
	for item := range obs.Observe() {
		fmt.Printf("item: %+v\n", item)
	}
	end := time.Now().Unix()
	fmt.Printf("cost: %d seconds\n", end-start)
}
Expected: cost 1 second (the three one-second sleeps overlap)
Actual: cost 3 seconds (the sleeps run one after another)
#279 Is this fix ready? This works as expected in the other ReactiveX implementations I tried (RxJS, RxJava/Reactor).
rxgo.Just(urls)().FlatMap(func(i rxgo.Item) rxgo.Observable {
	return doRequest(i.V.(string))
}, rxgo.WithPool(32)).ForEach(out, func(e error) {}, func() {})

rxgo.Just(urls)().Map(func(c context.Context, i interface{}) (interface{}, error) {
	// A channel receive yields an rxgo.Item, so unpack its value and error.
	item := <-doRequest(i.(string)).Observe()
	return item.V, item.E
}, rxgo.WithPool(32)).ForEach(out, func(e error) {}, func() {})
These two should do the same thing
That fix has not been merged, so the issue still exists.