amqplib
amqplib copied to clipboard
connection.close causes process to die and error is uncatchable
Hi there. This is in response to a reported error in servicebus, which uses amqplib (https://github.com/mateodelnorte/servicebus/issues/93).
Issue: Exiting RabbitMQ causes client code to exit without error. connection.close does fire, but subsequently throwing or emitting an error is somehow swallowed.
This behavior is showing up in servicebus, and it looks like it may be related to the promise library in amqplib. In both of the following examples (throwing an uncaught error and emitting an unhandled error), the application crashes with no uncaught exception or unhandled rejection. I'd expect one of the two.
var amqp = require('amqplib');
var EventEmitter = require('events').EventEmitter;
// Repro 1: throw synchronously from inside the connection's 'close' listener.
amqp.connect('amqp://localhost').then(function (conn) {
conn.on('close', function (event) {
console.log('we get our event here', event)
// NOTE(review): reads `event.Error` (capital E) - confirm this is the intended property.
if (require('amqplib/lib/connection').isFatalError(event.Error)){
console.log('and its fatal, but throwing seems to be swallowed - perhaps by the when.defer() promise?')
// Thrown synchronously, while the 'close' event is still being emitted.
throw new Error(event)
// Unreachable: nothing after the throw above can run.
console.log('something is catching this, eating it, and causing the process to die')
}
});
// The 'close' listener runs later, from an event emit, so its throw does not
// reject the promise chain that the handlers below are attached to.
}).catch(function (err) {
console.log('no error here either', err);
}).then(null, function(err) {
console.log('no error here', err);
});
// Process-level safety nets; per the report, neither of these fires for this repro.
process.on('uncaughtException', function (exception) {
console.log('no uncaught exception', exception);
});
process.on('unhandledRejection', (reason, p) => {
console.log('no unhandled rejection at: Promise', p, 'reason:', reason);
});
// Repro 2: emit an 'error' event from inside the connection's 'close' listener
// instead of throwing.
class Wat extends EventEmitter {
constructor () {
super();
// Captured because the 'close' listener below is a regular function with its own `this`.
var self = this;
amqp.connect('amqp://localhost').then((conn) => {
conn.on('close', function (event) {
console.log('we get our event here', event)
// NOTE(review): reads `event.Error` (capital E) - confirm this is the intended property.
if (require('amqplib/lib/connection').isFatalError(event.Error)){
console.log('and its fatal, but emitting error seems to be swallowed - perhaps by the when.defer() promise?')
// No 'error' listener is registered on this Wat instance anywhere in this snippet.
self.emit('error', Error(event));
console.log('something is catching this, eating it, and causing the process to die')
}
});
// The 'close' listener runs later, from an event emit, so an error raised in
// it does not reject the promise chain that the handlers below are attached to.
}).catch((err) => {
console.log('no error here either', err);
}).then(null, (err) => {
console.log('no error here', err);
});
}
}
// Process-level safety nets; per the report, neither of these fires for this repro.
process.on('uncaughtException', function (exception) {
console.log('no uncaught exception', exception);
});
process.on('unhandledRejection', (reason, p) => {
console.log('no unhandled rejection at: Promise', p, 'reason:', reason);
});
new Wat();
Is there a particular way these errors need to be thrown/emitted/captured and re-throw/emitted?
Any ideas on this?
Hi @mateodelnorte, I haven't used the promise-based API, but I am wondering what happens if you listen for conn.on('error') or process.on('error')?
It catches the errors, for instance allowing me to log the error, but the process still dies immediately after.
https://github.com/mateodelnorte/servicebus/blob/master/bus/rabbitmq/bus.js#L50-L51
https://github.com/mateodelnorte/servicebus/issues/93
Are you sure that it's the same error event? Maybe the first one is being logged and then swallowed, but a second error event is being emitted from something other than the connection? Does process.on('error') give any clues?
Scratch my previous two comments. Did some testing and they're wrong. This seems to be happening because nothing is handling socket events, after connect. Adding
// Proposed addition to amqplib's lib/connect.js: log every 'error' event
// emitted by the socket. `sock` is defined in the surrounding connect.js code.
sock.on('error', (err) => {
console.log('yup, socket error: ', err)
})
to line https://github.com/squaremo/amqp.node/blob/master/lib/connect.js#L185 will cause the following behavior:
$ node ./test/.amqplib.js
yup, socket error: { Error: read ECONNRESET
at exports._errnoException (util.js:1022:11)
at TCP.onread (net.js:569:26) code: 'ECONNRESET', errno: 'ECONNRESET', syscall: 'read' }
no uncaught exception { Error: read ECONNRESET
at exports._errnoException (util.js:1022:11)
at TCP.onread (net.js:569:26) code: 'ECONNRESET', errno: 'ECONNRESET', syscall: 'read' }
yup, socket error: Error: This socket is closed
at Socket._writeGeneric (net.js:680:19)
at Socket._write (net.js:731:8)
at doWrite (_stream_writable.js:334:12)
at writeOrBuffer (_stream_writable.js:320:5)
at Socket.Writable.write (_stream_writable.js:247:11)
at Socket.write (net.js:658:40)
at Connection.C.sendBytes (C:\development\ged\message-bus\node_modules\amqplib\lib\connection.js:517:15)
at Connection.C.sendHeartbeat (C:\development\ged\message-bus\node_modules\amqplib\lib\connection.js:521:15)
at Heart.<anonymous> (C:\development\ged\message-bus\node_modules\amqplib\lib\connection.js:431:12)
at emitNone (events.js:86:13)
yup, socket error: Error: This socket is closed
at Socket._writeGeneric (net.js:680:19)
at Socket._write (net.js:731:8)
at doWrite (_stream_writable.js:334:12)
at writeOrBuffer (_stream_writable.js:320:5)
at Socket.Writable.write (_stream_writable.js:247:11)
at Socket.write (net.js:658:40)
at Connection.C.sendBytes (C:\development\ged\message-bus\node_modules\amqplib\lib\connection.js:517:15)
at Connection.C.sendHeartbeat (C:\development\ged\message-bus\node_modules\amqplib\lib\connection.js:521:15)
at Heart.<anonymous> (C:\development\ged\message-bus\node_modules\amqplib\lib\connection.js:431:12)
at emitNone (events.js:86:13)
no uncaught exception Error: Heartbeat timeout
at Heart.<anonymous> (C:\development\ged\message-bus\node_modules\amqplib\lib\connection.js:425:19)
at emitNone (events.js:86:13)
at Heart.emit (events.js:185:7)
at Heart.runHeartbeat (C:\development\ged\message-bus\node_modules\amqplib\lib\heartbeat.js:88:17)
at ontimeout (timers.js:365:14)
at tryOnTimeout (timers.js:237:5)
at Timer.listOnTimeout (timers.js:207:5)
yup, socket error: Error: This socket is closed
at Socket._writeGeneric (net.js:680:19)
at Socket._write (net.js:731:8)
at doWrite (_stream_writable.js:334:12)
at writeOrBuffer (_stream_writable.js:320:5)
at Socket.Writable.write (_stream_writable.js:247:11)
at Socket.write (net.js:658:40)
at Connection.C.sendBytes (C:\development\ged\message-bus\node_modules\amqplib\lib\connection.js:517:15)
at Connection.C.sendHeartbeat (C:\development\ged\message-bus\node_modules\amqplib\lib\connection.js:521:15)
at Heart.<anonymous> (C:\development\ged\message-bus\node_modules\amqplib\lib\connection.js:431:12)
at emitNone (events.js:86:13)
So, I think this is a combination of 1) the socket error not being handled and rethrown as something recoverable from within amqplib, and 2) it being passed to openCallback, which ends up returning to a previously resolved promise, which in turn makes bluebird do a process.emit(err) - which I've never seen anyone do EVER. lol
Seems this issue is the same as https://github.com/squaremo/amqp.node/issues/338.
Did this issue ever get a solution? I am still facing it as of 8th March 2019.
...
events.js:174
throw er; // Unhandled 'error' event
^
Error: Heartbeat timeout
at Heart.<anonymous> (/usr/src/app/node_modules/amqplib/lib/connection.js:427:19)
at Heart.emit (events.js:189:13)
at Heart.runHeartbeat (/usr/src/app/node_modules/amqplib/lib/heartbeat.js:88:17)
at ontimeout (timers.js:436:11)
at tryOnTimeout (timers.js:300:5)
at listOnTimeout (timers.js:263:5)
at Timer.processTimers (timers.js:223:10)
Emitted 'error' event at:
at Heart.<anonymous> (/usr/src/app/node_modules/amqplib/lib/connection.js:428:12)
at Heart.emit (events.js:189:13)
[... lines matching original stack trace ...]
at Timer.processTimers (timers.js:223:10)
...
Any updates on this issue please?
`
Error: Heartbeat timeout
at Heart.
at emitNone (events.js:86:13) | prod-svc-31
at Heart.emit (events.js:185:7) | prod-svc-31
| July 6th 2019, 08:32:14.791 | at Heart.runHeartbeat (/opt/services/dev/builds/order-service/v3.890.7/node_modules/amqplib/lib/heartbeat.js:88:17) | prod-svc-31 | July 6th 2019, 08:32:14.791 | at ontimeout (timers.js:386:11) | prod-svc-31
| July 6th 2019, 08:32:14.791 | at tryOnTimeout (timers.js:250:5) | prod-svc-31
| July 6th 2019, 08:32:14.791 | at Timer.listOnTimeout (timers.js:214:5)
`
I can reproduce the originally reported error with the following script. I'm not sure whether the subsequent reports of timeouts are related though.
// Minimal repro: a 'close' listener that throws synchronously.
const amqplib = require('amqplib');
(async () => {
// Heartbeat effectively disabled (set very high) because this was run under a debugger
const connection = await amqplib.connect('amqp://localhost:5672?heartbeat=100000')
connection.on('close', () => {
// Thrown synchronously, while the 'close' event is still being emitted.
throw new Error('Oh Noes!')
});
})();
If I allow the script to connect, then restart the docker container hosting the RabbitMQ server, the connection emits a close event. If the handler throws an error, this is caught in amqplib's accept loop and emitted as a frameError, the event listener for which delegates to the onSocketError handler. However, because toClosed was called when the server sent the Close command, expectSocketClose is true, and the onSocketError handler falls through without emitting an error.
A workaround is to put any code inside the close event handler within a try catch
// Workaround: wrap the body of the 'close' listener in try/catch so that a
// synchronous error never propagates back into amqplib's emit call.
// The listener is deliberately NOT async: making it async is a separate
// workaround, and would turn any failure in the catch path into a rejection.
(async () => {
  const connection = await amqplib.connect('amqp://localhost:5672?heartbeat=100000');
  connection.on('close', (err) => {
    try {
      console.log('Connection error', err);
      throw new Error('Oh Noes!');
    } catch (handlerError) {
      // Named distinctly so it does not shadow the listener's `err` argument.
      console.log('Error in close handler', handlerError);
    }
  });
})();
or with a setImmediate timer, e.g.
// Workaround: defer the listener body with setImmediate so that the throw
// happens after the 'close' emit call has returned.
const amqplib = require('amqplib');
(async () => {
const connection = await amqplib.connect('amqp://localhost:5672?heartbeat=100000')
connection.on('close', (err) => {
setImmediate(() => {
console.log('Connection error', err)
// Thrown from the setImmediate callback, not from within the 'close' emit.
throw new Error('Oh Noes!')
})
});
})();
or to use the async keyword, e.g.
// Workaround: declare the 'close' listener async so that a throw inside it
// becomes a promise rejection rather than a synchronous exception.
const amqplib = require('amqplib');
(async () => {
const connection = await amqplib.connect('amqp://localhost:5672?heartbeat=100000')
connection.on('close', async (err) => {
console.log('Connection error', err)
// Rejects the promise returned by this async listener.
throw new Error('Oh Noes!')
});
})();
An error thrown from the setImmediate callback can be caught by listening for the process 'uncaughtException' event, and one thrown from the async handler by listening for the process 'unhandledRejection' event.
Labelling as a bug as amqplib should not be swallowing exceptions.
I can only think of two (unsatisfactory) ways to address this
- Change any amqplib code which emits an event (e.g. error, close, blocked, etc) that can be handled by the user code to do so from a setImmediate function so that user errors automatically bypass amqplib code
- Change any amqplib code which emits an event (e.g. error, close, blocked, etc) that can be handled by the user code to do so within a try/catch block, then find a way to rethrow such errors anywhere they are caught by amqplib
Such errors would only be catchable via the process.on('uncaughtException') event, which is not very useful for automated recovery, but this would at least surface the error to the user rather than, as in the case above, swallowing it completely.
A weaker alternative is to update the documentation to clearly state that any event handler code must not emit synchronous errors.
Any thoughts @kibertoad?
@cressie176 Maybe we can allow registering some kind of an error handler that we would use for publishing these errors?
Yes, that's possible. It would mean wrapping each emit call within a try/catch block, e.g.
// Sketch: catch an error thrown synchronously by a user 'close' listener and
// re-publish it on a dedicated 'handler_error' event.
try {
this.connection.emit('close');
} catch (error) {
// Only errors thrown synchronously during the emit above reach this point.
this.connection.emit('handler_error', error);
}
It does beg the question, though: what would we do if the handler_error handler failed?
Also it might cause confusion, because try/catch would only work for synchronous errors.
> but does beg the question, what would we do if the handler_error handler failed.
handler_error would be handled purely in userspace, right? In that case I would argue that it's on a user to handle errors correctly.
What would happen in case of asynchronous errors? Can you give an example code of how that would look?
> handler_error would be handled purely in userspace, right? In that case I would argue that it's on a user to handle errors correctly.
Yes, but so is the connection close event handler. I agree it would be reasonable to require that userspace handlers do not throw exceptions from synchronous code; there is not much amqplib can do to handle them, and throwing them may interfere with its internal workings. If we take this approach then I suggest updating the docs to make the requirement explicit and closing the issue.
My only concern with the above is that some users won't adhere to this even if documented, and it's not great that amqplib swallows errors.
> What would happen in case of asynchronous errors? Can you give an example code of how that would look?
Synchronous
// reconnect() is called directly from the listener, so anything it throws
// before going asynchronous is thrown inside the 'close' emit.
connection.on('close', () => {
reconnect();
});
Asynchronous
// reconnect() is called from a timer callback one second later, after the
// listener has already returned.
connection.on('close', () => {
setTimeout(() => reconnect(), 1000);
});
In the synchronous example, if the reconnect function throws an error before becoming asynchronous, the error will be caught in amqplib's accept loop, misinterpreted as a frame error and swallowed.
In the asynchronous example, if the reconnect function throws an error, the error will bypass amqplib entirely because the handler immediately went asynchronous through the setTimeout. As a result the error will likely crash the application but at least it would be reported.
We can enforce this behaviour by emitting events from a setImmediate function, e.g.
// Emit 'close' from a setImmediate callback instead of emitting it inline.
setImmediate(() => this.connection.emit('close'));
However, emitting events asynchronously will mean that the handlers execute slightly later than they do now, which may result in some unexpected behaviour. I also haven't checked whether amqplib ever listens to its own connection close/error events. If it does, then we may also find subtle behaviour changes.