
Google Pub/Sub subscription hangs when the topic count is large

Open flamedmg opened this issue 5 years ago • 2 comments

After upgrading to watermill-googlecloud v1.0.5, my production system stopped accepting new messages, although it had been working fine before. I decided to delete all topics and subscriptions from Google Pub/Sub, and then the app hung after subscribing to 10 topics, on the 11th. I tried to debug it and changed some things that I no longer remember; the app still hung, but only after subscribing to more than 10 subscriptions, around 30-40 (I have ~80). The issue was fixed when I allowed the Pub/Sub client to use a pool of 100 connections:

func NewGooglePubSub(cfg GCPSConfig, log logur.Logger, errHandler emperror.ErrorHandler) (message.Publisher, message.Subscriber, error) {
	logger := wlLog.NewWithErrorHandler(logur.WithField(log, "component", "watermill"), errHandler)
	if cfg.EmulatorHost != "" {
		if err := os.Setenv("PUBSUB_EMULATOR_HOST", cfg.EmulatorHost); err != nil {
			return nil, nil, err
		}
		log.Info("Google PubSub configured as emulator")
	}

	if cfg.Credentials != "" {
		if err := os.Setenv("GOOGLE_APPLICATION_CREDENTIALS", cfg.Credentials); err != nil {
			return nil, nil, err
		}
		log.Info("Google PubSub configured as production")
	}

	gcfg := googlecloud.SubscriberConfig{
		GenerateSubscriptionName:         googlecloud.TopicSubscriptionName,
		ProjectID:                        cfg.Project,
		DoNotCreateSubscriptionIfMissing: false,
		DoNotCreateTopicIfMissing:        false,
		ClientOptions:                    []option.ClientOption{option.WithGRPCConnectionPool(100)}, // <-- this line is the change
	}

	gcfg.SubscriptionConfig.AckDeadline = time.Minute * time.Duration(viper.GetInt("watermill.ackDeadline"))

	subscriber, err := googlecloud.NewSubscriber(
		gcfg,
		logger,
	)
	if err != nil {
		return nil, nil, err
	}

	publisher, err := googlecloud.NewPublisher(googlecloud.PublisherConfig{
		ProjectID: cfg.Project,
	}, logger)
	if err != nil {
		return nil, nil, err
	}

	return publisher, subscriber, nil
}

The change was in this line: ClientOptions: []option.ClientOption{option.WithGRPCConnectionPool(100)}. After that, the router was able to proceed and subscribe to all subscriptions. As far as I understand, before this change the Pub/Sub client was using just 16 connections.

If each topic requires one live connection in the pool, can watermill-googlecloud calculate that and increase the pool size? Looking at the source, I can't tell how this could be done easily at the library level.
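To make the idea concrete, here is a minimal sketch of how the pool size could be derived on the application side. It rests on my unverified guess that each subscription holds one connection from the pool. The function name poolSizeFor and its floor and headroom parameters are hypothetical and not part of watermill or the Google client.

```go
package main

// poolSizeFor returns a gRPC connection pool size for a given number of
// subscriptions. It ASSUMES one live connection per subscription, which is
// a hypothesis from this issue, not documented client behavior.
//
// floor is the smallest pool size to return (for example, whatever the
// client would use when no option is set); headroom adds spare connections
// for publishing and admin calls. Both are hypothetical tuning knobs.
func poolSizeFor(subscriptions, floor, headroom int) int {
	if subscriptions < 0 {
		subscriptions = 0
	}
	if headroom < 0 {
		headroom = 0
	}
	size := subscriptions + headroom
	if size < floor {
		size = floor
	}
	return size
}
```

With my ~80 subscriptions, poolSizeFor(80, 16, 4) gives 84, and that value could be passed to option.WithGRPCConnectionPool in ClientOptions instead of the hard-coded 100.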

flamedmg avatar Aug 07 '20 10:08 flamedmg

Just ran into this. Unfortunately, changing client options didn't help for me.

Same as your findings: the 11th topic subscription hangs.

DEBU[0003] Subscribing to topic                          component=watermill count=11

hashbender avatar Aug 11 '20 04:08 hashbender

@flamedmg @nitronick600 https://github.com/ThreeDotsLabs/watermill-googlecloud/pull/8 should fix that. Can you verify if it's ok after this update?

Thanks!

roblaszczak avatar Aug 18 '20 18:08 roblaszczak