factory icon indicating copy to clipboard operation
factory copied to clipboard

数组越界BUG

Open buguang01 opened this issue 6 years ago • 14 comments

func (m *Master) AdjustSize(newSize int) {
	if int64(newSize) > m.maxNum {
		newSize = int(m.maxNum)
	}

	m.Lock()
	defer m.Unlock()

	if diff := newSize - int(m.ingNum); diff > 0 {
		for i := 0; i < diff; i++ {
			m.workers[m.ingNum] = newWorker()
			atomic.AddInt64(&m.ingNum, 1)
		}
	} else if diff < 0 {
		atomic.StoreInt64(&m.ingNum, int64(newSize))
		if cursor := atomic.LoadInt64(&m.cursor); cursor > int64(newSize) {
			atomic.StoreInt64(&m.cursor, int64(newSize))
		}
		for _, w := range m.workers[newSize:] {
			if w == nil {
				break
			}
			w.shutdown()
		}
		m.workers = m.workers[0:newSize]
	}
}

func (m *Master) Running() int64 {
	return atomic.LoadInt64(&m.ingNum)
}

func (m *Master) Shutdown() {
	m.AdjustSize(0) // 关闭所有worker
}

func (m *Master) getWorker() *worker {
	atomic.CompareAndSwapInt64(&m.cursor, m.ingNum, 0)
	idx := atomic.AddInt64(&m.cursor, 1)
	w := m.workers[idx-1]
	return w
}

当cursor与原ingNum相等时,你要进行缩容 这个时候,如果是先把ingNum改成了新的值,再 atomic.CompareAndSwapInt64(&m.cursor, m.ingNum, 0) 运行这行时,就不会修改成功,然后你对cursor,进行了累加,导致越界,或有可能拿到一个nil对象。 还有就是你的代码里,对workers初始化的时候,用的是maxNum 但后面修改的时候,又用了ingNum做为他的长度。 我原以为你是要一开始声明一个确定长度的数组。但后面又换了,如果按你后面的逻辑 你一开始应该是 workers: make([]*worker, initNum,maxNum) 才对。

buguang01 avatar Nov 01 '19 08:11 buguang01

大佬,帮忙Review

letsfire avatar Nov 05 '19 13:11 letsfire

你写了一个比之前复杂的保证安全的逻辑,但仔细看,还是会有问题

if atomic.CompareAndSwapInt32(&m.tryFlag, 0, 1) {
		atomic.StoreInt64(&m.cursor, 0)
		atomic.StoreInt32(&m.tryFlag, 0)
		m.synCond.Broadcast()
	} else {
		m.synCond.L.Lock()
		m.synCond.Wait()
		m.synCond.L.Unlock()
	}

你不能保证你在m.synCond.Broadcast()时,所有其他协程都在m.synCond.Wait() 虽然你的逻辑也不能保证m.synCond.Broadcast()之前,只有一个协程进入if的then里,但如果只有一个进入了then而有2个进入了else其中一个到了Wait,还有一个在Lock()等待进入中 这个时候你的m.synCond.Broadcast()只能让之前在wait的出来,然后Lock()就进入了wait。 这个时候,如果没有新的协程去m.synCond.Broadcast(),那这个Wait就会一直wiat下去。

虽然Wait方法里面会在添加监听后,释放锁,让别的协程可以进来,但是依然存在着我上面说的那种情况发生的概率;

buguang01 avatar Nov 06 '19 03:11 buguang01

你把workers从数组改成了双map进行切换,但你写的并没有达到目标,只是map在获取的时候,可以检查而已,并不需要双map 如果使用双map那每次就应该只更新其中一个map,但那样的话就会导致资源的不释放问题,和你写这个东西的初衷就违背了。

buguang01 avatar Nov 06 '19 03:11 buguang01

还有一个问题,那就是你的双map是同一个map,导致一修改数量,就会出现 fatal error: concurrent map read and map write

buguang01 avatar Nov 06 '19 03:11 buguang01

func TestMaster(t *testing.T) {
	wg := sync.WaitGroup{}
	wg2 := sync.WaitGroup{}
	w := NewMaster(10, 2)
	ctx, cel := context.WithCancel(context.Background())
	li := w.AddLine("testli", func(e interface{}) {
		defer wg.Done()
	})
	for i := 0; i < 100; i++ {
		go func() {
			wg2.Add(1)
			defer wg2.Done()
			for {
				select {
				case <-ctx.Done():
					return
				default:
					wg.Add(1)
					li.Submit(0)
				}
			}
		}()
	}
	time.Sleep(time.Second * 5)
	fmt.Println("adjustsize")
	w.AdjustSize(6)
	time.Sleep(time.Second * 2)
	w.AdjustSize(1)
	time.Sleep(time.Second * 2)
	cel() //关闭发协程
	w.AdjustSize(2)
	time.Sleep(time.Second * 2)
	wg2.Wait() //确认发协程是否能关闭,多半会卡在这里
	fmt.Println("down")
	w.Shutdown()
	fmt.Println("Wait")
	wg.Wait()
	fmt.Println("End")
}

给你一段测试代码。 改到你能看到输出end就行

buguang01 avatar Nov 06 '19 04:11 buguang01

记的多跑几次,几个问题都会暴露出来的。

buguang01 avatar Nov 06 '19 04:11 buguang01

感谢,这段工作较忙,问题已经修复,使用了sync.Map,您的测试用例已通过。再次感谢

letsfire avatar Nov 18 '19 15:11 letsfire

你的代码需要优化。用了好多互斥锁。单线程,原子锁。

buguang01 avatar Nov 19 '19 02:11 buguang01

还请指点,resetGuard里面是多,但是这段代码执行少,复杂度可以忽略吧

letsfire avatar Nov 19 '19 04:11 letsfire

Master.resetGuard 这个是用来干什么用的?

buguang01 avatar Nov 19 '19 04:11 buguang01

确保只有一个协程能重置m.cursor

letsfire avatar Nov 19 '19 05:11 letsfire

确保只有一个协程能重置m.cursor

原因呢? 你里面本来就是一个原子操作了

buguang01 avatar Nov 19 '19 05:11 buguang01

还有就是getWorker为什么是递归?不应该是循环吗?

buguang01 avatar Nov 19 '19 05:11 buguang01

m.workers.Load(idx) 同一时间会有多个协程拿不到worker进而去下面重置cursor,这时候下面的guard就是仅让其中一个协程进行重置,其他等待重置完毕继续获取从头获取

letsfire avatar Nov 19 '19 06:11 letsfire