
channel: can the number of goroutines grow without bound when production far outpaces consumption?

Open zc2638 opened this issue 3 years ago • 2 comments

When messages are produced much faster than they are consumed, will the number of goroutines grow without bound? sendMessage starts a new goroutine for every published message:

func (g *GoChannel) sendMessage(topic string, message *message.Message) (<-chan struct{}, error) {
	subscribers := g.topicSubscribers(topic)
	ackedBySubscribers := make(chan struct{})

	logFields := watermill.LogFields{"message_uuid": message.UUID, "topic": topic}

	if len(subscribers) == 0 {
		close(ackedBySubscribers)
		g.logger.Info("No subscribers to send message", logFields)
		return ackedBySubscribers, nil
	}

	go func(subscribers []*subscriber) {
		for i := range subscribers {
			subscriber := subscribers[i]
			subscriber.sendMessageToSubscriber(message, logFields)
		}
		close(ackedBySubscribers)
	}(subscribers)

	return ackedBySubscribers, nil
}

zc2638 avatar Sep 07 '22 09:09 zc2638

@roblaszczak

zc2638 avatar Jan 09 '23 02:01 zc2638

@zc2638 can you give us some more details? The number of goroutines shouldn't grow, as it will wait until a message is acked by subscribers.

You can set BlockPublishUntilSubscriberAck in the config to make Publish block until each message has been acked by its subscribers.

m110 avatar Jan 24 '23 21:01 m110
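
To make the two behaviours discussed in this thread concrete, here is a minimal, standard-library-only sketch. It is not Watermill's code: `miniPubSub`, `publish`, `track`, and `peakInFlight` are hypothetical names, and a plain channel receive stands in for a subscriber ack. It assumes the extreme case raised in the question, a consumer that is stalled while the producer runs, and counts how many delivery goroutines are waiting at the same time, with and without blocking the publisher until the ack.

```go
package main

import (
	"fmt"
	"sync"
)

// miniPubSub is a hypothetical stand-in for a GoChannel-style publisher.
// It is NOT Watermill's implementation; it only mirrors the
// "one goroutine per published message" shape of sendMessage.
type miniPubSub struct {
	out           chan string // unbuffered: a send completes only when the consumer receives
	blockUntilAck bool        // loosely models BlockPublishUntilSubscriberAck

	mu       sync.Mutex
	inFlight int // delivery goroutines currently waiting on the consumer
	peak     int // highest inFlight value observed
	wg       sync.WaitGroup
}

// track adjusts the in-flight counter and records its peak.
func (p *miniPubSub) track(delta int) {
	p.mu.Lock()
	defer p.mu.Unlock()
	p.inFlight += delta
	if p.inFlight > p.peak {
		p.peak = p.inFlight
	}
}

// publish starts one delivery goroutine per message. In this sketch a
// receive by the consumer counts as the ack.
func (p *miniPubSub) publish(msg string) {
	acked := make(chan struct{})
	p.track(+1)
	p.wg.Add(1)
	go func() {
		defer p.wg.Done()
		p.out <- msg // blocks until the consumer takes the message
		p.track(-1)
		close(acked)
	}()
	if p.blockUntilAck {
		<-acked // the producer cannot run ahead of the consumer
	}
}

// peakInFlight publishes n messages and returns the peak number of
// delivery goroutines that were waiting at the same time.
func peakInFlight(blockUntilAck bool, n int) int {
	p := &miniPubSub{out: make(chan string), blockUntilAck: blockUntilAck}
	release := make(chan struct{})
	done := make(chan struct{})
	go func() { // consumer
		defer close(done)
		<-release
		for i := 0; i < n; i++ {
			<-p.out
		}
	}()
	if blockUntilAck {
		close(release) // a blocking publisher needs a running consumer
	}
	for i := 0; i < n; i++ {
		p.publish(fmt.Sprintf("msg-%d", i))
	}
	if !blockUntilAck {
		close(release) // stalled consumer: starts draining only now
	}
	<-done
	p.wg.Wait()
	p.mu.Lock()
	defer p.mu.Unlock()
	return p.peak
}

func main() {
	fmt.Println("fire-and-forget peak:", peakInFlight(false, 100)) // 100
	fmt.Println("block-until-ack peak:", peakInFlight(true, 100))  // 1
}
```

In this sketch the fire-and-forget publisher ends up with one waiting goroutine per unconsumed message, while the blocking publisher never has more than one. Whether the same holds for a real Watermill setup depends on its configuration and on how subscribers ack, so treat this as an illustration of the pattern rather than a measurement of the library.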