amqp
amqp copied to clipboard
concurrent publish occur: "Exception (504) Reason: "channel/connection is not open"
go version: go1.14.1 windows/amd64
rabbitmq version: RabbitMQ 3.8.4 Erlang 23.0
my code is:
package main
import (
"fmt"
"github.com/streadway/amqp"
)
var (
conn = &amqp.Connection{}
ch = &amqp.Channel{}
)
func InitMQ() {
var err error
conn, err = amqp.Dial("amqp://guest:guest@localhost:5672/")
if err != nil {
panic(err)
}
ch, err = conn.Channel()
if err != nil {
panic(err)
}
}
func main() {
InitMQ()
for i := 0; i < 100; i++ {
go func(i int) {
err := PublishNews([]byte{byte(i)})
if err != nil {
fmt.Println(err)
}
}(i)
}
select {}
}
func PublishNews(data []byte) (err error) {
err = ch.ExchangeDeclare(
"exchange1",
"fanout",
true,
false,
false,
false,
nil,
)
if err != nil {
return fmt.Errorf("ExchangeDeclare: %s", err)
}
q, err := ch.QueueDeclare(
"queue",
true,
false,
false,
false,
nil,
)
if err != nil {
return fmt.Errorf("QueueDeclare: %s", err)
}
err = ch.QueueBind(
q.Name,
"",
"exchange1",
false,
nil,
)
if err != nil {
return fmt.Errorf("QueueBind: %s", err)
}
err = ch.Publish(
"exchange1",
"",
false,
false,
amqp.Publishing{
ContentType: "text/plain",
Body: data,
DeliveryMode: 2, // durable
})
if err != nil {
return fmt.Errorf("Publish: %s", err)
}
return nil
}
the error print is :
Publish: Exception (504) Reason: "channel/connection is not open"
Publish: Exception (504) Reason: "channel/connection is not open"
QueueBind: Exception (504) Reason: "channel/connection is not open"
Publish: Exception (504) Reason: "channel/connection is not open"
QueueBind: Exception (504) Reason: "channel/connection is not open"
QueueBind: Exception (504) Reason: "channel/connection is not open"
QueueBind: Exception (504) Reason: "channel/connection is not open"
QueueBind: Exception (504) Reason: "channel/connection is not open"
QueueBind: Exception (504) Reason: "channel/connection is not open"
QueueBind: Exception (504) Reason: "channel/connection is not open"
QueueBind: Exception (504) Reason: "channel/connection is not open"
QueueBind: Exception (504) Reason: "channel/connection is not open"
QueueBind: Exception (504) Reason: "channel/connection is not open"
QueueBind: Exception (504) Reason: "channel/connection is not open"
QueueBind: Exception (504) Reason: "channel/connection is not open"
QueueBind: Exception (504) Reason: "channel/connection is not open"
QueueBind: Exception (504) Reason: "channel/connection is not open"
QueueBind: Exception (504) Reason: "channel/connection is not open"
QueueBind: Exception (504) Reason: "channel/connection is not open"
QueueBind: Exception (504) Reason: "channel/connection is not open"
QueueBind: Exception (504) Reason: "channel/connection is not open"
QueueBind: Exception (504) Reason: "channel/connection is not open"
QueueBind: Exception (504) Reason: "channel/connection is not open"
QueueBind: Exception (504) Reason: "channel/connection is not open"
QueueBind: Exception (504) Reason: "channel/connection is not open"
QueueBind: Exception (504) Reason: "channel/connection is not open"
QueueBind: Exception (504) Reason: "channel/connection is not open"
QueueBind: Exception (504) Reason: "channel/connection is not open"
QueueBind: Exception (504) Reason: "channel/connection is not open"
QueueBind: Exception (504) Reason: "channel/connection is not open"
QueueBind: Exception (504) Reason: "channel/connection is not open"
QueueBind: Exception (504) Reason: "channel/connection is not open"
QueueBind: Exception (504) Reason: "channel/connection is not open"
QueueBind: Exception (504) Reason: "channel/connection is not open"
QueueBind: Exception (504) Reason: "channel/connection is not open"
QueueBind: Exception (504) Reason: "channel/connection is not open"
QueueBind: Exception (504) Reason: "channel/connection is not open"
QueueBind: Exception (504) Reason: "channel/connection is not open"
QueueBind: Exception (504) Reason: "channel/connection is not open"
QueueBind: Exception (504) Reason: "channel/connection is not open"
QueueBind: Exception (504) Reason: "channel/connection is not open"
QueueBind: Exception (504) Reason: "channel/connection is not open"
QueueBind: Exception (504) Reason: "channel/connection is not open"
QueueBind: Exception (504) Reason: "channel/connection is not open"
QueueBind: Exception (504) Reason: "channel/connection is not open"
QueueBind: Exception (504) Reason: "channel/connection is not open"
QueueBind: Exception (504) Reason: "channel/connection is not open"
QueueDeclare: Exception (504) Reason: "channel/connection is not open"
QueueDeclare: Exception (504) Reason: "channel/connection is not open"
QueueBind: Exception (504) Reason: "channel/connection is not open"
QueueBind: Exception (504) Reason: "channel/connection is not open"
QueueDeclare: Exception (504) Reason: "channel/connection is not open"
QueueDeclare: Exception (504) Reason: "channel/connection is not open"
QueueDeclare: Exception (504) Reason: "channel/connection is not open"
QueueBind: Exception (504) Reason: "channel/connection is not open"
QueueDeclare: Exception (504) Reason: "channel/connection is not open"
QueueBind: write tcp [::1]:64696->[::1]:5672: use of closed network connection
QueueDeclare: Exception (504) Reason: "channel/connection is not open"
QueueBind: Exception (504) Reason: "channel/connection is not open"
QueueBind: Exception (504) Reason: "channel/connection is not open"
QueueBind: Exception (504) Reason: "channel/connection is not open"
QueueBind: write tcp [::1]:64696->[::1]:5672: use of closed network connection
QueueBind: Exception (504) Reason: "channel/connection is not open"
QueueBind: Exception (504) Reason: "channel/connection is not open"
QueueBind: Exception (504) Reason: "channel/connection is not open"
QueueBind: write tcp [::1]:64696->[::1]:5672: use of closed network connection
QueueBind: Exception (504) Reason: "channel/connection is not open"
QueueBind: Exception (504) Reason: "channel/connection is not open"
QueueBind: Exception (504) Reason: "channel/connection is not open"
QueueBind: Exception (504) Reason: "channel/connection is not open"
QueueBind: Exception (504) Reason: "channel/connection is not open"
QueueDeclare: Exception (504) Reason: "channel/connection is not open"
QueueBind: Exception (504) Reason: "channel/connection is not open"
QueueBind: write tcp [::1]:64696->[::1]:5672: use of closed network connection
QueueBind: Exception (504) Reason: "channel/connection is not open"
QueueBind: Exception (504) Reason: "channel/connection is not open"
QueueBind: Exception (504) Reason: "channel/connection is not open"
QueueBind: Exception (504) Reason: "channel/connection is not open"
QueueBind: Exception (504) Reason: "channel/connection is not open"
QueueBind: Exception (504) Reason: "channel/connection is not open"
QueueBind: Exception (504) Reason: "channel/connection is not open"
QueueBind: Exception (504) Reason: "channel/connection is not open"
QueueBind: write tcp [::1]:64696->[::1]:5672: use of closed network connection
QueueDeclare: Exception (504) Reason: "channel/connection is not open"
QueueDeclare: Exception (504) Reason: "channel/connection is not open"
QueueDeclare: Exception (504) Reason: "channel/connection is not open"
QueueDeclare: Exception (504) Reason: "channel/connection is not open"
QueueBind: Exception (504) Reason: "channel/connection is not open"
QueueBind: Exception (504) Reason: "channel/connection is not open"
QueueBind: write tcp [::1]:64696->[::1]:5672: use of closed network connection
QueueDeclare: Exception (504) Reason: "channel/connection is not open"
QueueBind: Exception (504) Reason: "channel/connection is not open"
QueueBind: write tcp [::1]:64696->[::1]:5672: use of closed network connection
QueueBind: write tcp [::1]:64696->[::1]:5672: use of closed network connection
QueueBind: write tcp [::1]:64696->[::1]:5672: use of closed network connection
QueueBind: write tcp [::1]:64696->[::1]:5672: use of closed network connection
QueueDeclare: Exception (505) Reason: "UNEXPECTED_FRAME - expected content header for class 60, got non content header frame instead"
QueueBind: write tcp [::1]:64696->[::1]:5672: use of closed network connection
the rabbitmq log is :
2020-08-20 15:38:44.204 [info] <0.11426.16> accepting AMQP connection <0.11426.16> ([::1]:64696 -> [::1]:5672)
2020-08-20 15:38:44.207 [info] <0.11426.16> connection <0.11426.16> ([::1]:64696 -> [::1]:5672): user 'guest' authenticated and granted access to vhost '/'
2020-08-20 15:38:44.211 [error] <0.11426.16> Error on AMQP connection <0.11426.16> ([::1]:64696 -> [::1]:5672, vhost: '/', user: 'guest', state: running), channel 1:
operation basic.publish caused a connection exception unexpected_frame: "expected content header for class 60, got non content header frame instead"
2020-08-20 15:38:44.213 [info] <0.11426.16> closing AMQP connection <0.11426.16> ([::1]:64696 -> [::1]:5672, vhost: '/', user: 'guest')
2020-08-20 15:39:39.725 [error] <0.731.0> Could not find handle.exe, please install from sysinternals
Even though declaring exchanges, queues and binding them should be idempotent I think you're setting yourself up for trouble by performing these commands in 100 go routines at the same time. It could happen just so that you send multiple declarations of the same exchange f.ex. at the same time which I think could lead to issues on the server side.
I tried your code as is and it works most of the time but sometimes I see the same errors as you do. When I got errors it was resolved by just restarting the RabbitMQ server.
However, by moving the declaration of the exchange, the queue and binding them to InitMQ()
I can no longer reproduce this issue (newly started or already running RabbitMQ server)
MRE
docker run --rm -it -p 5672:5672 rabbitmq:3
package main
import (
"fmt"
"sync"
"github.com/streadway/amqp"
)
func InitMQ() *amqp.Channel {
conn, err := amqp.Dial("amqp://guest:guest@localhost:5672/")
if err != nil {
panic(err)
}
ch, err := conn.Channel()
if err != nil {
panic(err)
}
err = ch.ExchangeDeclare("exchange1", "fanout", true, false, false, false, nil)
if err != nil {
panic(err)
}
q, err := ch.QueueDeclare("queue", true, false, false, false, nil)
if err != nil {
panic(err)
}
err = ch.QueueBind(q.Name, "", "exchange1", false, nil)
if err != nil {
panic(err)
}
return ch
}
func main() {
ch := InitMQ()
wg := sync.WaitGroup{}
for i := 1; i <= 100; i++ {
fmt.Printf("spawning go routine %d/%d\n", i, 100)
wg.Add(1)
go func(i int) {
err := PublishNews(ch, []byte{byte(i)})
if err != nil {
fmt.Println(err)
}
wg.Done()
}(i)
}
wg.Wait()
}
func PublishNews(ch *amqp.Channel, data []byte) (err error) {
return ch.Publish(
"exchange1", "", false, false,
amqp.Publishing{
ContentType: "text/plain",
Body: data,
DeliveryMode: 2, // durable
})
}
@bombsimon thx for your response
@wxj95 The problem that I see here is that you are sharing the same publishing channel between the goroutines. This is wrong and it might lead to incorrect frame interleaving as @michaelklishin mentioned here https://groups.google.com/g/rabbitmq-users/c/u4AZ2t9enu0/m/WkDIU9ENAgAJ
Please check this issue as well for more info https://github.com/streadway/amqp/issues/283
The solution is to create a new channel in each goroutine.
Channel#Publish takes a lock in the beginning, so I do not understand how publishing with two goroutines is different from publishing with only one goroutine.