mapreduce icon indicating copy to clipboard operation
mapreduce copied to clipboard

`mapreduce.Finish`中嵌套使用`mapreduce.MapReduce`会导致`Finish`变成非阻塞操作

Open lujin123 opened this issue 3 years ago • 1 comments

嵌套的MapReducereducer中如果不调用writer.Write方法,会产生一个ErrReduceNoOutput错误,Finish 中 worker 返回异常会直接结束 Finish 调用,但是Finish中调用的MapReduceVoid会吞掉ErrReduceNoOutput错误返回一个 nil,从最后结果看是没有异常的成功调用,实际其他的 worker 都还在异步运行

例如下面这样的调用:

func main(){
        err := mapreduce.Finish(func() error {
		return worker1()
	}, func() error {
		val, err := mapreduce.MapReduce(func(source chan<- interface{}) {
			for i := 0;i<10;i++{
				source <- i
			}
		}, func(item interface{}, writer mapreduce.Writer, cancel func(error)) {
			i := item.(int)
			writer.Write(i * i)
		}, func(pipe <-chan interface{}, writer mapreduce.Writer, cancel func(error)) {
			var cnt int
			for i := range pipe{
				cnt += i.(int)
			}
                         // 这里不调用Write 会导致当前这个 worker 任务返回一个异常
			// writer.Write(cnt) 
		})
                 // 收到一个异常 `ErrReduceNoOutput`
		if err != nil {
			return err
		}
		fmt.Println("result:", val)
	})
       // 这里的 err 是 nil
       if err != nil {
           fmt.Println(err)
      }
}

lujin123 avatar Nov 03 '22 07:11 lujin123

If any err, Finish function quits.

Because like we're doing some IO requests, if any of them failed, others should quit, but if they're blocked in IO, the only way to do so is just quit Finish.

kevwan avatar Jan 12 '23 14:01 kevwan