
Reply-to example

Open dqueffeulouatw opened this issue 8 years ago • 0 comments

Hi,

Can you post an example of a Publish with reply-to set to "amq.rabbitmq.reply-to"?

I tried to make it work, but without real success. First I tried to Publish and then do a Get from amq.rabbitmq.reply-to, but I got the error "fast reply consumer does not exist".

Then I tried two threads: one is launched first to Consume on the "amq.rabbitmq.reply-to" queue, and the other Publishes a message with reply-to set to "amq.rabbitmq.reply-to". This sometimes works, but often createExchange either does not return or throws an exception. The error is then: operation none caused a connection exception frame_error: "Malformed UTF-8 in shortstr".

So I'm wondering whether I can use the AMQP instance from two threads. Here is the code I'm using:

```cpp
#include <vcl.h>
#include <time.h>
#include <windows.h>
#include <process.h>
#include <iostream>
#include <string>
#include "AMQPcpp.h"

// Generate a UUID string to use as the correlation id (needs Rpcrt4.lib).
std::string getUUID()
{
    UUID uuid;
    UuidCreate ( &uuid );
    unsigned char * str;
    UuidToStringA ( &uuid, &str );
    std::string uuid_s( ( char* ) str );
    RpcStringFreeA ( &str );
    return uuid_s;
}

// Correlation id of the pending request, written by main() and read by onMessage().
std::string gCorrelationId;

// Called for each message delivered to the reply queue.
int  onMessage( AMQPMessage * aMessage )
{
    uint32_t oLength = 0;
    std::cout << "Result: "<< aMessage->getMessage(&oLength) << "\n";
    std::string oCorrelationId = aMessage->getHeader("correlation_id");
    std::cout<<"    with CorrelationId: "<< oCorrelationId <<"\n";
    if (oCorrelationId == gCorrelationId)
    {
        // This is the reply we were waiting for: stop consuming.
        std::cout<<"    Reply read\n";
        AMQPQueue *queue = aMessage->getQueue();
        queue->Cancel( aMessage->getConsumerTag() );
    }
    return 0;
}

// Single AMQP instance shared by the main thread and the consumer thread.
AMQP amqp("localhost");

// Consumer thread: waits for the reply on the direct reply-to pseudo-queue.
unsigned __stdcall consumer(void *params)
{
    AMQPQueue * reply_queue = amqp.createQueue("amq.rabbitmq.reply-to");
    reply_queue->addEvent(AMQP_MESSAGE, onMessage );
    reply_queue->setConsumerTag("reply-to");
    std::cout <<"Waiting for response in "<<reply_queue->getName()<<"\n";
    reply_queue->Consume(AMQP_NOACK);
    std::cout<<"exit consumer\n";
    _endthreadex( 0 );
    return 0;
}

int main(int argc, char* argv[])
{
    try {
        HANDLE hThread;
        unsigned threadID;

        hThread = (HANDLE)_beginthreadex( NULL, 0, &consumer, NULL, 0, &threadID );
        std::cout<<"Thread handle: "<<hThread<<"\n";

        // Give the consumer thread time to start consuming.
        Sleep(1000);

        std::string oMessage = "30";    // ask fibonacci for this number
        std::string correlationId = getUUID();
        gCorrelationId = correlationId;

        // request exchange
        std::cout<<"Create exchange\n";
        AMQPExchange * ex = amqp.createExchange();

        std::cout<<"Publish\n";
        ex->setHeader("Content-type", "text/plain");
        ex->setHeader("Content-encoding", "UTF-8");
        ex->setHeader("Reply-to", "amq.rabbitmq.reply-to");
        ex->setHeader("correlation_id", correlationId);
        ex->Publish( oMessage , "rpc_queue");

        std::cout <<"Publish "<<oMessage<<" with correlation-id: "<<correlationId<<"\n";

        // Wait until the consumer thread has received the reply and exited.
        WaitForSingleObject( hThread, INFINITE );
    }
    catch (AMQPException e) {
        std::cout << e.getMessage() << std::endl;
    }
    std::cout << "Fin Client AMQP" << std::endl;
    return 0;
}
```

dqueffeulouatw, Aug 31 '16 12:08