watermill
channel: when messages are produced much faster than they are consumed, will the number of goroutines grow without bound?
When messages are produced much faster than they are consumed, will the number of goroutines grow without bound?
func (g *GoChannel) sendMessage(topic string, message *message.Message) (<-chan struct{}, error) {
	subscribers := g.topicSubscribers(topic)
	ackedBySubscribers := make(chan struct{})

	logFields := watermill.LogFields{"message_uuid": message.UUID, "topic": topic}

	if len(subscribers) == 0 {
		close(ackedBySubscribers)
		g.logger.Info("No subscribers to send message", logFields)
		return ackedBySubscribers, nil
	}

	go func(subscribers []*subscriber) {
		for i := range subscribers {
			subscriber := subscribers[i]
			subscriber.sendMessageToSubscriber(message, logFields)
		}
		close(ackedBySubscribers)
	}(subscribers)

	return ackedBySubscribers, nil
}
@roblaszczak
@zc2638 can you give us some more details? The number of goroutines shouldn't grow, as it will wait until a message is acked by subscribers.
You can set BlockPublishUntilSubscriberAck in the config to make Publish block until all messages are acked.