Create a way to detect channel close to be able to recreate the channel
I had an issue where I acked some messages that came from a queue that was marked as no-ack, so calling basicAck or basicNack threw an error.
That issue is solved and my system now runs as expected, but I was sad not to find a way to detect the channel close event so that I could recreate/reconnect the channel.
I have:
// Connect and create a channel to RabbitMQ
this.client = new AMQPClient(connectionUrl)
this.connection = await this.client.connect()
this.channel = await this.connection.channel()
this.client.onerror = (error) => {
  console.error(error)
  this.tryReconnect()
}
// Handle disconnection
this.connection.onerror = () => {
  console.error(
    `[RabbitMqClient] ${new Date().toISOString()} Upps! We have a connection error... Trying to reconnect with exponential backoff..`
  )
  this.tryReconnect()
}
But this only works when the RabbitMQ server goes down, not when the channel crashes.
Good feedback. An onerror on channel is coming up too.
(Sent by email in reply to Ignacio F. Castillejo Gómez's post of Tue, 9 Aug 2022, 17:16.)
When do you estimate this feature could be released? Thanks for the quick response, @carlhoerberg.
Hi @carlhoerberg,
Event handlers have caused one of the most difficult to resolve issues with amqplib. Specifically, because emitting events is synchronous, if the handler throws an error, it can interfere with the amqplib internals and be swallowed or manifest as something completely different. I'm not suggesting you avoid event handlers, just that you give some thought as to how you avoid the same pitfalls amqplib has.
One idea I've had is to emit events from a utility function that wraps the actual emission in a try/catch. If an error is thrown, it could emit an error event asynchronously. Still not sure though.
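To make the try/catch idea concrete, here is a minimal sketch. It is not taken from amqplib or amqp-client.js: `safeEmit`, `Handler` and `Defer` are hypothetical names, and the `defer` parameter is only there so the scheduling can be swapped out (for example in tests).

```typescript
type Handler<T> = (payload: T) => void;
type Defer = (task: () => void) => void;

/**
 * Hypothetical helper (not part of amqplib or amqp-client.js).
 * Calls a user-supplied handler without letting its exception
 * propagate into the caller. A thrown value is passed to `onError`
 * later, outside the current call stack.
 */
function safeEmit<T>(
  handler: Handler<T> | undefined,
  payload: T,
  onError: Handler<unknown>,
  defer: Defer = (task) => queueMicrotask(task),
): void {
  if (!handler) return;
  try {
    handler(payload);
  } catch (err) {
    // Report asynchronously so the library's own frame handling
    // finishes before any error handling code runs.
    defer(() => {
      try {
        onError(err);
      } catch (nested) {
        // An error handler that throws must not start a loop.
        console.error("error handler threw", nested);
      }
    });
  }
}
```

With something like this, a throwing handler can no longer unwind through the code that parses frames, and the failure still surfaces through the error callback instead of being swallowed.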
For this functionality in my application, I patched into the package (with patch-package) a lastHeartbeat field that is accessible by the consuming application. It is updated with performance.now() at every received message and on connect.
I haven't yet tested it with 2.1.0, but looking at the diff of changes, it seems that onerror is mainly called if the browser detects the socket connection close, not if it missed the heartbeat timeout. So they might be complementary to each other.
amqp-client does know the requested heartbeat interval, so it could perform this check too, but exposing lastHeartbeat and doing the rest in the app resulted in the simplest patch.
Could you please share a demo example of that? Thanks for the tip 🙏
This is the patch against @cloudamqp/amqp-client@2.0.3. I apply it with patch-package.
View `@cloudamqp+amqp-client+2.0.3.patch`
diff --git a/node_modules/@cloudamqp/amqp-client/lib/mjs/amqp-base-client.js b/node_modules/@cloudamqp/amqp-client/lib/mjs/amqp-base-client.js
index dea7443..166de60 100644
--- a/node_modules/@cloudamqp/amqp-client/lib/mjs/amqp-base-client.js
+++ b/node_modules/@cloudamqp/amqp-client/lib/mjs/amqp-base-client.js
@@ -26,6 +26,7 @@ export class AMQPBaseClient {
if (heartbeat < 0)
throw new Error("heartbeat must be positive");
this.heartbeat = heartbeat;
+ this.lastHeartbeat = undefined;
}
channel(id) {
if (this.closed)
@@ -67,6 +68,7 @@ export class AMQPBaseClient {
if (this.closed)
return this.rejectClosed();
this.closed = true;
+ this.lastHeartbeat = undefined;
let j = 0;
const frame = new AMQPView(new ArrayBuffer(512));
frame.setUint8(j, 1);
@@ -619,6 +621,7 @@ export class AMQPBaseClient {
}
case 8: {
const heartbeat = new Uint8Array([8, 0, 0, 0, 0, 0, 0, 206]);
+ this.lastHeartbeat = performance.now();
this.send(heartbeat).catch(err => console.warn("Error while sending heartbeat", err));
break;
}
diff --git a/node_modules/@cloudamqp/amqp-client/lib/mjs/amqp-websocket-client.js b/node_modules/@cloudamqp/amqp-client/lib/mjs/amqp-websocket-client.js
index 0d90507..55e3c4e 100644
--- a/node_modules/@cloudamqp/amqp-client/lib/mjs/amqp-websocket-client.js
+++ b/node_modules/@cloudamqp/amqp-client/lib/mjs/amqp-websocket-client.js
@@ -13,6 +13,7 @@ export class AMQPWebSocketClient extends AMQPBaseClient {
this.socket = socket;
socket.binaryType = "arraybuffer";
socket.onmessage = this.handleMessage.bind(this);
+ this.lastHeartbeat = performance.now();
return new Promise((resolve, reject) => {
this.connectPromise = [resolve, reject];
socket.onclose = reject;
@@ -22,7 +23,7 @@ export class AMQPWebSocketClient extends AMQPBaseClient {
}
send(bytes) {
return new Promise((resolve, reject) => {
- if (this.socket) {
+ if (this.socket && this.socket.readyState === this.socket.OPEN) {
try {
this.socket.send(bytes);
resolve();
@@ -80,6 +81,7 @@ export class AMQPWebSocketClient extends AMQPBaseClient {
this.frameSize = this.framePos = 0;
}
}
+ this.lastHeartbeat = performance.now();
}
static platform() {
if (typeof (window) !== 'undefined')
diff --git a/node_modules/@cloudamqp/amqp-client/src/amqp-base-client.ts b/node_modules/@cloudamqp/amqp-client/src/amqp-base-client.ts
index f009883..82059bc 100644
--- a/node_modules/@cloudamqp/amqp-client/src/amqp-base-client.ts
+++ b/node_modules/@cloudamqp/amqp-client/src/amqp-base-client.ts
@@ -23,6 +23,7 @@ export abstract class AMQPBaseClient {
channelMax = 0
frameMax: number
heartbeat: number
+ lastHeartbeat: number | undefined
/**
* @param name - name of the connection, set in client properties
* @param platform - used in client properties
@@ -43,6 +44,7 @@ export abstract class AMQPBaseClient {
this.frameMax = frameMax
if (heartbeat < 0) throw new Error("heartbeat must be positive")
this.heartbeat = heartbeat
+ this.lastHeartbeat = undefined
}
/**
diff --git a/node_modules/@cloudamqp/amqp-client/src/amqp-websocket-client.ts b/node_modules/@cloudamqp/amqp-client/src/amqp-websocket-client.ts
index 04331d9..552ca3f 100644
--- a/node_modules/@cloudamqp/amqp-client/src/amqp-websocket-client.ts
+++ b/node_modules/@cloudamqp/amqp-client/src/amqp-websocket-client.ts
@@ -28,6 +28,7 @@ export class AMQPWebSocketClient extends AMQPBaseClient {
this.socket = socket
socket.binaryType = "arraybuffer"
socket.onmessage = this.handleMessage.bind(this)
+ this.lastHeartbeat = performance.now()
return new Promise((resolve, reject) => {
this.connectPromise = [resolve, reject]
socket.onclose = reject
@@ -42,7 +43,7 @@ export class AMQPWebSocketClient extends AMQPBaseClient {
*/
override send(bytes: Uint8Array): Promise<void> {
return new Promise((resolve, reject) => {
- if (this.socket) {
+ if (this.socket && this.socket.readyState === this.socket.OPEN) {
try {
this.socket.send(bytes)
resolve()
@@ -108,6 +109,7 @@ export class AMQPWebSocketClient extends AMQPBaseClient {
this.frameSize = this.framePos = 0
}
}
+ this.lastHeartbeat = performance.now()
}
static platform(): string {
diff --git a/node_modules/@cloudamqp/amqp-client/types/amqp-base-client.d.ts b/node_modules/@cloudamqp/amqp-client/types/amqp-base-client.d.ts
index b67d0f5..9867249 100644
--- a/node_modules/@cloudamqp/amqp-client/types/amqp-base-client.d.ts
+++ b/node_modules/@cloudamqp/amqp-client/types/amqp-base-client.d.ts
@@ -18,6 +18,7 @@ export declare abstract class AMQPBaseClient {
channelMax: number;
frameMax: number;
heartbeat: number;
+ lastHeartbeat: number | undefined;
/**
* @param name - name of the connection, set in client properties
* @param platform - used in client properties
Then in my application I do something like this:
const AMQP_HEARTBEAT_SEC = 10;
amqpRef.current.client = new AMQPWebSocketClient(
  amqpUrl,
  "/",
  amqpToken.username,
  amqpToken.password,
  `app (${VERSION})`,
  4096,
  AMQP_HEARTBEAT_SEC
);
and
window.clearInterval(amqpRefCurrent.heartbeatTimerId ?? undefined);
amqpRefCurrent.heartbeatTimerId = window.setInterval(async () => {
  if (conn.closed) {
    logger.warn("Connection closed at heartbeat.");
    window.clearInterval(amqpRefCurrent.heartbeatTimerId ?? undefined);
    setAmqpState(AmqpState.Disconnected);
  } else if (
    typeof conn.lastHeartbeat === "number" &&
    performance.now() - conn.lastHeartbeat >
      2 * AMQP_HEARTBEAT_SEC * 1000
  ) {
    logger.warn("Heartbeat timed out.");
    try {
      await conn.close("Heartbeat timed out.");
    } catch (e) {
      console.error("conn.close", e);
    }
    window.clearInterval(amqpRefCurrent.heartbeatTimerId ?? undefined);
    setAmqpState(AmqpState.Disconnected);
  }
}, (AMQP_HEARTBEAT_SEC / 2) * 1000);
So with a requested heartbeat of 10 seconds, the application checks every 5 seconds whether more than 20 seconds have passed since the last received message (or since connect). If so, the interval code closes the connection and marks it as disconnected in the application, which starts a re-connect cycle (coded elsewhere).
Node.js has different methods to perform the window.setInterval() and performance.now() functionality, so I would implement it differently in this library to make it cross-platform compatible. If the timer stays in the calling application, maybe just expose a method called timeSinceLastRxMessage(). If this library includes the timer functionality, then exposing the error through the new onerror callback would be enough.
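A rough sketch of what the timeSinceLastRxMessage() idea could look like, kept separate from the client so the clock can be injected. Everything here is hypothetical (`RxTracker`, `markRx`, `reset`, `isStale` are made-up names, not amqp-client.js API). `Date.now()` is only the default because it exists everywhere; a monotonic clock such as `performance.now` could be passed in where available.

```typescript
/** Hypothetical tracker; none of these names exist in amqp-client.js. */
class RxTracker {
  private lastRx: number | undefined = undefined;
  private readonly now: () => number;

  constructor(now: () => number = () => Date.now()) {
    this.now = now;
  }

  /** Call on connect and whenever any frame is received. */
  markRx(): void {
    this.lastRx = this.now();
  }

  /** Call on close so an old timestamp is not misread later. */
  reset(): void {
    this.lastRx = undefined;
  }

  /** Milliseconds since the last frame, or undefined before the first one. */
  timeSinceLastRxMessage(): number | undefined {
    return this.lastRx === undefined ? undefined : this.now() - this.lastRx;
  }

  /** Same rule as the app code above: more than two heartbeat intervals. */
  isStale(heartbeatSec: number): boolean {
    const elapsed = this.timeSinceLastRxMessage();
    return (
      elapsed !== undefined &&
      heartbeatSec > 0 &&
      elapsed > 2 * heartbeatSec * 1000
    );
  }
}
```

The calling application would then poll `isStale(AMQP_HEARTBEAT_SEC)` from its own timer, which keeps platform-specific timer code out of the library.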