Consumer won't re-attach to a queue when the RabbitMQ cluster gets restarted (in about 1 of 5 cases)

Open Loki-Afro opened this issue 1 year ago • 1 comments

Hey, we run a 3-node RabbitMQ cluster. When we restart the cluster, there is roughly a 1-in-5 chance that our single consumer will not reconnect properly.

It seems that under certain circumstances the consumer "gets stuck": there is a valid TCP connection and a valid channel (or two, I'm not sure; it's an RPC consumer), but no consumer is attached to the queue, as observed in the RabbitMQ management UI.

The consumer is just that: an app that only consumes from this queue and does nothing else.

2024-02-05T10:02:40.071620174Z [NestWinston] Error    2024-02-05 11:02:40.071 unhandledRejection: Channel ended, no reply will be forthcoming
2024-02-05T10:02:40.071637334Z Error: Channel ended, no reply will be forthcoming
2024-02-05T10:02:40.071641620Z     at rej (/schulcloud-server/node_modules/amqplib/lib/channel.js:201:7)
2024-02-05T10:02:40.071646532Z     at ConfirmChannel.C._rejectPending (/schulcloud-server/node_modules/amqplib/lib/channel.js:207:42)
2024-02-05T10:02:40.071650197Z     at ConfirmChannel.C.toClosed (/schulcloud-server/node_modules/amqplib/lib/channel.js:171:8)
2024-02-05T10:02:40.071653460Z     at Connection.C._closeChannels (/schulcloud-server/node_modules/amqplib/lib/connection.js:394:18)
2024-02-05T10:02:40.071656725Z     at Connection.C.toClosed (/schulcloud-server/node_modules/amqplib/lib/connection.js:401:8)
2024-02-05T10:02:40.071659930Z     at Connection.C.onSocketError (/schulcloud-server/node_modules/amqplib/lib/connection.js:355:10)
2024-02-05T10:02:40.071663518Z     at Connection.emit (node:events:517:28)
2024-02-05T10:02:40.071666965Z     at Socket.go (/schulcloud-server/node_modules/amqplib/lib/connection.js:481:12)
2024-02-05T10:02:40.071670268Z     at Socket.emit (node:events:517:28)
2024-02-05T10:02:40.071673667Z     at emitReadable_ (node:internal/streams/readable:633:12)
2024-02-05T10:02:40.071688708Z     at processTicksAndRejections (node:internal/process/task_queues:81:21) - {"error":{"cause":{},"isOperational":true},"stack":"Error: Channel ended, no reply will be forthcoming\n    at rej (/schulcloud-server/node_modules/amqplib/lib/channel.js:201:7)\n    at ConfirmChannel.C._rejectPending (/schulcloud-server/node_modules/amqplib/lib/channel.js:207:42)\n    at ConfirmChannel.C.toClosed (/schulcloud-server/node_modules/amqplib/lib/channel.js:171:8)\n    at Connection.C._closeChannels (/schulcloud-server/node_modules/amqplib/lib/connection.js:394:18)\n    at Connection.C.toClosed (/schulcloud-server/node_modules/amqplib/lib/connection.js:401:8)\n    at Connection.C.onSocketError (/schulcloud-server/node_modules/amqplib/lib/connection.js:355:10)\n    at Connection.emit (node:events:517:28)\n    at Socket.go (/schulcloud-server/node_modules/amqplib/lib/connection.js:481:12)\n    at Socket.emit (node:events:517:28)\n    at emitReadable_ (node:internal/streams/readable:633:12)\n    at processTicksAndRejections (node:internal/process/task_queues:81:21)","exception":true,"date":"Mon Feb 05 2024 11:02:40 GMT+0100 (Central European Standard 
Time)","process":{"pid":17,"uid":1000,"gid":1000,"cwd":"/schulcloud-server","execPath":"/usr/local/bin/node","version":"v18.19.0","argv":["/usr/local/bin/node","/schulcloud-server/dist/apps/server/apps/preview-generator-consumer.app"],"memoryUsage":{"rss":98619392,"heapTotal":41897984,"heapUsed":37594488,"external":1600487,"arrayBuffers":223086}},"os":{"loadavg":[1.64,2.64,6.72],"uptime":149252.74},"trace":[{"column":7,"file":"/schulcloud-server/node_modules/amqplib/lib/channel.js","function":"rej","line":201,"method":null,"native":false},{"column":42,"file":"/schulcloud-server/node_modules/amqplib/lib/channel.js","function":"ConfirmChannel.C._rejectPending","line":207,"method":"_rejectPending","native":false},{"column":8,"file":"/schulcloud-server/node_modules/amqplib/lib/channel.js","function":"ConfirmChannel.C.toClosed","line":171,"method":"toClosed","native":false},{"column":18,"file":"/schulcloud-server/node_modules/amqplib/lib/connection.js","function":"Connection.C._closeChannels","line":394,"method":"_closeChannels","native":false},{"column":8,"file":"/schulcloud-server/node_modules/amqplib/lib/connection.js","function":"Connection.C.toClosed","line":401,"method":"toClosed","native":false},{"column":10,"file":"/schulcloud-server/node_modules/amqplib/lib/connection.js","function":"Connection.C.onSocketError","line":355,"method":"onSocketError","native":false},{"column":28,"file":"node:events","function":"Connection.emit","line":517,"method":"emit","native":false},{"column":12,"file":"/schulcloud-server/node_modules/amqplib/lib/connection.js","function":"Socket.go","line":481,"method":"go","native":false},{"column":28,"file":"node:events","function":"Socket.emit","line":517,"method":"emit","native":false},{"column":12,"file":"node:internal/streams/readable","function":"emitReadable_","line":633,"method":null,"native":false},{"column":21,"file":"node:internal/process/task_queues","function":"processTicksAndRejections","line":81,"method":null,"native":false}]} 
+0ms
2024-02-05T10:02:45.125243487Z [Nest] 17  - 02/05/2024, 11:02:45 AM     LOG [AmqpConnection] Successfully connected to RabbitMQ broker (default)
2024-02-05T10:02:45.153468493Z [Nest] 17  - 02/05/2024, 11:02:45 AM     LOG [AmqpConnection] Successfully connected a RabbitMQ channel "AmqpConnection"

You can see at the very end that it reconnected successfully, but only the connection and the channel. No "resumeConsumer" or "basicConsume" was called afterwards, so the consumer was never re-attached.

By the way, I had the same problem in an older Java Spring version, where I was able to write a health indicator to observe the consumer's status with something like isRunning(). Unfortunately there seems to be nothing similar here for such a workaround, or is there?
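
One possible workaround, sketched here and not part of the original report: ask the RabbitMQ management HTTP API how many consumers are attached to the queue and fail a health check when the count drops to zero. This assumes the management plugin is enabled and reachable from the app; the base URL, credentials, and queue name are hypothetical placeholders, and `hasConsumers`, `queueUrl`, and `checkQueueConsumers` are illustrative names, not part of any library.

```typescript
// Subset of the fields returned by GET /api/queues/{vhost}/{queue}.
interface QueueInfo {
  name: string;
  consumers: number;
}

// Pure check: healthy when at least `minConsumers` consumers are attached.
function hasConsumers(info: QueueInfo, minConsumers: number = 1): boolean {
  return info.consumers >= minConsumers;
}

// Builds the management API URL. The vhost and queue name must be
// URL-encoded, so the default vhost "/" becomes "%2F".
function queueUrl(baseUrl: string, vhost: string, queue: string): string {
  return `${baseUrl}/api/queues/${encodeURIComponent(vhost)}/${encodeURIComponent(queue)}`;
}

// Queries the management API (HTTP basic auth) and reports whether the
// queue currently has a consumer. Uses the global fetch of Node 18+.
async function checkQueueConsumers(
  baseUrl: string,
  vhost: string,
  queue: string,
  user: string,
  password: string,
): Promise<boolean> {
  const auth = btoa(`${user}:${password}`);
  const res = await fetch(queueUrl(baseUrl, vhost, queue), {
    headers: { Authorization: `Basic ${auth}` },
  });
  if (!res.ok) {
    throw new Error(`Management API returned HTTP ${res.status}`);
  }
  const info = (await res.json()) as QueueInfo;
  return hasConsumers(info);
}
```

Wired into a liveness probe, a check like this would let the orchestrator restart the stuck process, which forces the consumer to be registered again. It only detects the problem; it does not fix the missing re-subscribe.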

Loki-Afro avatar Feb 05 '24 12:02 Loki-Afro

@Loki-Afro Would you be able to help us with a PR? There may indeed be a missing feature to "restore" the queues when you restart the cluster.

underfisk avatar Sep 26 '24 12:09 underfisk

This issue is still present, sadly. @underfisk, did you guys manage to replicate the issue? To reproduce it, create a local three-node RabbitMQ cluster, start the service while RabbitMQ is down, then restart RabbitMQ a few times, waiting for the service to reconnect each time.

RaresAil avatar Jan 28 '25 14:01 RaresAil

@RaresAil I have not, sorry, I have been a bit busy :/ Are you able to help me out by creating a GitHub repro, ready to be tested, with all the instructions needed so I don't forget? Alternatively, since you've got a pretty good understanding of the problem, it would be great if you could help with the case. I'm actually curious whether the issue lies in the underlying library or in how Golevelup RabbitMQ handles the connection states.

underfisk avatar Jan 28 '25 14:01 underfisk

I will try to create a repo that reproduces this, and then see if I can find the issue in the code.

RaresAil avatar Jan 28 '25 14:01 RaresAil

@RaresAil I appreciate that very much 🙏

underfisk avatar Jan 28 '25 14:01 underfisk

@underfisk After some investigation, I think this issue can be solved by defining Disaster Recovery and High Availability policies in RabbitMQ. I just found out that our cluster does not have those policies defined, even though it is a cluster. Locally I had them defined and couldn't reproduce the issue.

RaresAil avatar Jan 28 '25 17:01 RaresAil

@underfisk I think you can close this issue, as using Quorum Queues also fixes this.
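
For readers who want to try the quorum-queue route mentioned above, here is a minimal sketch of the queue options involved. A queue's type is chosen at declaration time through the `x-queue-type` argument, and quorum queues must be durable. The helper name `quorumQueueOptions` and the local `QueueOptions` interface are illustrative assumptions. The resulting object is shaped like the options accepted by amqplib's `assertQueue`, and it is assumed to fit the `queueOptions` field of the `@RabbitSubscribe` / `@RabbitRPC` decorators in the same way.

```typescript
// Minimal local shape of the queue options used here (illustrative only).
interface QueueOptions {
  durable: boolean;
  arguments: Record<string, unknown>;
}

// Returns options that declare a quorum queue. Extra arguments are merged
// first so that 'x-queue-type' cannot be overridden by accident.
function quorumQueueOptions(extraArgs: Record<string, unknown> = {}): QueueOptions {
  return {
    durable: true, // quorum queues must be durable
    arguments: { ...extraArgs, 'x-queue-type': 'quorum' },
  };
}

// Example: pass the result wherever the queue is declared, e.g.
//   channel.assertQueue('my-queue', quorumQueueOptions())
const options = quorumQueueOptions();
console.log(JSON.stringify(options));
```

An existing classic queue cannot be switched to a quorum queue in place; redeclaring it with a different `x-queue-type` fails, so the queue has to be deleted and recreated (or declared under a new name).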

RaresAil avatar Jan 29 '25 17:01 RaresAil