amqp icon indicating copy to clipboard operation
amqp copied to clipboard

Concurrent publishing fails with: Exception (504) Reason: "channel/connection is not open"

Open littlejiancc opened this issue 4 years ago • 4 comments

go version: go1.14.1 windows/amd64

rabbitmq version: RabbitMQ 3.8.4 Erlang 23.0

my code is:

package main

import (
	"fmt"

	"github.com/streadway/amqp"
)

// Package-level AMQP handles. Both start out as non-nil zero-value
// placeholders and are overwritten by InitMQ; ch is the single channel that
// PublishNews uses for every call.
var (
	conn = &amqp.Connection{}
	ch   = &amqp.Channel{}
)

// InitMQ dials the AMQP broker at localhost:5672 with the guest credentials
// and opens a channel on that connection, storing the results in the
// package-level conn and ch variables. It panics with the underlying error if
// either step fails.
func InitMQ() {
	var err error

	if conn, err = amqp.Dial("amqp://guest:guest@localhost:5672/"); err != nil {
		panic(err)
	}

	if ch, err = conn.Channel(); err != nil {
		panic(err)
	}
}

// main sets up the shared connection and channel via InitMQ, then starts 100
// goroutines that each call PublishNews once with a one-byte payload and print
// any error it returns. Afterwards it blocks on an empty select and never
// returns on its own.
func main() {
	InitMQ()

	// publishOne sends the low byte of n as the message body.
	publishOne := func(n int) {
		if err := PublishNews([]byte{byte(n)}); err != nil {
			fmt.Println(err)
		}
	}

	for n := 0; n < 100; n++ {
		go publishOne(n)
	}

	select {}
}

// PublishNews publishes data to the fanout exchange "exchange1" using the
// package-level channel ch.
//
// Every call first re-declares the exchange and the queue "queue" and binds
// the queue to the exchange before publishing, so all four AMQP operations are
// issued on ch each time. The first operation that fails aborts the call; its
// error is returned prefixed with the name of the failing step. A nil return
// means all four operations were accepted by the client library.
func PublishNews(data []byte) (err error) {
	const exchange = "exchange1"

	if err = ch.ExchangeDeclare(exchange, "fanout", true, false, false, false, nil); err != nil {
		return fmt.Errorf("ExchangeDeclare: %s", err)
	}

	queue, err := ch.QueueDeclare("queue", true, false, false, false, nil)
	if err != nil {
		return fmt.Errorf("QueueDeclare: %s", err)
	}

	// Empty routing key: the binding targets a fanout exchange.
	if err = ch.QueueBind(queue.Name, "", exchange, false, nil); err != nil {
		return fmt.Errorf("QueueBind: %s", err)
	}

	msg := amqp.Publishing{
		ContentType:  "text/plain",
		Body:         data,
		DeliveryMode: 2, // persistent delivery mode
	}
	if err = ch.Publish(exchange, "", false, false, msg); err != nil {
		return fmt.Errorf("Publish:  %s", err)
	}

	return nil
}

The errors printed are:

Publish:  Exception (504) Reason: "channel/connection is not open"
Publish:  Exception (504) Reason: "channel/connection is not open"
QueueBind: Exception (504) Reason: "channel/connection is not open"
Publish:  Exception (504) Reason: "channel/connection is not open"
QueueBind: Exception (504) Reason: "channel/connection is not open"
QueueBind: Exception (504) Reason: "channel/connection is not open"
QueueBind: Exception (504) Reason: "channel/connection is not open"
QueueBind: Exception (504) Reason: "channel/connection is not open"
QueueBind: Exception (504) Reason: "channel/connection is not open"
QueueBind: Exception (504) Reason: "channel/connection is not open"
QueueBind: Exception (504) Reason: "channel/connection is not open"
QueueBind: Exception (504) Reason: "channel/connection is not open"
QueueBind: Exception (504) Reason: "channel/connection is not open"
QueueBind: Exception (504) Reason: "channel/connection is not open"
QueueBind: Exception (504) Reason: "channel/connection is not open"
QueueBind: Exception (504) Reason: "channel/connection is not open"
QueueBind: Exception (504) Reason: "channel/connection is not open"
QueueBind: Exception (504) Reason: "channel/connection is not open"
QueueBind: Exception (504) Reason: "channel/connection is not open"
QueueBind: Exception (504) Reason: "channel/connection is not open"
QueueBind: Exception (504) Reason: "channel/connection is not open"
QueueBind: Exception (504) Reason: "channel/connection is not open"
QueueBind: Exception (504) Reason: "channel/connection is not open"
QueueBind: Exception (504) Reason: "channel/connection is not open"
QueueBind: Exception (504) Reason: "channel/connection is not open"
QueueBind: Exception (504) Reason: "channel/connection is not open"
QueueBind: Exception (504) Reason: "channel/connection is not open"
QueueBind: Exception (504) Reason: "channel/connection is not open"
QueueBind: Exception (504) Reason: "channel/connection is not open"
QueueBind: Exception (504) Reason: "channel/connection is not open"
QueueBind: Exception (504) Reason: "channel/connection is not open"
QueueBind: Exception (504) Reason: "channel/connection is not open"
QueueBind: Exception (504) Reason: "channel/connection is not open"
QueueBind: Exception (504) Reason: "channel/connection is not open"
QueueBind: Exception (504) Reason: "channel/connection is not open"
QueueBind: Exception (504) Reason: "channel/connection is not open"
QueueBind: Exception (504) Reason: "channel/connection is not open"
QueueBind: Exception (504) Reason: "channel/connection is not open"
QueueBind: Exception (504) Reason: "channel/connection is not open"
QueueBind: Exception (504) Reason: "channel/connection is not open"
QueueBind: Exception (504) Reason: "channel/connection is not open"
QueueBind: Exception (504) Reason: "channel/connection is not open"
QueueBind: Exception (504) Reason: "channel/connection is not open"
QueueBind: Exception (504) Reason: "channel/connection is not open"
QueueBind: Exception (504) Reason: "channel/connection is not open"
QueueBind: Exception (504) Reason: "channel/connection is not open"
QueueBind: Exception (504) Reason: "channel/connection is not open"
QueueDeclare: Exception (504) Reason: "channel/connection is not open"
QueueDeclare: Exception (504) Reason: "channel/connection is not open"
QueueBind: Exception (504) Reason: "channel/connection is not open"
QueueBind: Exception (504) Reason: "channel/connection is not open"
QueueDeclare: Exception (504) Reason: "channel/connection is not open"
QueueDeclare: Exception (504) Reason: "channel/connection is not open"
QueueDeclare: Exception (504) Reason: "channel/connection is not open"
QueueBind: Exception (504) Reason: "channel/connection is not open"
QueueDeclare: Exception (504) Reason: "channel/connection is not open"
QueueBind: write tcp [::1]:64696->[::1]:5672: use of closed network connection
QueueDeclare: Exception (504) Reason: "channel/connection is not open"
QueueBind: Exception (504) Reason: "channel/connection is not open"
QueueBind: Exception (504) Reason: "channel/connection is not open"
QueueBind: Exception (504) Reason: "channel/connection is not open"
QueueBind: write tcp [::1]:64696->[::1]:5672: use of closed network connection
QueueBind: Exception (504) Reason: "channel/connection is not open"
QueueBind: Exception (504) Reason: "channel/connection is not open"
QueueBind: Exception (504) Reason: "channel/connection is not open"
QueueBind: write tcp [::1]:64696->[::1]:5672: use of closed network connection
QueueBind: Exception (504) Reason: "channel/connection is not open"
QueueBind: Exception (504) Reason: "channel/connection is not open"
QueueBind: Exception (504) Reason: "channel/connection is not open"
QueueBind: Exception (504) Reason: "channel/connection is not open"
QueueBind: Exception (504) Reason: "channel/connection is not open"
QueueDeclare: Exception (504) Reason: "channel/connection is not open"
QueueBind: Exception (504) Reason: "channel/connection is not open"
QueueBind: write tcp [::1]:64696->[::1]:5672: use of closed network connection
QueueBind: Exception (504) Reason: "channel/connection is not open"
QueueBind: Exception (504) Reason: "channel/connection is not open"
QueueBind: Exception (504) Reason: "channel/connection is not open"
QueueBind: Exception (504) Reason: "channel/connection is not open"
QueueBind: Exception (504) Reason: "channel/connection is not open"
QueueBind: Exception (504) Reason: "channel/connection is not open"
QueueBind: Exception (504) Reason: "channel/connection is not open"
QueueBind: Exception (504) Reason: "channel/connection is not open"
QueueBind: write tcp [::1]:64696->[::1]:5672: use of closed network connection
QueueDeclare: Exception (504) Reason: "channel/connection is not open"
QueueDeclare: Exception (504) Reason: "channel/connection is not open"
QueueDeclare: Exception (504) Reason: "channel/connection is not open"
QueueDeclare: Exception (504) Reason: "channel/connection is not open"
QueueBind: Exception (504) Reason: "channel/connection is not open"
QueueBind: Exception (504) Reason: "channel/connection is not open"
QueueBind: write tcp [::1]:64696->[::1]:5672: use of closed network connection
QueueDeclare: Exception (504) Reason: "channel/connection is not open"
QueueBind: Exception (504) Reason: "channel/connection is not open"
QueueBind: write tcp [::1]:64696->[::1]:5672: use of closed network connection
QueueBind: write tcp [::1]:64696->[::1]:5672: use of closed network connection
QueueBind: write tcp [::1]:64696->[::1]:5672: use of closed network connection
QueueBind: write tcp [::1]:64696->[::1]:5672: use of closed network connection
QueueDeclare: Exception (505) Reason: "UNEXPECTED_FRAME - expected content header for class 60, got non content header frame instead"
QueueBind: write tcp [::1]:64696->[::1]:5672: use of closed network connection

The RabbitMQ log is:

2020-08-20 15:38:44.204 [info] <0.11426.16> accepting AMQP connection <0.11426.16> ([::1]:64696 -> [::1]:5672)
2020-08-20 15:38:44.207 [info] <0.11426.16> connection <0.11426.16> ([::1]:64696 -> [::1]:5672): user 'guest' authenticated and granted access to vhost '/'
2020-08-20 15:38:44.211 [error] <0.11426.16> Error on AMQP connection <0.11426.16> ([::1]:64696 -> [::1]:5672, vhost: '/', user: 'guest', state: running), channel 1:
 operation basic.publish caused a connection exception unexpected_frame: "expected content header for class 60, got non content header frame instead"
2020-08-20 15:38:44.213 [info] <0.11426.16> closing AMQP connection <0.11426.16> ([::1]:64696 -> [::1]:5672, vhost: '/', user: 'guest')
2020-08-20 15:39:39.725 [error] <0.731.0> Could not find handle.exe, please install from sysinternals

littlejiancc avatar Aug 20 '20 07:08 littlejiancc

Even though declaring exchanges and queues and binding them should be idempotent, I think you're setting yourself up for trouble by performing these commands in 100 goroutines at the same time. It could happen, for example, that you send multiple declarations of the same exchange at the same time, which I think could lead to issues on the server side.

I tried your code as is and it works most of the time but sometimes I see the same errors as you do. When I got errors it was resolved by just restarting the RabbitMQ server.

However, after moving the declaration of the exchange and the queue, and the binding between them, into InitMQ(), I can no longer reproduce this issue (whether the RabbitMQ server is newly started or already running).

MRE

docker run --rm -it -p 5672:5672 rabbitmq:3

package main

import (
	"fmt"
	"sync"

	"github.com/streadway/amqp"
)

// InitMQ connects to the AMQP broker at localhost:5672, opens a channel and
// performs the one-time topology setup on it: it declares the fanout exchange
// "exchange1", declares the queue "queue", and binds the queue to the
// exchange. It returns the channel and panics on the first error encountered.
func InitMQ() *amqp.Channel {
	const (
		brokerURL = "amqp://guest:guest@localhost:5672/"
		exchange  = "exchange1"
	)

	conn, err := amqp.Dial(brokerURL)
	if err != nil {
		panic(err)
	}

	ch, err := conn.Channel()
	if err != nil {
		panic(err)
	}

	if err = ch.ExchangeDeclare(exchange, "fanout", true, false, false, false, nil); err != nil {
		panic(err)
	}

	queue, err := ch.QueueDeclare("queue", true, false, false, false, nil)
	if err != nil {
		panic(err)
	}

	// Empty routing key: the binding targets a fanout exchange.
	if err = ch.QueueBind(queue.Name, "", exchange, false, nil); err != nil {
		panic(err)
	}

	return ch
}

// main obtains a prepared channel from InitMQ, starts 100 goroutines that each
// publish one single-byte message through PublishNews on that channel, prints
// any publish error, and returns once every goroutine has finished.
func main() {
	const total = 100

	ch := InitMQ()

	var wg sync.WaitGroup
	for n := 1; n <= total; n++ {
		fmt.Printf("spawning go routine %d/%d\n", n, total)

		wg.Add(1)
		go func(id int) {
			if err := PublishNews(ch, []byte{byte(id)}); err != nil {
				fmt.Println(err)
			}

			wg.Done()
		}(n)
	}

	wg.Wait()
}

// PublishNews publishes data as a text/plain message to the exchange
// "exchange1" with an empty routing key on the given channel, and returns
// whatever error ch.Publish reports.
func PublishNews(ch *amqp.Channel, data []byte) (err error) {
	msg := amqp.Publishing{
		ContentType:  "text/plain",
		Body:         data,
		DeliveryMode: 2, // persistent delivery mode
	}

	err = ch.Publish("exchange1", "", false, false, msg)
	return err
}

bombsimon avatar Sep 01 '20 20:09 bombsimon

@bombsimon thx for your response

littlejiancc avatar Oct 13 '20 07:10 littlejiancc

@wxj95 The problem that I see here is that you are sharing the same publishing channel between the goroutines. This is wrong, and it might lead to incorrect frame interleaving, as @michaelklishin mentioned here: https://groups.google.com/g/rabbitmq-users/c/u4AZ2t9enu0/m/WkDIU9ENAgAJ

Please check this issue as well for more info https://github.com/streadway/amqp/issues/283

The solution is to create a new channel in each goroutine.

boncea avatar Jan 27 '21 15:01 boncea

Channel#Publish takes a lock at the beginning, so I do not understand how publishing from two goroutines is different from publishing from only one goroutine.

apyshkin avatar Oct 18 '21 09:10 apyshkin