node-amqp-connection-manager

graceful close

Open · ehaynes99 opened this issue 1 year ago · 4 comments

Currently, when a channel is closed, the promises for all pending messages are immediately rejected. When spinning down nodes in a cluster, I don't really want to just pull the rug out from under it, but rather let it finish what it's doing. As far as I can tell, there's no exposed mechanism to wait for sends to complete at the ChannelWrapper level. Checking queueLength is insufficient, because the in-flight messages don't count towards that total. I have various things pushing messages, so to track them all externally, I'll have to make a ChannelWrapperWrapper that keeps track of all of the pending promises in a single place, which seems a bit clumsy. Is there a simpler way to do this?

ehaynes99 · Jul 11 '23 20:07

By and large, this library attempts to be a thin wrapper around the underlying amqplib, and amqplib would do the same thing. I'm not sure graceful close is something I want to include in this library. Are there other libraries out there that cover that use case? There are all sorts of interesting corner cases that might make a generic approach challenging (e.g. RPC use cases where we want to wait until all replies have been collected before shutting things down).

That said, we do already keep track of all the messages that are queued-but-not-yet-sent and sent-but-not-acked, so I'd happily accept a PR that exposes these counts outside of the ChannelWrapper.
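If those counts were exposed, a graceful shutdown could poll them until both reach zero. A minimal sketch, assuming a hypothetical accessor that reports the two counts described above (PendingCounts and waitForIdle are illustrative names; amqp-connection-manager does not currently expose such an API):

```typescript
// Hypothetical shape: amqp-connection-manager does not currently expose these counts.
interface PendingCounts {
  queued: number // queued but not yet sent to the broker
  inFlight: number // sent but not yet acknowledged
}

interface WaitOptions {
  intervalMs?: number
  timeoutMs?: number
}

// Polls getCounts() until nothing is queued or in flight.
// Resolves true once idle, or false if timeoutMs elapses first.
async function waitForIdle(
  getCounts: () => PendingCounts,
  { intervalMs = 50, timeoutMs = 10_000 }: WaitOptions = {},
): Promise<boolean> {
  const deadline = Date.now() + timeoutMs
  for (;;) {
    const { queued, inFlight } = getCounts()
    if (queued === 0 && inFlight === 0) return true
    if (Date.now() >= deadline) return false
    await new Promise<void>((resolve) => setTimeout(resolve, intervalMs))
  }
}
```

Polling trades a little shutdown latency for not needing any new events on ChannelWrapper; the timeout keeps an unresponsive broker from blocking shutdown forever.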

On several occasions I've also tried to write some kind of generic interface for a "message store", so that instead of storing unsent messages in RAM you could store them in Postgres, Redis, or something similar. This is far more complicated than it first seems: if we have a queued message and the system restarts, we could reload all these messages from Redis, but whoever sent them originally would no longer have a Promise to listen to in order to find out whether the message succeeded. What would we do if one of these messages fails? Something like this would probably make your life easier, but it's also probably not coming any time soon. :P

jwalton · Jul 14 '23 13:07

At first I thought about using a drain event, since vanilla amqplib emits one. After digging a bit, though, I'm not sure it's a workable solution for graceful shutdown anyway, so it's probably not a pattern to emulate here:

  • there is probably a race condition between when that passthrough fires the event and when the content is transmitted to the server, because the muxer manually calls read rather than piping
  • the event alone isn't sufficient to do a graceful close, because you should only wait for the event if the internal buffer is non-empty, so you'd have to expose the sizes anyway.
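For reference, the vanilla amqplib pattern these bullets refer to looks roughly like this: Channel.publish() returns false when the channel's write buffer is full, and the channel emits 'drain' at some later point. A minimal sketch, using a structural stand-in for the channel so it runs without a broker (DrainableChannel and publishRespectingDrain are hypothetical names):

```typescript
import { EventEmitter, once } from 'node:events'

// Structural stand-in for the parts of an amqplib Channel used here. In amqplib,
// publish() returns false when the channel's write buffer is full, and the
// channel later emits 'drain'.
interface DrainableChannel extends EventEmitter {
  publish(exchange: string, routingKey: string, content: Buffer): boolean
}

// Publishes one message and waits for 'drain' only if publish() reported a full
// buffer. Waiting unconditionally could hang, because 'drain' is not emitted when
// the buffer never filled. This does not address the race in the first bullet
// between 'drain' firing and the bytes actually reaching the server.
async function publishRespectingDrain(
  ch: DrainableChannel,
  exchange: string,
  routingKey: string,
  content: Buffer,
): Promise<void> {
  if (!ch.publish(exchange, routingKey, content)) {
    await once(ch, 'drain')
  }
}
```

The conditional wait is the crux of the second bullet: the event is only useful alongside knowledge of whether the buffer is currently non-empty.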

The "thickest" part of this wrapper lib is that it maintains its own message buffers, though. I don't see how another library could cover this, particularly because ChannelWrapper has different signatures for publish and sendToQueue. What would you think about something like this (I would add tests and fix docs before a PR)?

Otherwise, I can hack it in with something like this. It's important to note, however, that promise-breaker has incorrect type definitions. If you supply a callback, it returns null, NOT a Promise, so this would only work if you know that all call sites will use the Promise flavor (ok in my case, but YMMV).

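import { ChannelWrapper } from 'amqp-connection-manager'

export const withGracefulClose = (channel: ChannelWrapper): ChannelWrapper => {
  // Sends that have been handed to the channel but have not yet settled
  const pending = new Set<Promise<boolean>>()

  const wrapMethod = <T extends (...args: any[]) => Promise<boolean>>(method: T) => {
    const original = method.bind(channel)

    return (...args: Parameters<T>) => {
      const promise = original(...args)
      // If a callback was supplied, promise-breaker returns null instead of a
      // Promise; such calls cannot be tracked here, so pass the result through.
      if (promise == null) return promise

      pending.add(promise)
      // then(onFulfilled, onRejected) rather than finally(): finally() returns a
      // new promise that re-rejects when a send fails, and since nothing awaits
      // it, that would surface as an unhandled rejection.
      const forget = () => {
        pending.delete(promise)
      }
      promise.then(forget, forget)

      return promise
    }
  }

  channel.publish = wrapMethod(channel.publish)
  channel.sendToQueue = wrapMethod(channel.sendToQueue)

  const originalClose = channel.close.bind(channel)
  channel.close = async () => {
    // Let every tracked send settle (resolve or reject) before closing the channel
    await Promise.allSettled([...pending])
    return originalClose()
  }
  return channel
}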
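When tracking sends this way, two details matter: a cleanup attached with promise.finally() produces a new promise that rejects whenever the send does, and a single Promise.allSettled snapshot misses sends that begin while close() is waiting. A dependency-free sketch that handles both, so the pattern can be exercised without a broker (PendingTracker is a hypothetical name, not part of amqp-connection-manager):

```typescript
// Tracks in-flight promises and waits until all of them have settled.
class PendingTracker {
  private readonly pending = new Set<Promise<unknown>>()

  // Records a promise and forgets it once it settles, whether it resolves or rejects.
  track<T>(promise: Promise<T>): Promise<T> {
    this.pending.add(promise)
    const forget = () => {
      this.pending.delete(promise)
    }
    // Handling both outcomes on this derived chain means a failed send does not
    // also surface as an unhandled rejection from the tracker itself.
    promise.then(forget, forget)
    return promise
  }

  get size(): number {
    return this.pending.size
  }

  // Waits until nothing is pending. Loops because sends started while an earlier
  // batch was settling are added to the set and must be awaited too.
  async drain(): Promise<void> {
    while (this.pending.size > 0) {
      await Promise.allSettled([...this.pending])
    }
  }
}
```

In withGracefulClose, close() could call drain() before the original close. Publishes that arrive after drain() returns are still not covered, so producers should be stopped before closing.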

As for the message store, one of our apps actually does exactly that, because we needed the ability to schedule messages for the future. We have an API endpoint where senders post the message with a sendAt timestamp; the endpoint inserts it into a database, and the app sends the message to Rabbit at a later time. From the client's perspective, successfully persisting the message is equivalent to "successfully sent to the queue". Since this can be arbitrarily far in the future, we don't use native RPC for it; instead, we have a convention of including "reply to" metadata in the message, which the eventual consumer can use to publish a result, giving a fully asynchronous, bidirectional operation. Rabbit's RPC is generally used with anonymous queues that don't outlive the connection, and, particularly if you rely on data captured in a closure, the native RPC behavior doesn't mix well with server restarts.
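The scheduling flow above can be sketched as a single polling pass. This is a minimal sketch, assuming an in-memory array in place of the database table; ScheduledMessage, Publish, dispatchDue, and all field names are hypothetical, not the app's actual schema:

```typescript
// Hypothetical record shape for the scheduled-message pattern.
interface ScheduledMessage {
  id: string
  sendAt: Date
  routingKey: string
  payload: unknown
  // "Reply to" metadata the eventual consumer can use to publish a result later.
  replyTo?: { routingKey: string; correlationId: string }
  sentAt?: Date
}

type Publish = (routingKey: string, body: object) => Promise<void>

// One polling pass: publishes every unsent message whose sendAt has passed, then
// marks it sent. The earlier database insert, not this step, is what the original
// sender treats as "successfully sent".
async function dispatchDue(store: ScheduledMessage[], publish: Publish, now: Date): Promise<number> {
  let count = 0
  for (const msg of store) {
    if (msg.sentAt || msg.sendAt.getTime() > now.getTime()) continue
    await publish(msg.routingKey, { payload: msg.payload, replyTo: msg.replyTo })
    msg.sentAt = now
    count += 1
  }
  return count
}
```

Marking a message sent only after publish() resolves gives at-least-once delivery: a crash between the two steps resends the message on the next pass, so consumers should tolerate duplicates.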

Multi-tenancy is another big can of worms. We only have one process polling the db and sending messages to Rabbit. For our particular use case that's good enough, but if you needed more precise timing, you would need multiple processes (nodes) doing this in parallel, so the store would need a concept of synchronization: for example, a Postgres transaction that both retrieves the record and marks it "checked out", plus some other mechanism to handle a worker dying while a record is checked out. That could easily add up to too many architectural decisions for a library like this, since you would need to implement such synchronization for every type of store you wanted to support.
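One common way to get "checked out" semantics plus recovery from a dead worker is a lease: a worker claims a record until a deadline, and an expired lease makes the record claimable again. The lease approach is an assumption here, not something the comment specifies. A minimal in-memory sketch with hypothetical Job fields (checkedOutBy, leaseUntil) and a hypothetical checkout function:

```typescript
// Hypothetical record shape; times are epoch milliseconds.
interface Job {
  id: string
  sendAt: number
  checkedOutBy?: string
  leaseUntil?: number // an expired lease means the worker is presumed dead
  done?: boolean
}

// Claims up to `limit` due jobs for workerId. A real store must make this
// read-and-mark step atomic; this loop is only safe because it runs on one thread.
function checkout(jobs: Job[], workerId: string, now: number, leaseMs: number, limit = 10): Job[] {
  const claimed: Job[] = []
  for (const job of jobs) {
    if (claimed.length >= limit) break
    if (job.done || job.sendAt > now) continue
    const leaseHeld = job.leaseUntil !== undefined && job.leaseUntil > now
    if (leaseHeld) continue
    job.checkedOutBy = workerId
    job.leaseUntil = now + leaseMs
    claimed.push(job)
  }
  return claimed
}
```

In Postgres, the claim would typically be a single UPDATE ... RETURNING over rows selected with SELECT ... FOR UPDATE SKIP LOCKED. The lease must also comfortably exceed the worst-case send time, or a slow but live worker's job can be claimed and sent twice.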

ehaynes99 · Jul 18 '23 21:07

One other quick note: according to the docs, closing a channel waits for its buffers to be emptied, while closing the connection causes them all to be dropped. So it's probably more consistent with the vanilla library for close on the channel to flush the buffer. https://amqp-node.github.io/amqplib/ ("How do I flush messages? / Why are messages not sent?" section)

...however, if you close the connection or your application exits while there are still messages in the buffer, they will be lost. Be sure to explicitly close the channel and wait for the returned promise to resolve, or supplied callback to be invoked, before closing the connection or terminating your application.
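Following that guidance, shutdown order matters: close the channels first, wait for each close, and only then close the connection. A minimal sketch, assuming only that both objects expose close(): Promise<void>, which to my knowledge holds for amqp-connection-manager's ChannelWrapper and AmqpConnectionManager (the Closable interface and shutdown helper are hypothetical):

```typescript
// Structural stand-in for anything with an async close().
interface Closable {
  close(): Promise<void>
}

// Closes every channel and waits for all of them to settle, then closes the
// connection even if a channel failed, and finally rethrows the first failure.
async function shutdown(channels: Closable[], connection: Closable): Promise<void> {
  const results = await Promise.allSettled(channels.map((ch) => ch.close()))
  await connection.close()
  const firstFailure = results.find((r): r is PromiseRejectedResult => r.status === 'rejected')
  if (firstFailure) throw firstFailure.reason
}
```

This could be wired to a signal handler such as process.once('SIGTERM', ...). Whether ChannelWrapper.close() itself waits for unsent messages is exactly what this issue is about, so the channel may still need a wrapper like withGracefulClose.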

ehaynes99 · Jul 20 '23 17:07

@ehaynes99 Have you come up with some solution regarding this? What did you end up going with?

slavab89 · Nov 01 '23 15:11