watermill
watermill copied to clipboard
[watermill-amqp] What is the mechanism of exception handling?
If my rabbmit restarts
How to reestablish channel
[watermill] 2020/03/13 08:59:33.510953 subscriber.go:166: level=INFO msg="Starting consuming from AMQP channel" amqp_exchange_name= amqp_queue_name=listen_result_b111ad19-e4c0-4730-805c-3a44b5b7727f topic=listen_result_b111ad19-e4c0-4730-805c-3a44b5b7727f
Do you have more details? Do you mean that this flow of reconnecting is not executing properly: https://github.com/ThreeDotsLabs/watermill-amqp/blob/master/pkg/amqp/subscriber.go#L70 ?
I simulated the rabbmitmq outage
Stopped rabbmitmq
But I don't know how to use your package to capture disconnected yics
I want to catch the exception
Reestablishing channels
How should I add
Listen for rabbmitmq disconnect event
I checked all the documents
There's still no way to write it down
Sorry to bother you.
Hi, I have the same problem.
I think that when the connection is lost and reconnection is done the message channel stops receiving them.
For example, in a loop like the one in the example, messages stop being received.
func process(messages <-chan *message.Message) {
for msg := range messages {
log.Printf("received message: %s, payload: %s", msg.UUID, string(msg.Payload))
// we need to Acknowledge that we received and processed the message,
// otherwise, it will be resent over and over again.
msg.Ack()
}
}
I haven't had much time to look at the reconnect implementation but it's probably something related to the code you said (https://github.com/ThreeDotsLabs/watermill-amqp/blob/master/pkg/amqp/subscriber.go#L70)
Any ideas?
Thanks :D
Hello @sdeancos @duolabmeng6 I checked locally and with tests (https://github.com/ThreeDotsLabs/watermill-amqp/blob/master/pkg/amqp/pubsub_reconnect_test.go) and it seems that reconnect is working for me properly. Can you provide a bit more details to reproduce it? Thanks!
This is the code that I'm using
package main
import (
"context"
"fmt"
"time"
"github.com/ThreeDotsLabs/watermill"
"github.com/ThreeDotsLabs/watermill-amqp/pkg/amqp"
"github.com/ThreeDotsLabs/watermill/message"
)
func createPubSub() (message.Publisher, message.Subscriber) {
publisher, err := amqp.NewPublisher(
amqp.NewDurablePubSubConfig(
"amqp://guest:guest@localhost:5672",
nil,
),
watermill.NewStdLogger(true, true),
)
if err != nil {
panic(err)
}
subscriber, err := amqp.NewSubscriber(
amqp.NewDurablePubSubConfig(
"amqp://guest:guest@localhost:5672",
amqp.GenerateQueueNameTopicNameWithSuffix("test"),
),
watermill.NewStdLogger(true, true),
)
if err != nil {
panic(err)
}
return publisher, subscriber
}
func main() {
pub, sub := createPubSub()
go func() {
for {
pub.Publish("foo", message.NewMessage(time.Now().String(), nil))
time.Sleep(time.Second)
}
}()
msgs, err := sub.Subscribe(context.Background(), "foo")
if err != nil {
panic(err)
}
for msg := range msgs {
fmt.Println(msg.UUID)
msg.Ack()
}
}
and docker-compose:
version: '3'
services:
rabbitmq:
image: rabbitmq:3.7-management
restart: unless-stopped
ports:
- 5672:5672
I'm running the Go code and I'm restarting docker to force re-connect.