amqpcpp
amqpcpp copied to clipboard
Reply-to example
Hi,
can you post an example of a Publish with reply-to in "amq.rabbitmq.reply-to" ?
I tried to make it work but with no real success. First I tried to Publish and then make a Get from amq.rabbitmq.reply-to but I got the error "fast reply consumer does not exist". Then I tried to make two threads, one is launched first to Consume on the "amq.rabbitmq.reply-to" queue and the other to Publish a message with reply-to set to "amq.rabbitmq.reply-to". Actually sometimes it works but often the createExchange don't return or throw an Exception. The error is then : operation none caused a connection exception frame_error: "Malformed UTF-8 in shortstr". So I'm wondering if I can use the AMPQ instance from two threads. Here the code I'm using:
#include <vcl.h>
#include <time.h>
#include <windows.h>
#include <process.h>
#include "AMQPcpp.h"
std::string getUUID()
{
UUID uuid;
UuidCreate ( &uuid );
unsigned char * str;
UuidToStringA ( &uuid, &str );
std::string uuid_s( ( char* ) str );
RpcStringFreeA ( &str );
return uuid_s;
}
std::string gCorrelationId;
int onMessage( AMQPMessage * aMessage )
{
uint32_t oLength = 0;
std::cout << "Result: "<< aMessage->getMessage(&oLength) << "\n";
std::string oCorrelationId = aMessage->getHeader("correlation_id");
std::cout<<" with CorrelationId: "<< oCorrelationId <<"\n";
if (oCorrelationId == gCorrelationId)
{
std::cout<<" Reply read\n";
AMQPQueue *queue = aMessage->getQueue();
queue->Cancel( aMessage->getConsumerTag() );
}
return 0;
}
AMQP amqp("localhost");
unsigned __stdcall consumer(void *params)
{
AMQPQueue * reply_queue = amqp.createQueue("amq.rabbitmq.reply-to");
reply_queue->addEvent(AMQP_MESSAGE, onMessage );
reply_queue->setConsumerTag("reply-to");
std::cout <<"Waiting for response in "<<reply_queue->getName()<<"\n";
reply_queue->Consume(AMQP_NOACK);
std::cout<<"exit consumer\n";
_endthreadex( 0 );
return 0;
}
int main(int argc, char* argv[])
{
try {
HANDLE hThread;
unsigned threadID;
hThread = (HANDLE)_beginthreadex( NULL, 0, &consumer, NULL, 0, &threadID );
std::cout<<"Thread handle: "<<hThread<<"\n";
Sleep(1000);
std::string oMessage = "30"; // ask fibonacci for this number
std::string correlationId = getUUID();
gCorrelationId = correlationId;
// request exchange
std::cout<<"Create exchange\n";
AMQPExchange * ex = amqp.createExchange();
std::cout<<"Publish\n";
ex->setHeader("Content-type", "text/plain");
ex->setHeader("Content-encoding", "UTF-8");
ex->setHeader("Reply-to", "amq.rabbitmq.reply-to");
ex->setHeader("correlation_id", correlationId);
ex->Publish( oMessage , "rpc_queue");
std::cout <<"Publish "<<oMessage<<" with correlation-id: "<<correlationId<<"\n";
WaitForSingleObject( hThread, INFINITE );
}
catch (AMQPException e) {
std::cout << e.getMessage() << std::endl;
}
std::cout << "Fin Client AMQP" << std::endl;
return 0;
}```