EventBus
EventBus copied to clipboard
Index-out-of-range error caused by multiple removeHandler calls in Publish
At line 141, in func Publish:
// Publish invokes every handler subscribed to topic, forwarding args to each callback.
//
// bus.lock is taken on entry and released by the deferred Unlock; the only time it
// is dropped in between is the hand-off while a transactional handler's own lock
// is being acquired.
func (bus *EventBus) Publish(topic string, args ...interface{}) {
	bus.lock.Lock()
	defer bus.lock.Unlock()

	handlers, ok := bus.handlers[topic]
	if !ok || len(handlers) == 0 {
		return
	}

	// Iterate over a snapshot: removeHandler and Unsubscribe may change
	// bus.handlers[topic] while this loop is still running.
	copyHandlers := make([]*eventHandler, len(handlers))
	copy(copyHandlers, handlers)

	for i, handler := range copyHandlers {
		if handler.flagOnce {
			// NOTE: i is a position in copyHandlers, but removeHandler is applied to
			// bus.handlers[topic]. After the first removal the two slices no longer
			// line up, so later calls hit the wrong element or an out-of-range index.
			bus.removeHandler(topic, i)
		}

		if handler.async {
			bus.wg.Add(1)
			if handler.transactional {
				// Let go of the bus lock while waiting for the handler's own lock,
				// then take the bus lock back before continuing.
				bus.lock.Unlock()
				handler.Lock()
				bus.lock.Lock()
			}
			go bus.doPublishAsync(handler, topic, args...)
			continue
		}

		bus.doPublish(handler, topic, args...)
	}
}
`i` and `handler` come from ranging over `copyHandlers`, while the remove operation acts on `bus.handlers`, so after the first removal the index `i` no longer matches the live slice. 🤨
In a test case with several Subscribe and several SubscribeOnce registrations, bus.removeHandler tries to remove a handler at an index that is out of range.
To make this visible, I added an out-of-range message to removeHandler in event_bus.go:
// removeHandler deletes the handler at position idx from the handlers registered
// for topic, keeping the remaining handlers in their original order.
// It does nothing when topic has no entry, and prints "out of range" and returns
// when idx does not address an existing element.
func (bus *EventBus) removeHandler(topic string, idx int) {
	current, exists := bus.handlers[topic]
	if !exists {
		return
	}

	last := len(current) - 1
	if idx < 0 || idx > last {
		fmt.Println("out of range")
		return
	}

	// Shift the tail one slot to the left over the removed element.
	copy(current[idx:], current[idx+1:])
	// Clear the now-duplicated final slot so the slice does not keep a stale pointer.
	current[last] = nil
	bus.handlers[topic] = current[:last]
}
Test case in event_bus_test.go
:
// TestManySubscribeAndManySubscribe registers a mix of one-shot and regular
// subscriptions on a single topic, publishes once, and checks that all six
// callbacks ran.
func TestManySubscribeAndManySubscribe(t *testing.T) {
	const topic = "topic"

	bus := New()
	calls := 0
	handler := func() { calls++ }

	// Registration order: two SubscribeOnce, two Subscribe, two SubscribeOnce.
	for _, once := range []bool{true, true, false, false, true, true} {
		if once {
			bus.SubscribeOnce(topic, handler)
		} else {
			bus.Subscribe(topic, handler)
		}
	}

	bus.Publish(topic)

	if calls != 6 {
		t.Fail()
	}
}
Test result:
=== RUN TestManySubscribeAndManySubscribe
remove out of range
remove out of range
--- PASS: TestManySubscribeAndManySubscribe (0.00s)
PASS
ok eventbus/v1 (cached)
Would it be better to keep an `offset` that records how far the indices of `bus.handlers` have shifted relative to the copied slice? 🙄
Maybe like this:
// Publish invokes every handler subscribed to topic, forwarding args to each callback.
// Handlers flagged flagOnce are removed from the bus as they are dispatched.
//
// bus.lock is taken on entry and released by the deferred Unlock; the only time it
// is dropped in between is the hand-off while a transactional handler's own lock
// is being acquired.
func (bus *EventBus) Publish(topic string, args ...interface{}) {
	bus.lock.Lock()
	defer bus.lock.Unlock()

	topicHandlers, ok := bus.handlers[topic]
	if !ok || len(topicHandlers) == 0 {
		return
	}

	// Iterate over a snapshot, because removeHandler shrinks bus.handlers[topic]
	// while the loop is running.
	copiedHandlers := make([]*eventHandler, len(topicHandlers))
	copy(copiedHandlers, topicHandlers)

	// offset is (position in bus.handlers[topic]) - (position in copiedHandlers).
	// It starts at 0 and drops by one for every handler this loop removes, so
	// index+offset addresses the current handler in the live slice.
	// NOTE(review): this holds only while nothing else changes bus.handlers[topic];
	// the transactional branch below releases bus.lock briefly, and a change made
	// in that window would make index+offset stale.
	offset := 0
	for index, handler := range copiedHandlers {
		if handler.flagOnce {
			bus.removeHandler(topic, index+offset)
			offset--
		}

		if !handler.async {
			bus.doPublish(handler, topic, args...)
			continue
		}

		bus.wg.Add(1)
		if handler.transactional {
			// Let go of the bus lock while waiting for the handler's own lock,
			// then take the bus lock back before continuing.
			bus.lock.Unlock()
			// NOTE(review): assumes eventHandler exposes Lock directly (the same
			// call the current Publish makes) rather than through a lock field.
			handler.Lock()
			bus.lock.Lock()
		}
		go bus.doPublishAsync(handler, topic, args...)
	}
}
This project seems to not be maintained any more. I'm just looking around the issues here. Maybe there's a maintained fork...