
Trying to publish a message inside the 'connected' signal fails on reconnect.

Open alaendle opened this issue 8 years ago • 9 comments

If I create a simple sender that just publishes a message when the 'connected' signal fires, publishing fails on every reconnect. Take a look at the following sample (a small modification of the sender example):

Sender::Sender(QObject *parent) : QObject(parent) {
    m_client.setAutoReconnect(true);
}

void Sender::start() {
    connect(&m_client, SIGNAL(connected()), this, SLOT(clientConnected()));
    m_client.connectToHost();
}

void Sender::clientConnected() {
    QAmqpExchange *defaultExchange = m_client.createExchange("test");
    defaultExchange->publish("Hello World!", "test");
    qDebug() << " [x] Sent 'Hello World!'";
}

If I now restart the broker, the client tries to publish before the channel is open:

...
trying to reconnect after: 5000 ms
connecting to host: "localhost" , port: 5672
-> connection#start( version_major=0, version_minor=9, mechanisms=(PLAIN,AMQPLAIN), locales=en_US
<- connection#startOk()
-> connection#tune( channel_max=0, frame_max=131072, heartbeat=60 )
<- connection#tuneOk( channelMax=0, frameMax=131072, heartbeatDelay=60
<- connection#open( virtualHost=/, reserved-1=0, reserved-2=0
-> connection#openOk()
<- basic#publish( exchange=test, routing-key=test, mandatory=0, immediate=0 )
[x] Sent 'Hello World!'
<- channel#open( channel=1 )
-> connection#close( reply-code=504, reply-text=CHANNEL_ERROR - expected 'channel.open', class-id=60, method-id:40 )
exchange disconnected: "test"
<- connection#closeOk()
socket error: "The remote host closed the connection"
exchange disconnected: "test"
trying to reconnect after: 1000 ms
connecting to host: "localhost" , port: 5672
-> connection#start( version_major=0, version_minor=9, mechanisms=(PLAIN,AMQPLAIN), locales=en_US
<- connection#startOk()
-> connection#tune( channel_max=0, frame_max=131072, heartbeat=60 )
<- connection#tuneOk( channelMax=0, frameMax=131072, heartbeatDelay=60
<- connection#open( virtualHost=/, reserved-1=0, reserved-2=0
-> connection#openOk()
<- basic#publish( exchange=test, routing-key=test, mandatory=0, immediate=0 )
[x] Sent 'Hello World!'
<- channel#open( channel=1 )
-> connection#close( reply-code=504, reply-text=CHANNEL_ERROR - expected 'channel.open', class-id=60, method-id:40 )
exchange disconnected: "test"
<- connection#closeOk()
socket error: "The remote host closed the connection"
exchange disconnected: "test"
trying to reconnect after: 1000 ms
...

To me this looks like a bug, but maybe I'm just making wrong assumptions about the qamqp API. If I was unclear or you need more information, just let me know. Thanks in advance for any assistance or hints on how to work around this problem.

alaendle, Mar 22 '16 09:03

@alaendle sorry, was on vacation! hmm this does indeed look like a bug, I'll try to repro on my end here and see what's going on.

mbroadst, Mar 27 '16 15:03

@alaendle can you post your whole code here? It looks like you're never declaring the exchange?

mbroadst, Mar 28 '16 12:03

@alaendle ah, I think I understand your issue. Yeah it looks like there is currently a problem if you try publishing when not properly connected. I wonder if maybe you'd be interested in helping out with this feature? The exchange would have to maintain an internal list of pending messages to send, which would be published upon connection and the channel being established.
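
The pending-send idea described above could be sketched roughly as follows. This is a minimal illustration in plain standard C++, not qamqp code: `BufferedPublisher`, `Message`, `channelOpened()` and `channelClosed()` are hypothetical names, and the `send` callback stands in for whatever actually writes basic.publish to the wire.

```cpp
#include <cassert>
#include <cstddef>
#include <deque>
#include <functional>
#include <string>
#include <utility>

// Hypothetical message type; not part of qamqp.
struct Message {
    std::string body;
    std::string routingKey;
};

// Hypothetical wrapper that queues publishes while the channel is closed.
class BufferedPublisher {
public:
    // `send` is assumed to perform the real publish on an open channel.
    explicit BufferedPublisher(std::function<void(const Message &)> send)
        : m_send(std::move(send))
    {
        assert(m_send != nullptr);
    }

    // Send immediately if the channel is open, otherwise keep the message.
    void publish(Message msg)
    {
        if (m_open)
            m_send(msg);
        else
            m_pending.push_back(std::move(msg));
    }

    // To be called once channel.open-ok has arrived: flush in FIFO order.
    void channelOpened()
    {
        m_open = true;
        while (!m_pending.empty()) {
            m_send(m_pending.front());
            m_pending.pop_front();
        }
    }

    // To be called on disconnect: later publishes are queued again.
    void channelClosed() { m_open = false; }

    std::size_t pendingCount() const { return m_pending.size(); }

private:
    std::function<void(const Message &)> m_send;
    std::deque<Message> m_pending;
    bool m_open = false;
};
```

With such a wrapper, a publish issued from the 'connected' slot would only be queued, and it would reach the broker after the channel has been opened rather than before.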

mbroadst, Mar 28 '16 12:03

@alaendle so I took a first stab at supporting pending sends on this branch. The work is incomplete because of other issues I uncovered with certain reconnect scenarios, but the code there does indeed resend the data on reconnect, solving your problem. I won't have time for this for a few days, but I'll try to get around to completely fixing the issue.

mbroadst, Mar 28 '16 15:03

@mbroadst please excuse the late response, I was also on vacation. Will take a look at your work now...

alaendle, Apr 05 '16 06:04

I still have a problem in my original scenario. If I just shut down and restart RabbitMQ (v3.6.1), I can't publish messages after reconnecting: the opened flag never switches back to 'true' because channel#openOk is never received. The new log:

-> connection#start( version_major=0, version_minor=9, mechanisms=(AMQPLAIN,PLAIN), locales=en_US )
<- connection#startOk()
-> connection#tune( channel_max=0, frame_max=131072, heartbeat=60 )
<- connection#tuneOk( channelMax=0, frameMax=131072, heartbeatDelay=60 )
<- connection#open( virtualHost=/, reserved-1=0, reserved-2=0 )
-> connection#openOk()
channel name: ""
<- channel#open( channel=1 )
[x] Sent 'Hello World!'
-> channel#openOk( channel=1 )
<- basic#publish( exchange=, routing-key=test, mandatory=0, immediate=0 )
<- basic#publish( exchange=, routing-key=test, mandatory=0, immediate=0 )
[x] Sent 'Hello World!'
socket error: "The remote host closed the connection"
exchange disconnected: ""
trying to reconnect after: 1000 ms
connecting to host: "localhost" , port: 5672
socket error: "Connection refused"
trying to reconnect after: 5000 ms
connecting to host: "localhost" , port: 5672
-> connection#start( version_major=0, version_minor=9, mechanisms=(AMQPLAIN,PLAIN), locales=en_US )
<- connection#startOk()
-> connection#tune( channel_max=0, frame_max=131072, heartbeat=60 )
<- connection#tuneOk( channelMax=0, frameMax=131072, heartbeatDelay=60 )
<- connection#open( virtualHost=/, reserved-1=0, reserved-2=0 )
-> connection#openOk()
[x] Sent 'Hello World!'
channel name: ""
<- channel#open( channel=2 )
[x] Sent 'Hello World!'
[x] Sent 'Hello World!'
AMQP: Heartbeat

alaendle, Apr 05 '16 08:04

The problem seems to be caused by the channel numbers changing during reconnect (channel=1 before the restart, channel=2 after it in the log above), so the maps pointing to the frame handler methods (in this case QAmqpClientPrivate::methodHandlersByChannel) are no longer correct. Two solutions come to mind: a) don't change channel numbers, b) don't rely on channel numbers when looking up registered handlers. I did a short proof-of-concept implementation of b) and it seems to work, but this certainly isn't a fully fledged solution to the problem...
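
To make option b) concrete outside of the Qt code, here is a small standalone sketch in plain standard C++. `Channel`, `Handler` and `HandlerRegistry` are hypothetical stand-ins (not qamqp types): handlers are registered under the channel object, and each incoming frame is matched against every channel's current number, so a renumbered channel still finds its handlers.

```cpp
#include <functional>
#include <unordered_map>
#include <utility>
#include <vector>

// Hypothetical stand-in for a channel whose number may change on reconnect.
struct Channel {
    int number;
};

// Hypothetical handler signature: receives the channel number of the frame.
using Handler = std::function<void(int)>;

class HandlerRegistry {
public:
    // Register a handler under the channel object, not under its number.
    void add(const Channel *channel, Handler handler)
    {
        m_handlers[channel].push_back(std::move(handler));
    }

    // Deliver a frame to every handler whose channel currently has the
    // frame's channel number. Returns how many handlers were invoked.
    int dispatch(int frameChannel) const
    {
        int invoked = 0;
        for (const auto &entry : m_handlers) {
            if (entry.first->number != frameChannel)
                continue;
            for (const Handler &handler : entry.second) {
                handler(frameChannel);
                ++invoked;
            }
        }
        return invoked;
    }

private:
    std::unordered_map<const Channel *, std::vector<Handler>> m_handlers;
};
```

The trade-off in this sketch is a linear scan over all registered channels for each frame instead of a direct lookup by number, which is presumably acceptable for a handful of channels but worth measuring with many.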

diff --git "a/C:\\Users\\laa6bh\\AppData\\Local\\Temp\\TortoiseGit\\qam37F6.tmp\\qamqpclient-7d9f803-left.cpp" "b/C:\\Tools\\qamqp\\src\\qamqpclient.cpp"
index e460e05..27c7415 100644
--- "a/C:\\Users\\laa6bh\\AppData\\Local\\Temp\\TortoiseGit\\qam37F6.tmp\\qamqpclient-7d9f803-left.cpp"
+++ "b/C:\\Tools\\qamqp\\src\\qamqpclient.cpp"
@@ -258,8 +258,13 @@ void QAmqpClientPrivate::_q_readyRead()
             if (frame.methodClass() == QAmqpFrame::Connection) {
                 _q_method(frame);
             } else {
-                foreach (QAmqpMethodFrameHandler *methodHandler, methodHandlersByChannel[frame.channel()])
-                    methodHandler->_q_method(frame);
+                QHashIterator<QAmqpChannel*, QList<QAmqpMethodFrameHandler*>> i(methodHandlersByChannel);
+                while (i.hasNext()) {
+                    i.next();
+                    if(i.key()->channelNumber() == frame.channel())
+                        foreach (QAmqpMethodFrameHandler *methodHandler, i.value())
+                            methodHandler->_q_method(frame);
+                }
             }
         }
             break;
@@ -689,7 +694,7 @@ QAmqpExchange *QAmqpClient::createExchange(const QString &name, int channelNumbe
     }

     exchange = new QAmqpExchange(channelNumber, this);
-    d->methodHandlersByChannel[exchange->channelNumber()].append(exchange->d_func());
+    d->methodHandlersByChannel[exchange].append(exchange->d_func());
     connect(this, SIGNAL(connected()), exchange, SLOT(_q_open()));
     connect(this, SIGNAL(disconnected()), exchange, SLOT(_q_disconnected()));
     exchange->d_func()->open();
@@ -716,7 +721,7 @@ QAmqpQueue *QAmqpClient::createQueue(const QString &name, int channelNumber)
     }

     queue = new QAmqpQueue(channelNumber, this);
-    d->methodHandlersByChannel[queue->channelNumber()].append(queue->d_func());
+    d->methodHandlersByChannel[queue].append(queue->d_func());
     d->contentHandlerByChannel[queue->channelNumber()].append(queue->d_func());
     d->bodyHandlersByChannel[queue->channelNumber()].append(queue->d_func());
     connect(this, SIGNAL(connected()), queue, SLOT(_q_open()));

alaendle, Apr 05 '16 12:04

@alaendle are you using the code in that branch or the code in master? The initial issue with the code in master is that QAmqpExchange tries to send data without concern for whether it's currently open or not, so the work in that branch was focused on caching attempted sends until the exchange is open again (there are a number of caveats with this approach which would have to be tested, first and foremost the fact that this can totally leak memory if the exchange is never connected again!).
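
One possible way to limit the memory caveat mentioned above is to cap the cache. The following is only a sketch in plain standard C++: `BoundedPending`, its fixed capacity, and the drop-oldest policy are assumptions made for illustration and are not taken from the branch.

```cpp
#include <cstddef>
#include <deque>
#include <string>
#include <utility>

// Hypothetical bounded cache for messages awaiting a reconnect.
class BoundedPending {
public:
    explicit BoundedPending(std::size_t capacity) : m_capacity(capacity) {}

    // Queue a message. Returns false if something had to be discarded:
    // either the oldest queued message, or this one when capacity is zero.
    bool push(std::string msg)
    {
        if (m_capacity == 0)
            return false;
        bool dropped = false;
        if (m_queue.size() == m_capacity) {
            m_queue.pop_front();  // drop the oldest message to make room
            dropped = true;
        }
        m_queue.push_back(std::move(msg));
        return !dropped;
    }

    // Remove and return everything queued, oldest first.
    std::deque<std::string> drain()
    {
        std::deque<std::string> out;
        out.swap(m_queue);
        return out;
    }

    std::size_t size() const { return m_queue.size(); }

private:
    std::size_t m_capacity;
    std::deque<std::string> m_queue;
};
```

Whether dropping the oldest message, dropping the newest, or reporting an error to the caller is the right policy depends on the application, so the cap and the policy would probably need to be configurable.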

As for methodHandlersByChannel: the channel numbers should stay the same, so a change there would be a problem with the reconnect code in the client itself. Apologies for the slow nature of the fix here, but I haven't actually used this code in about three years! I'm interested in helping you fix the issue, but let's take it one step at a time if you'll accommodate me.

mbroadst, Apr 05 '16 13:04

@mbroadst I don't want to stress you; we certainly could/should improve the behaviour step by step. I just played around with the code in the 'pending-sends' branch and thought it might help to update this issue with the 'new' behaviour I've experienced. I'm also aware of what it takes to maintain an open-source project over the years, so let me take this chance to thank you for sharing the code and for your support.

alaendle, Apr 06 '16 05:04