rabbot icon indicating copy to clipboard operation
rabbot copied to clipboard

RPC not working: What I am doing wrong? Is there an example of its implementation?

Open acarlstein opened this issue 6 years ago • 14 comments

Greetings,

Will you help me?

I apologize to bother but I am having troubles implementing RPC correctly.

Perhaps someone can point out what I am doing wrong in the code since I couldn't find in the documentation a full example on how to implemented.

// Publish RPC from this client to server
var extension = {
 id: UUID.random(),
}

var body = {...};

rabbit.publish('rpc.extension.request', {
    routingKey: 'extend.rpc',
    correlationId: extension.id,
    replyTo: "rpc.extension.response",
    contentType: "application/json",
    body: body,
    expiresAfter: 60000 // TTL in ms, in this example 60 seconds - can be removed after consumer working, just here to prevent junk accumulating on servers.
  }).then(
    () => { // a list of the messages of that succeeded
      log.info(`Success publishing extension for rpc ${extension.id}`);
    },
    (failed) => { // a list of failed messages and the errors `{ err, message }`
      log.info(`Error publishing extension for rpc ${extension.id} ->  ${failed.err}: ${failed.message}`);
    });

-----------------------
// Consuming RPC result sent by server to this client.

module.exports.setup = function () {
	...

	const handleRPC = rabbit.handle({queue: 'rpc.extension.response'}, handleExtendRPCResponse);
	handleRPC.catch(onHandleError);
};

function handleExtendRPCResponse(msg) {
	logger.info(`RPC Received message : ${msg}`)

	try {
		messageHandler.processMessage(msg, function (err) {
			if (err) {
				logger.error(`ERROR in RPC Process message: "${err.error}"`);
				msg.reject();
			} else {
				logger.info("Process RPC message passed");
				msg.ack();
			}
		})
	} catch(e){
		logger.error(`ERROR while Processing RPC message: "${e.message}"`);
		msg.reject();
	}
}

function onHandleError (err, msg) {
    // not able to hit this with a test...not sure how to.
    logger.error('Error:', JSON.stringify(err), 'Message:', JSON.stringify(msg));
    // Temporary, but if we can't handle it we don't want it requeued.
    msg.reject();
}

Thanks for your time and the creation of this package.

acarlstein avatar Feb 18 '19 20:02 acarlstein

Maybe I got your question wrong, but if you trying to send back a result from the rpc call, you could use rabbit.request instead of rabbit.publish.

We do it like this:

rabbit
  .request('exchange', {
    type: 'req',
    body: task,
    routingKey
  })
  /*.progress(function(reply) {
    // if multiple replies are provided, all but the last will be sent via the progress callback
  })*/

  .then((final) => {
    // the last message in a series OR the only reply will be sent to this callback

    final.ack();
    if (
      Array.isArray(task.params) &&
      typeof task.params[task.cbPos] === 'function'
    ) {
      task.params[task.cbPos].apply(this, final.body.args);
    } else {
      //...
    }
  });

manuel-reil avatar Feb 18 '19 20:02 manuel-reil

@manuel-reil, thanks for answering to my question.

Actually, I have a microservice that is trying to do an RPC request, via RabbitMQ, and then consume the result that comes back, via RabbitMQ.

Based on your feedback, I guess my RPC request was wrong because I was using publish() instead of request(). Am I correct?

Also, based on your feedback, I don't need to implement a consumer since request() is going to wait for the return of the result of the RPC request and put it into the chain .then((final) -> ...) Am I correct also here?

acarlstein avatar Feb 19 '19 15:02 acarlstein

@acarlstein you are correct. You want request() not publish() for rpc. Under the covers during connection a unique rpc queue is automatically made for you to act as the listener queue to handle the return messages and abstracted to the promise level for ease of use. The rpc request uses the replyTo header that is added during the request() call in order to send it back the the right service that made the rpc request originally.

zlintz avatar Feb 23 '19 17:02 zlintz

I figured out the issue. In the topology, when creating the exchange, I needed to add 'subscribe:true' property since the queue was already created but used as publish instead of request.

Thanks anyways for the feedback. I appreciate it.

acarlstein avatar Feb 25 '19 19:02 acarlstein

Ah yes, the old subscribe:true I have been there myself

zlintz avatar Feb 26 '19 17:02 zlintz

@zlintz,

I am having an issue with this as well. following your exact example above...

rabbit
  .request('myExchange', {
    type: 'req',
    body: task,
    routingKey: 'key'
  })
  .then((final) => {
    final.ack();
    log.info('received message');
  });

I am ale to see that my consumer (a java process ) is responding and that there is a message in the temporary queue, but 'received message' is never logged. When I turn on DEBUG=* I see messages like:

  rabbot.exchange debug +15s Publishing message ( type: 'req' topic: 'key', sequence: '0', correlation: '', replyTo: '{"type":"req","contentType":"application/json","
contentEncoding":"utf8","correlationId":"","replyTo":"xxx.response.queue","messageId":"087f6300-4751-11e9-98b1-f3c4b7b64947","timestamp":1552674897204,"a
ppId":"JSampson5530.npm.18772","headers":{},"mandatory":false,"persistent":true}' ) to direct exchange 'assignment.requests' on connection 'default'
  rabbot.acknack debug +365ms New pending tag 1 on queue xxx.response.queue - default
  rabbot.unhandled warn +1ms Message of xxx.response.queue on queue 'xxx.response.queue', connection 'default' was not processed by an
y registered handlers
  rabbot.acknack debug +3ms Marking tag 1 as nack'd on queue xxx.response.queue - default
  rabbot.acknack debug +83ms nack ALL (1) tags on xxx.response.queue up to 1 - default.
  rabbot.queue debug +1ms Nacking tag 1 on 'xxx.response.queue' - 'default'

The use of subscribe:true on the connection definition does not seem to have an effect Any Ideas?

jtsampson avatar Mar 15 '19 18:03 jtsampson

@jtsampson can you provide a sample of the message coming back from the java consumer which is replying on the replyTo queue including all the headers. There are specific expectations that rabbot has around the RPC message coming back on the replyTo queue in order to connect the return message back the correct request() promise response. Most of these expectations are handled for you if you have rabbot on both sides.

zlintz avatar Mar 21 '19 15:03 zlintz

@zlintz ,

Thanks for getting back to me!

[aside] For sake of this discussion, and if it matters, the REQUESTER is a node app originally on Rabbit 1.1.0, but now upgraded to Rabbot 2.1.0 , the RESPONDER is a Java app using ampq-client-3.3.5 We have this working asynchronously and want to add a synchronous functionality using the same (or slightly modified) RESPONDER code.

I was pursuing this line of debugging earlier this week to try and determine exactly what the form of the message should be comming back from my RESPONDER but I hadn't gotten around to finishing it up.

Here is what I did so far.

Using the example code and topology in request.spec.js, and dupmping the response object from line 99 to the console I see the following

response: { fields: 
   { consumerTag: 'MACHINE.NAME.PROCESS.NAME',
     deliveryTag: 2,
     redelivered: false,
     exchange: '',
     routingKey: 'MACHINE.NAME.PROCESS.NAME' },
  properties: 
   { contentType: 'text/plain',
     contentEncoding: 'utf8',
     headers: { sequence_end: true },
     deliveryMode: undefined,
     priority: undefined,
     correlationId: '41dd6c20-4c22-11e9-b0db-c5719578f79c',
     replyTo: 'MACHINE.NAME.PROCESS.NAME',
     expiration: undefined,
     messageId: undefined,
     timestamp: 1553204562918,
     type: 'polite.reply',
     userId: undefined,
     appId: undefined,
     clusterId: undefined },
  content: <Buffer 3a 44>,
  ack: { [Function: bound ack] [length]: 0, [name]: 'bound ack' },
  reject: { [Function: bound reject] [length]: 0, [name]: 'bound reject' },
  nack: { [Function: bound nack] [length]: 0, [name]: 'bound nack' },
  reply: 
   { [Function]
     [length]: 2,
     [name]: '',
     [arguments]: null,
     [caller]: null,
     [prototype]: { [constructor]: [Circular] } },
  type: 'polite.reply',
  queue: 'MACHINE.NAME.PROCESS.NAME',
  body: ':D' }

I am guessing that my Java consumer has to respond with similar headers and properties but I am not exactly sure which ones...I tried to set up the minum number of these in my RESPONDER code:

      channel = rabbitMqManager.createChannel();

      Map<String,Object> headers = new HashMap<>();
      headers.put("sequence_end", true); //  possibly expected by node.js rabbot library

      AMQP.BasicProperties props =
          new AMQP.BasicProperties.Builder()
                  .appId("CONSUME_APP") // this is the java consume app name not the rabbot publisher app name
                  .contentEncoding("UTF-8")
                  .contentType("application/json")
                  .correlationId(correlationId) // correlation id sent by publisher from rabbit properties
                  .type((String)properties.getType()) //  possibly expected by node.js rabbot library
                  .headers(headers).build();

      // reply to in this case is 'MACHINE.NAME.PROCESS.NAME'
      channel.basicPublish("", replyTo, props, (body.toJSONString().getBytes()));
      // thisgoes to the default exchange

Above I am assumung (possibly wrongly) that I do not need to have the RESPONDER reply with any of the properties that are undefined from the dump and that the appId does not matter. Then my message will look something like this, I haven't trapped the actuall message yet, but am faily confident it looks like...

{
	"headers": {
		"sequence_end": true
	},
	"properties": {
		"contentEncoding": "UTF-8",
		"contentType": "application/json",
		"corelationId": "passed from PUBLISHER",
		"type": "type passed from PUBLISHER"
	},
	"body": "body"
}

Hopefully I am on the right track.

Do you know exactly which properties and headers I need?

jtsampson avatar Mar 21 '19 22:03 jtsampson

@jtsampson

I did some testing and digging. In order for the response to make it back to the request() the consumer must take the properties.message_id from the message and then give that back as the properties.correlation_id on the return message on the reply to queue. The headers should also have "sequence_end": true

For example the message sent to be replied to on the queue... image

And the response would be like... image

I can explain more of the details if needed, let me know if this helps.

zlintz avatar Mar 23 '19 06:03 zlintz

@zlintz,

Thanks for the run down, I will try this in my code, should take about a day or so to try out.. If that works I will definately be back for details, and I can certainly add a pull request for added documentation.

jtsampson avatar Mar 25 '19 15:03 jtsampson

@jtsampson any update? @acarlstein does this address your scenario as well?

zlintz avatar Mar 28 '19 23:03 zlintz

@zlintz,

Sorry for the late reply. Yes, this does solve my issue perfectly. Additionally, PM'ed @acarlstein and this was actually the solution to his problem as well. You can close this issue.

jtsampson avatar Mar 29 '19 15:03 jtsampson

Solution was also posted here: https://github.com/arobson/rabbot/issues/76

jtsampson avatar Mar 29 '19 16:03 jtsampson

That makes sense why this problem seemed so familiar. I helped @fifthfloorsoftware with this back then

zlintz avatar Mar 29 '19 23:03 zlintz