factory icon indicating copy to clipboard operation
factory copied to clipboard

关闭风险

Open buguang01 opened this issue 6 years ago • 8 comments

func newWorker() (w *worker) {
	w = &worker{
		params: make(chan interface{}),
	}
	go func(w *worker) {
		for {
			if w.process() {
				break
			}
			atomic.StoreInt32(&w.isBusy, 0)
		}
		// 置为繁忙状态
		atomic.StoreInt32(&w.isBusy, 1)
		// 可能存在任务
		select {
		case params := <-w.params:
			w.action(params)
		default:
		}
		// 关闭任务通道
		close(w.params)
	}(w)
	return
}

在退出的时候,你这边考虑了有可能有某个任务正在写入新的参数。但是在你的这段代码运行到select的时候,也有可能那边还没开始写入参数,导致你这边运行到了default然后就把通道关闭了,导致写入方出现异常。

buguang01 avatar Nov 01 '19 08:11 buguang01

还有,就算你在这里读到了任务参数,开始运行了,但是你在select中做的w.action(params),没有做异常保护。也可能导致出问题。

buguang01 avatar Nov 01 '19 08:11 buguang01

第一个问题,我已经先把work标记成busy了,不会再有新写入进入了吧 第二个问题周末修复一下,感谢您的反馈

letsfire avatar Nov 01 '19 09:11 letsfire

如果在 // 置为繁忙状态 atomic.StoreInt32(&w.isBusy, 1) 之前, if atomic.CompareAndSwapInt32(&w.isBusy, 0, 1) { 这个刚被执行呢?

buguang01 avatar Nov 01 '19 09:11 buguang01

我加了一个定时器解决您提的这个问题,至于action出错,我的设想是action自行处理的,line.go里面也有SetPanicHandler,worker.go里的process也只是防止worker死掉,出错了组件这里也是不知道怎么回补的,这些应该action自己处理吧

letsfire avatar Nov 18 '19 15:11 letsfire

func (w *worker) assign(action func(interface{}), params interface{}) bool {
	if atomic.CompareAndSwapInt32(&w.isBusy, 0, 1) {
		time.Sleep(time.Second)
		w.action = action
		w.params <- params
		return true
	}
	return false
}

如果你在这里加上一个sleep我说的这个BUG就很容易暴露了。 这是一个风险。

buguang01 avatar Nov 19 '19 02:11 buguang01

func TestMaster(t *testing.T) {
	wg := sync.WaitGroup{}
	wg2 := sync.WaitGroup{}
	w := factory.NewMaster(100, 2)
	ctx, cel := context.WithCancel(context.Background())
	li := w.AddLine("testli", func(e interface{}) {
		defer wg.Done()
		time.Sleep(time.Second * 1)
	})
	for i := 0; i < 100; i++ {
		go func() {
			wg2.Add(1)
			defer wg2.Done()
			for {
				select {
				case <-ctx.Done():
					return
				default:
					wg.Add(1)
					li.Submit(0)
				}
			}
		}()
	}
	time.Sleep(time.Second * 5)
	fmt.Println("adjustsize")
	for i := 0; i < 100; i++ {
		w.AdjustSize(100)
		time.Sleep(time.Second)
		w.AdjustSize(1)
		time.Sleep(time.Second)
	}
	fmt.Println("for end")
	cel()      //关闭发协程
	wg2.Wait() //确认发协程是否能关闭,多半会卡在这里
	fmt.Println("down")
	w.Shutdown()
	fmt.Println("Wait")
	wg.Wait()
	fmt.Println("End")
}

测试

buguang01 avatar Nov 19 '19 02:11 buguang01

你用isBusy的0,1表示是否在被占用,那关闭就是一直被占用,你可以在发关闭信号的时候,把这个改成2,然后写入那个值。

func (w *worker) shutdown() {
        for !atomic.CompareAndSwapInt32(&w.isBusy, 0, 2) {
	        
        }
        w.params <- exitSignal{}
       close(w.params)
}

然后在

for {
	if w.process() {
		break
	}
	atomic.CompareAndSwapInt32(&w.isBusy, 1,0)
}

其他地方你再改改

buguang01 avatar Nov 19 '19 05:11 buguang01

优化的好一点,其实还可以在shutdown方法里强改,或chan的缓存为2等等。

buguang01 avatar Nov 19 '19 05:11 buguang01