timingwheel icon indicating copy to clipboard operation
timingwheel copied to clipboard

DelayQueue poll退出时需要清理wakeupC,不然offer会卡住吧

Open welllog opened this issue 4 years ago • 11 comments

Poll方法在 <-exitC时应该判断清理wakeupC吧

welllog avatar Dec 22 '20 09:12 welllog

在bucket的Flush函数中会首先移除该bucket中的timer,移除后再添加到新的bucket中或执行,此时在还未加到新bucket时如果并发的对某个timer stop,因为timer的bucket为nil,导致并没有停止

welllog avatar Dec 28 '20 08:12 welllog

func (t *Timer) Stop() 是一直循环删除,直到删除成功的呀

yunlong0928 avatar Jan 21 '21 13:01 yunlong0928

Stop循环条件是timer的bucket != nil, 但是flush中的for循环会先把timer的bucket置为nil, 此时timer stop由于bucket为nil不会执行。flush后面的reinsert再把timer插入 ,不就没有停止么。

welllog avatar Jan 22 '21 02:01 welllog

明白你意思了。

第一个问题“poll退出时需要清理wakeupC,不然offer会卡住”,我觉得是不是可以给时间轮加一个status,如果时间轮stop了,再调用AfterFunc等方法就直接返回error。或者就在Poll退出时,close掉wakeupC,然后在Offer的时候加个判断。

第二个问题,“bucket flush时并发timer.stop”,我觉得flush的时候不把timer的bucket置为nil也没影响,或者手动调两次timer.stop?

yunlong0928 avatar Jan 24 '21 08:01 yunlong0928

第一个问题,退出时也需要切换sleeping值的状态就可以了吧,跟加个status的状态差不错,只不过status明确告知错误比无意义的添加要好些。 第二个问题,flush时不设为nil不太好处理。Stop方法的循环中本身就是调用的Remove方法,除非Stop方法中的移除才将timer的bucket置为空。但仍然不行,假设并发时,flush中链表遍历完后再Stop,再执行flush中的reinsert将该timer插入一个新的bukcet,stop仍然无效。最好是加个字段表明timer停止了,但这就变成惰性删除了,一些原有设计就没必要了。 惰性删除版本可以见https://github.com/welllog/timewheel

welllog avatar Jan 28 '21 05:01 welllog

是的,你说的有道理。 我还想请教个问题,bucket的Flush方法里,作者写了个 TODO: Improve the coordination with b.Add() 你有留意这个问题吗?我提交了PR,作者可能还没看到,能否帮我康康 https://github.com/RussellLuo/timingwheel/pull/35

yunlong0928 avatar Jan 29 '21 07:01 yunlong0928

不清楚这个TODO是否指 并发时,add设置桶的过期时间后,又被flush中设为-1覆盖的情况 你PR里面写的curTime的下一个tick的timer,定位到该bucket。如果是advanceClock调整currentTime先于add获取currentTime执行,则add应该不会定位到该bucket吧。如果add获取到旧的currentTime,advanceClock再调整时间,应该才会定位到该bucket,add操作中的设置bucket过期时间跟flush中将过期时间设为-1无法保证哪个先操作,flush遍历链表过后add再实际插入并设置过期时间,flush中的设为-1再执行 就会出现你说的问题。将flush中的设置过期时间放入锁中,flush先于add执行没问题,在add中实际添加过后,再执行flush,但add中的设置过期时间这一操作仍然跟flush中的设置过期时间没有互斥关系,仍然不能保证谁先执行吧 不知道我推理得对不

welllog avatar Jan 29 '21 08:01 welllog

add中实际添加到bucket后,再执行flush中的遍历已经能够获取到之前的添加timer了,此时哪个设置过期时间不重要了。上面说错了。

welllog avatar Jan 29 '21 09:01 welllog

@welllog 谢谢反馈!

  1. DelayQueue poll退出时需要清理wakeupC,不然offer会卡住吧
  2. 在bucket的Flush函数中会首先移除该bucket中的timer,移除后再添加到新的bucket中或执行,此时在还未加到新bucket时如果并发的对某个timer stop,因为timer的bucket为nil,导致并没有停止

你提到的两个问题确实存在。

第二个问题:

  • 是由 #28 引入的,这个 PR 为了解决死锁问题,打破了 “b.remove() + b.Add()(由 reinsert 间接调用)” 的原子性;@yunlong0928 提到的 TODO 注释,也是在这个 PR 中加的。
  • #28 之前的 flush 版本,是参考 Kafka 的 flush 实现。Go 版本会出现死锁,是因为 Java 的 synchronized 是可重入的,而 Go 的 sync.Mutex 不支持可重入。

RussellLuo avatar Feb 08 '21 09:02 RussellLuo

@RussellLuo 上面提到的死锁问题,我有点不理解。reinsert的时候,不会出现sync.Mutex重入的情况呀,因为reinsert的时候就直接执行了

yunlong0928 avatar Feb 22 '21 14:02 yunlong0928

@yunlong0928 我修复了 bucket.Flush 函数,并加了一些注释说明:

https://github.com/RussellLuo/timingwheel/blob/54845bda31084d50d98d83afb01bd89d48154668/bucket.go#L123-L127

在bucket的Flush函数中会首先移除该bucket中的timer,移除后再添加到新的bucket中或执行,此时在还未加到新bucket时如果并发的对某个timer stop,因为timer的bucket为nil,导致并没有停止

@welllog 修复以后,这个问题应该不存在了。

RussellLuo avatar Feb 18 '22 15:02 RussellLuo