mapreduce
mapreduce copied to clipboard
`mapreduce.Finish`中嵌套使用`mapreduce.MapReduce`会导致`Finish`变成非阻塞操作
嵌套的MapReduce的reducer中如果不调用writer.Write方法,会产生一个ErrReduceNoOutput错误,Finish 中 worker 返回异常会直接结束 Finish 调用,但是Finish中调用的MapReduceVoid会吞掉ErrReduceNoOutput错误返回一个 nil,从最后结果看是没有异常的成功调用,实际其他的 worker 都还在异步运行
例如下面这样的调用:
func main(){
err := mapreduce.Finish(func() error {
return worker1()
}, func() error {
val, err := mapreduce.MapReduce(func(source chan<- interface{}) {
for i := 0;i<10;i++{
source <- i
}
}, func(item interface{}, writer mapreduce.Writer, cancel func(error)) {
i := item.(int)
writer.Write(i * i)
}, func(pipe <-chan interface{}, writer mapreduce.Writer, cancel func(error)) {
var cnt int
for i := range pipe{
cnt += i.(int)
}
// 这里不调用Write 会导致当前这个 worker 任务返回一个异常
// writer.Write(cnt)
})
// 收到一个异常 `ErrReduceNoOutput`
if err != nil {
return err
}
fmt.Println("result:", val)
})
// 这里的 err 是 nil
if err != nil {
fmt.Println(err)
}
}
If any err, Finish function quits.
Because like we're doing some IO requests, if any of them failed, others should quit, but if they're blocked in IO, the only way to do so is just quit Finish.