watermill icon indicating copy to clipboard operation
watermill copied to clipboard

[watermill-amqp] What is the mechanism of exception handling?

Open duolabmeng6 opened this issue 5 years ago • 6 comments

If my rabbmit restarts

How to reestablish channel

[watermill] 2020/03/13 08:59:33.510953 subscriber.go:166: level=INFO msg="Starting consuming from AMQP channel" amqp_exchange_name= amqp_queue_name=listen_result_b111ad19-e4c0-4730-805c-3a44b5b7727f topic=listen_result_b111ad19-e4c0-4730-805c-3a44b5b7727f

duolabmeng6 avatar Mar 13 '20 01:03 duolabmeng6

Do you have more details? Do you mean that this flow of reconnecting is not executing properly: https://github.com/ThreeDotsLabs/watermill-amqp/blob/master/pkg/amqp/subscriber.go#L70 ?

roblaszczak avatar Mar 13 '20 19:03 roblaszczak

I simulated the rabbmitmq outage

Stopped rabbmitmq

But I don't know how to use your package to capture disconnected yics

I want to catch the exception

Reestablishing channels

duolabmeng6 avatar Mar 14 '20 09:03 duolabmeng6

How should I add

Listen for rabbmitmq disconnect event

duolabmeng6 avatar Mar 14 '20 09:03 duolabmeng6

I checked all the documents

There's still no way to write it down

Sorry to bother you.

duolabmeng6 avatar Mar 14 '20 09:03 duolabmeng6

Hi, I have the same problem.

I think that when the connection is lost and reconnection is done the message channel stops receiving them.

For example, in a loop like the one in the example, messages stop being received.

func process(messages <-chan *message.Message) {
	for msg := range messages {
		log.Printf("received message: %s, payload: %s", msg.UUID, string(msg.Payload))

		// we need to Acknowledge that we received and processed the message,
		// otherwise, it will be resent over and over again.
		msg.Ack()
	}
}

I haven't had much time to look at the reconnect implementation but it's probably something related to the code you said (https://github.com/ThreeDotsLabs/watermill-amqp/blob/master/pkg/amqp/subscriber.go#L70)

Any ideas?

Thanks :D

sdeancos avatar Aug 21 '20 16:08 sdeancos

Hello @sdeancos @duolabmeng6 I checked locally and with tests (https://github.com/ThreeDotsLabs/watermill-amqp/blob/master/pkg/amqp/pubsub_reconnect_test.go) and it seems that reconnect is working for me properly. Can you provide a bit more details to reproduce it? Thanks!

This is the code that I'm using

package main

import (
	"context"
	"fmt"
	"time"

	"github.com/ThreeDotsLabs/watermill"
	"github.com/ThreeDotsLabs/watermill-amqp/pkg/amqp"
	"github.com/ThreeDotsLabs/watermill/message"
)

func createPubSub() (message.Publisher, message.Subscriber) {
	publisher, err := amqp.NewPublisher(
		amqp.NewDurablePubSubConfig(
			"amqp://guest:guest@localhost:5672",
			nil,
		),
		watermill.NewStdLogger(true, true),
	)
	if err != nil {
		panic(err)
	}

	subscriber, err := amqp.NewSubscriber(
		amqp.NewDurablePubSubConfig(
			"amqp://guest:guest@localhost:5672",
			amqp.GenerateQueueNameTopicNameWithSuffix("test"),
		),
		watermill.NewStdLogger(true, true),
	)
	if err != nil {
		panic(err)
	}

	return publisher, subscriber
}

func main() {
	pub, sub := createPubSub()

	go func() {
		for {
			pub.Publish("foo", message.NewMessage(time.Now().String(), nil))
			time.Sleep(time.Second)
		}
	}()

	msgs, err := sub.Subscribe(context.Background(), "foo")
	if err != nil {
		panic(err)
	}

	for msg := range msgs {
		fmt.Println(msg.UUID)
		msg.Ack()
	}
}

and docker-compose:

version: '3'
services:
  rabbitmq:
    image: rabbitmq:3.7-management
    restart: unless-stopped
    ports:
      - 5672:5672

I'm running the Go code and I'm restarting docker to force re-connect.

roblaszczak avatar Jan 02 '21 09:01 roblaszczak