EventBus icon indicating copy to clipboard operation
EventBus copied to clipboard

Indexing error due to multiple removeHandler

Open Jjjpan opened this issue 2 years ago • 3 comments

In line 141 func Publish

// Publish executes callback defined for a topic. Any additional argument will be transferred to the callback.
func (bus *EventBus) Publish(topic string, args ...interface{}) {
	bus.lock.Lock() // will unlock if handler is not found or always after setUpPublish
	defer bus.lock.Unlock()
	if handlers, ok := bus.handlers[topic]; ok && 0 < len(handlers) {
		// Handlers slice may be changed by removeHandler and Unsubscribe during iteration,
		// so make a copy and iterate the copied slice.
		copyHandlers := make([]*eventHandler, len(handlers))
		copy(copyHandlers, handlers)
		for i, handler := range copyHandlers {
			if handler.flagOnce {
				bus.removeHandler(topic, i) // multiple operation causes indexing error
			}
			if !handler.async {
				bus.doPublish(handler, topic, args...)
			} else {
				bus.wg.Add(1)
				if handler.transactional {
					bus.lock.Unlock()
					handler.Lock()
					bus.lock.Lock()
				}
				go bus.doPublishAsync(handler, topic, args...)
			}
		}
	}
}

i and handler are ranged in copyHandlers while remove operation actions in bus.handlers. 🤨

Jjjpan avatar Jun 17 '22 06:06 Jjjpan

In a test case of many subscribe and many subscribeOnce, bus.removeHandler tries to remove a handler with index out of range.

Add out-of-range outputs in event_bus.go:

func (bus *EventBus) removeHandler(topic string, idx int) {
	if _, ok := bus.handlers[topic]; !ok {
		return
	}
	l := len(bus.handlers[topic])

	if !(0 <= idx && idx < l) {
                fmt.Println("out of range")
		return
	}

	copy(bus.handlers[topic][idx:], bus.handlers[topic][idx+1:])
	bus.handlers[topic][l-1] = nil // or the zero value of T
	bus.handlers[topic] = bus.handlers[topic][:l-1]
}

Test case in event_bus_test.go:

func TestManySubscribeAndManySubscribe(t *testing.T) {
	bus := New()
	event := "topic"
	flag := 0
	fn := func() { flag += 1 }
	bus.SubscribeOnce(event, fn)
	bus.SubscribeOnce(event, fn)
	bus.Subscribe(event, fn)
	bus.Subscribe(event, fn)
	bus.SubscribeOnce(event, fn)
	bus.SubscribeOnce(event, fn)
	bus.Publish(event)

	if flag != 6 {
		t.Fail()
	}
}

Test result:

=== RUN   TestManySubscribeAndManySubscribe
remove out of range
remove out of range
--- PASS: TestManySubscribeAndManySubscribe (0.00s)
PASS
ok      eventbus/v1     (cached)

Jjjpan avatar Jun 17 '22 07:06 Jjjpan

Would it be better to use offset to record the offset of index between handlers and copyHandlers.🙄

Maybe like this:


func (bus *EventBus) Publish(topic string, args ...interface{}) {
	bus.lock.Lock()
	defer bus.lock.Unlock()

	if topicHandlers, ok := bus.handlers[topic]; ok && len(topicHandlers) > 0 {
		copiedHandlers := make([]*eventHandler, len(topicHandlers))
		copy(copiedHandlers, topicHandlers)
		offset := 0
		for index, handler := range copiedHandlers {
			if handler.flagOnce {
				bus.removeHandler(topic, index+offset)
				offset--
			}
			if !handler.async {
				bus.doPublish(handler, topic, args...)
			} else {
				bus.wg.Add(1)
				if handler.transactional {
					bus.lock.Unlock()
					handler.lock.Lock()
					bus.lock.Lock()
				}
				go bus.doPublishAsync(handler, topic, args...)
			}
		}
	}
}

Jjjpan avatar Jun 17 '22 07:06 Jjjpan

This project seems to not be maintained any more. I'm just looking around the issues here. Maybe there's a maintained fork...

varfrog avatar Aug 09 '22 19:08 varfrog