rabbot
RPC not working: What am I doing wrong? Is there an example of its implementation?
Greetings,
Will you help me?
I apologize for bothering you, but I am having trouble implementing RPC correctly.
Perhaps someone can point out what I am doing wrong in the code, since I couldn't find a full example in the documentation of how to implement it.
// Publish RPC from this client to server
var extension = {
  id: UUID.random(),
}
var body = {...};
rabbit.publish('rpc.extension.request', {
  routingKey: 'extend.rpc',
  correlationId: extension.id,
  replyTo: "rpc.extension.response",
  contentType: "application/json",
  body: body,
  expiresAfter: 60000 // TTL in ms, in this example 60 seconds - can be removed after consumer working, just here to prevent junk accumulating on servers.
}).then(
  () => { // a list of the messages that succeeded
    log.info(`Success publishing extension for rpc ${extension.id}`);
  },
  (failed) => { // a list of failed messages and the errors `{ err, message }`
    log.info(`Error publishing extension for rpc ${extension.id} -> ${failed.err}: ${failed.message}`);
  });
-----------------------
// Consuming RPC result sent by server to this client.
module.exports.setup = function () {
  ...
  const handleRPC = rabbit.handle({queue: 'rpc.extension.response'}, handleExtendRPCResponse);
  handleRPC.catch(onHandleError);
};
function handleExtendRPCResponse(msg) {
  logger.info(`RPC Received message : ${msg}`)
  try {
    messageHandler.processMessage(msg, function (err) {
      if (err) {
        logger.error(`ERROR in RPC Process message: "${err.error}"`);
        msg.reject();
      } else {
        logger.info("Process RPC message passed");
        msg.ack();
      }
    })
  } catch(e){
    logger.error(`ERROR while Processing RPC message: "${e.message}"`);
    msg.reject();
  }
}
function onHandleError (err, msg) {
  // not able to hit this with a test...not sure how to.
  logger.error('Error:', JSON.stringify(err), 'Message:', JSON.stringify(msg));
  // Temporary, but if we can't handle it we don't want it requeued.
  msg.reject();
}
Thanks for your time and the creation of this package.
Maybe I got your question wrong, but if you are trying to send back a result from the RPC call, you could use rabbit.request instead of rabbit.publish.
We do it like this:
rabbit
  .request('exchange', {
    type: 'req',
    body: task,
    routingKey
  })
  /*.progress(function(reply) {
    // if multiple replies are provided, all but the last will be sent via the progress callback
  })*/
  .then((final) => {
    // the last message in a series OR the only reply will be sent to this callback
    final.ack();
    if (
      Array.isArray(task.params) &&
      typeof task.params[task.cbPos] === 'function'
    ) {
      task.params[task.cbPos].apply(this, final.body.args);
    } else {
      //...
    }
  });
@manuel-reil, thanks for answering my question.
Actually, I have a microservice that is trying to do an RPC request, via RabbitMQ, and then consume the result that comes back, via RabbitMQ.
Based on your feedback, I guess my RPC request was wrong because I was using publish() instead of request(). Am I correct?
Also, based on your feedback, I don't need to implement a consumer, since request() is going to wait for the result of the RPC request and pass it into the chain .then((final) => ...). Am I correct here as well?
@acarlstein you are correct. You want request(), not publish(), for RPC. Under the covers, a unique RPC queue is created for you automatically during connection. It acts as the listener queue for the return messages and is abstracted to the promise level for ease of use. The RPC request uses the replyTo header that is added during the request() call so that the reply is sent back to the service that originally made the RPC request.
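To make the point above concrete, here is a minimal sketch of an RPC call made with request() and no separate response consumer. The helper name requestTask and the fakeRabbit stub are hypothetical and exist only so the snippet runs without a broker; the exchange name, routing key and 'req' type are placeholders. It assumes, as in the examples in this thread, that request() returns a promise resolving with the final reply, and that the reply exposes ack() and body.

```javascript
// Hypothetical helper: wraps a request() call so the caller only sees the reply body.
// `rabbit` is passed in so the same function works with rabbot or with a stub.
function requestTask(rabbit, exchange, routingKey, task) {
  return rabbit
    .request(exchange, { type: 'req', routingKey, body: task })
    .then((final) => {
      // Acknowledge the reply so it is removed from the reply queue.
      final.ack();
      return final.body;
    });
}

// Stand-in for rabbot, used only to try the helper without a broker.
// This is an assumption for illustration, not rabbot code.
const fakeRabbit = {
  calls: [],
  lastReply: null,
  request(exchange, options) {
    this.calls.push({ exchange, options });
    const reply = {
      acked: false,
      body: { echoed: options.body },
      ack() { this.acked = true; },
    };
    this.lastReply = reply;
    return Promise.resolve(reply);
  },
};
```

With a real connection you would pass the configured rabbot instance instead of fakeRabbit, for example requestTask(rabbit, 'myExchange', 'key', task).then((body) => { ... }).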
I figured out the issue. In the topology, when creating the exchange, I needed to add the 'subscribe:true' property, since the queue was already created but was being used with publish instead of request.
Thanks anyways for the feedback. I appreciate it.
Ah yes, the old subscribe:true. I have been there myself.
@zlintz,
I am having an issue with this as well, following your exact example above...
rabbit
  .request('myExchange', {
    type: 'req',
    body: task,
    routingKey: 'key'
  })
  .then((final) => {
    final.ack();
    log.info('received message');
  });
I am able to see that my consumer (a Java process) is responding and that there is a message in the temporary queue, but 'received message' is never logged. When I turn on DEBUG=* I see messages like:
rabbot.exchange debug +15s Publishing message ( type: 'req' topic: 'key', sequence: '0', correlation: '', replyTo: '{"type":"req","contentType":"application/json","contentEncoding":"utf8","correlationId":"","replyTo":"xxx.response.queue","messageId":"087f6300-4751-11e9-98b1-f3c4b7b64947","timestamp":1552674897204,"appId":"JSampson5530.npm.18772","headers":{},"mandatory":false,"persistent":true}' ) to direct exchange 'assignment.requests' on connection 'default'
rabbot.acknack debug +365ms New pending tag 1 on queue xxx.response.queue - default
rabbot.unhandled warn +1ms Message of xxx.response.queue on queue 'xxx.response.queue', connection 'default' was not processed by any registered handlers
rabbot.acknack debug +3ms Marking tag 1 as nack'd on queue xxx.response.queue - default
rabbot.acknack debug +83ms nack ALL (1) tags on xxx.response.queue up to 1 - default.
rabbot.queue debug +1ms Nacking tag 1 on 'xxx.response.queue' - 'default'
The use of subscribe:true on the connection definition does not seem to have an effect. Any ideas?
@jtsampson can you provide a sample of the message coming back from the Java consumer that is replying on the replyTo queue, including all the headers? There are specific expectations that rabbot has around the RPC message coming back on the replyTo queue in order to connect the return message back to the correct request() promise response. Most of these expectations are handled for you if you have rabbot on both sides.
@zlintz ,
Thanks for getting back to me!
[aside] For the sake of this discussion, and if it matters, the REQUESTER is a node app originally on Rabbit 1.1.0, but now upgraded to Rabbot 2.1.0; the RESPONDER is a Java app using amqp-client-3.3.5.
We have this working asynchronously and want to add synchronous functionality using the same (or slightly modified) RESPONDER code.
I was pursuing this line of debugging earlier this week to try to determine exactly what form the message coming back from my RESPONDER should take, but I hadn't gotten around to finishing it up.
Here is what I did so far.
Using the example code and topology in request.spec.js, and dumping the response object from line 99 to the console, I see the following:
response: { fields:
   { consumerTag: 'MACHINE.NAME.PROCESS.NAME',
     deliveryTag: 2,
     redelivered: false,
     exchange: '',
     routingKey: 'MACHINE.NAME.PROCESS.NAME' },
  properties:
   { contentType: 'text/plain',
     contentEncoding: 'utf8',
     headers: { sequence_end: true },
     deliveryMode: undefined,
     priority: undefined,
     correlationId: '41dd6c20-4c22-11e9-b0db-c5719578f79c',
     replyTo: 'MACHINE.NAME.PROCESS.NAME',
     expiration: undefined,
     messageId: undefined,
     timestamp: 1553204562918,
     type: 'polite.reply',
     userId: undefined,
     appId: undefined,
     clusterId: undefined },
  content: <Buffer 3a 44>,
  ack: { [Function: bound ack] [length]: 0, [name]: 'bound ack' },
  reject: { [Function: bound reject] [length]: 0, [name]: 'bound reject' },
  nack: { [Function: bound nack] [length]: 0, [name]: 'bound nack' },
  reply:
   { [Function]
     [length]: 2,
     [name]: '',
     [arguments]: null,
     [caller]: null,
     [prototype]: { [constructor]: [Circular] } },
  type: 'polite.reply',
  queue: 'MACHINE.NAME.PROCESS.NAME',
  body: ':D' }
I am guessing that my Java consumer has to respond with similar headers and properties, but I am not exactly sure which ones. I tried to set up the minimum number of these in my RESPONDER code:
channel = rabbitMqManager.createChannel();
Map<String,Object> headers = new HashMap<>();
headers.put("sequence_end", true); // possibly expected by node.js rabbot library
AMQP.BasicProperties props =
    new AMQP.BasicProperties.Builder()
        .appId("CONSUME_APP") // this is the java consume app name not the rabbot publisher app name
        .contentEncoding("UTF-8")
        .contentType("application/json")
        .correlationId(correlationId) // correlation id sent by publisher from rabbit properties
        .type((String)properties.getType()) // possibly expected by node.js rabbot library
        .headers(headers).build();
// reply to in this case is 'MACHINE.NAME.PROCESS.NAME'
channel.basicPublish("", replyTo, props, (body.toJSONString().getBytes()));
// this goes to the default exchange
Above I am assuming (possibly wrongly) that I do not need to have the RESPONDER reply with any of the properties that are undefined in the dump, and that the appId does not matter. My message will then look something like this. I haven't trapped the actual message yet, but I am fairly confident it looks like...
{
  "headers": {
    "sequence_end": true
  },
  "properties": {
    "contentEncoding": "UTF-8",
    "contentType": "application/json",
    "correlationId": "passed from PUBLISHER",
    "type": "type passed from PUBLISHER"
  },
  "body": "body"
}
Hopefully I am on the right track.
Do you know exactly which properties and headers I need?
@jtsampson
I did some testing and digging. In order for the response to make it back to the request(), the consumer must take the properties.message_id from the message and then give that back as the properties.correlation_id on the return message on the replyTo queue. The headers should also have "sequence_end": true.
For example the message sent to be replied to on the queue...
And the response would be like...
I can explain more of the details if needed, let me know if this helps.
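As a rough illustration of the rule above, the sketch below builds the reply properties from an incoming request and shows why a reply with the wrong correlationId goes unmatched. buildReplyProperties and PendingRequests are hypothetical names made up for this example. The matching logic is a simplified stand-in, not rabbot's actual internals, and the camelCase property names (messageId, correlationId) are assumed from the dumps earlier in this thread.

```javascript
// Build the properties for a reply, following the rule described above:
// the request's messageId becomes the reply's correlationId, and the
// sequence_end header marks this as the final (or only) reply.
function buildReplyProperties(requestProperties) {
  return {
    correlationId: requestProperties.messageId,
    headers: { sequence_end: true },
  };
}

// Illustrative requester side (NOT rabbot's real implementation):
// pending requests are stored under the request's messageId and are
// resolved when a reply arrives whose correlationId equals that id.
class PendingRequests {
  constructor() {
    this.pending = new Map();
  }

  // Register a request and get a promise for its reply.
  track(messageId) {
    return new Promise((resolve) => this.pending.set(messageId, resolve));
  }

  // Returns true if the reply matched a pending request, false if it
  // would be left unhandled.
  deliver(reply) {
    const id = reply.properties.correlationId;
    const resolve = this.pending.get(id);
    if (!resolve) return false;
    this.pending.delete(id);
    resolve(reply);
    return true;
  }
}
```

With the values from the debug log earlier in the thread, a reply that copies the request's empty correlationId matches nothing, while a reply that carries the request's messageId as its correlationId resolves the pending request.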
@zlintz,
Thanks for the rundown. I will try this in my code; it should take about a day or so to try out. If that works I will definitely be back for details, and I can certainly add a pull request for added documentation.
@jtsampson any update? @acarlstein does this address your scenario as well?
@zlintz,
Sorry for the late reply. Yes, this does solve my issue perfectly. Additionally, I PM'ed @acarlstein, and this was actually the solution to his problem as well. You can close this issue.
Solution was also posted here: https://github.com/arobson/rabbot/issues/76
That explains why this problem seemed so familiar. I helped @fifthfloorsoftware with this back then.