RxGo icon indicating copy to clipboard operation
RxGo copied to clipboard

FlatMap seems not parallelly WithPool or WithCPUPool

Open arlor opened this issue 4 years ago • 2 comments

FlatMap not run parallelly WithPool or WithCPUPool

See:

package main

import (
        "fmt"
        "time"

        "github.com/reactivex/rxgo/v2"
)

func main() {
        obs := rxgo.Just(1, 2, 3)().FlatMap(
                func(i rxgo.Item) rxgo.Observable {
                        time.Sleep(time.Second)
                        return rxgo.Just(i.V.(int)*10, i.V.(int)*100)()
                },
                rxgo.WithCPUPool(),
        )

        start := time.Now().Unix()
        for item := range obs.Observe() {
                fmt.Printf("item: %+v\n", item)
        }
        end := time.Now().Unix()
        fmt.Printf("cost: %d seconds\n", end-start)
}

Expect: cost 1 seconds

Actual: cost 3 seconds

arlor avatar Nov 25 '20 03:11 arlor

#279 Is this fix ready? This works as expected in the other implementations of reactivex I tried(RxJS, RxJava/Reactor).

	rxgo.Just(urls)().FlatMap(func(i rxgo.Item) rxgo.Observable {
		return doRequest(i.V.(string))
	}, rxgo.WithPool(32)).ForEach(out, func(e error) {}, func() {})
	rxgo.Just(urls)().Map(func(c context.Context, i interface{}) (interface{}, error) {
		val, err := <-doRequest(i.(string)).Observe()
		return val, err
	}, rxgo.WithPool(32)).ForEach(out, func(e error) {}, func() {})

These two should do the same thing

nitedani avatar Feb 21 '22 11:02 nitedani

#279 Is this fix ready? This works as expected in the other implementations of reactivex I tried(RxJS, RxJava/Reactor).

	rxgo.Just(urls)().FlatMap(func(i rxgo.Item) rxgo.Observable {
		return doRequest(i.V.(string))
	}, rxgo.WithPool(32)).ForEach(out, func(e error) {}, func() {})
	rxgo.Just(urls)().Map(func(c context.Context, i interface{}) (interface{}, error) {
		val, err := <-doRequest(i.(string)).Observe()
		return val, err
	}, rxgo.WithPool(32)).ForEach(out, func(e error) {}, func() {})

These two should do the same thing

These codes are not merged, the issue still exists

dalianzhu avatar Aug 31 '22 17:08 dalianzhu