nestjs icon indicating copy to clipboard operation
nestjs copied to clipboard

RabbitMQ: Find a way to attach handlers to Nest Context

Open WonderPanda opened this issue 5 years ago • 15 comments

The RabbitMQ module should be built such that it plays nicely with the existing Nest processing pipeline as far as the ability to integrate with other Nest components such as Pipes, Interceptors and Guards. The @nestjs/microservices package accomplishes this but it doesn't appear that it exposes the necessary functionality to glue these things together in an easy to use way. The current approach of using ExternalContextCreator to build up the handler function doesn't appear to be working currently.

RabbitMQ handlers (for both PubSub and RPC) should:

  • [ ] Work with Pipes
  • [x] Work with Interceptors
  • [ ] Work with Guards

WonderPanda avatar Mar 07 '19 16:03 WonderPanda

With the release of Nest 6, there is supposed to be support for external context being integrated properly with the Nest request pipeline. Once #26 is complete it should be possible to revisit this and complete implementation

WonderPanda avatar Mar 19 '19 22:03 WonderPanda

hi @WonderPanda are there any updates on this issue?

mabuonomo avatar Jan 08 '21 09:01 mabuonomo

@mabuonomo Interceptors definitely work and are flexible enough that they can represent pretty much any of the other enhancers.

Is there specific functionality that you would like to integrate?

WonderPanda avatar Jan 08 '21 14:01 WonderPanda

@WonderPanda, thankyou for your quick reply, yes the interceptor works very well, but I have some problem with the custom decorator.

For example:

export const ReportMessage = createParamDecorator((data: unknown, context: ExecutionContext) => {
  return context.switchToHttp().getRequest().msg;
});
@RabbitSubscribe({
    exchange: '---',
    routingKey: '',
    queue: '---',
  })
  @UseInterceptors(new InboxValidationInterceptor())
  async updateLeadStatus(@ReportMessage() message: {}): Promise<Nack> {
...
}

The decorator ReportMessage is never called, the message object contains all request data and not only msg object.

mabuonomo avatar Jan 08 '21 16:01 mabuonomo

@mabuonomo I'm not sure off the top of my head whether or not it will be easy for custom decorators to be used when processing incoming messages.

However, what you are showing in your custom decorator will never be possible as RabbitMQ does not use an HTTP Context for message passing

WonderPanda avatar Jan 08 '21 16:01 WonderPanda

Hi @WonderPanda what about injection scoped (especially interested in request scope). Like somehow trigger amqpConnection dependent services and controllers to be reinstantiated?

tataqwerty avatar Feb 08 '21 20:02 tataqwerty

@tataqwerty Can you please elaborate on your use case? Messages received over RabbitMQ are not tied to an HTTP request so it doesn't really make sense to ever try and make them request scoped. I don't even think it would be possible from a technical standpoint.

Everyone's app is different though so if you have a specific use case where this is important please share it here

WonderPanda avatar Feb 08 '21 20:02 WonderPanda

@WonderPanda I have a specific example that I am running into and @mabuonomo was going down the same path I am going down (and to no avail).

Basically, rather than just getting the message in our consumers, we want to receive some sort of context. In our specific example, we want to set an integrationId(or more simply a groupId), but this grouping is figured out inside of the consumer, not beforehand. So, right now we have an interceptor that hijacks the ampq channel message(the second arg) and adds our custom context class, i.e. channel.context = new MyClass().

Inside of the consuming function, rather than having to do channel.context.setIntegrationId(), we'd prefer to just get the context object, and deal with it. Great! Perfect use case for a paramDecorator. Unfortunately, it seems parameter decorators don't get exectuted AT ALL.

import { createParamDecorator, ExecutionContext } from '@nestjs/common'

export const MessageContextParam = createParamDecorator((data, ctx: ExecutionContext) => {
  const context = ctx.getArgByIndex(1)
  return context.context
})

When used with the following function for example, the decorator is not executed. Just as an FYI I have a decorator wrapper that wraps both our rabbitSubscribe, and our interceptor together, but it still happens even when using just rabbitSubscribe

 @MessageBusService.subscribe({
    exchange: messageBusConstants.exchanges.changeTracker.exchangeName,
    routingKey: [ChangeTrackerEvents.EXECUTE.valueOf()],
    queue: 'change_tracker_do_migration',
    queueOptions: {
      durable: true,
    },
  })
  async doMigration(
    event: MessageBusMessage<MigrationEvent>,
    _,
    @MessageContextParam() context: MessageContext,
  )

Ideally, we would be able to just pass/reassign the 2nd arg in our inteceptor, but that's currently not possible in nestjs, and middlewares won't work with rabbitsubscribe I would assume, since it's hooked specifically to http requests.

JacobT14 avatar Sep 01 '21 17:09 JacobT14

@JacobT14 I can definitely see the use case for this and think it woulds be a fantastic addition. Unfortunately I've had almost no time lately to look into improvements on the RabbitMQ side of things as I'm no longer using it for my day job which makes it difficult to context switch and prioritize. Any chance you feel like taking a crack at implementing a custom context for RabbitMQ messages? It should definitely be possible just needs a bit of investigation on the NestJS internals

WonderPanda avatar Sep 02 '21 18:09 WonderPanda

@WonderPanda I'll be honest, I'm somewhat new to nestjs, so I'm not really sure where I would start. The closest I would get would be just figuring out the reflection stuff inside of the handleMessage function(essentially just following how they handle it internally), but I'm not sure if that's actually what needs to happen. So, if you provide some initial guidance I might look into it, otherwise I have worked around it so I'm not blocked or anything. I'm just not sure if there's a way for us to hook into what they have as far as injecting it, or if we need to duplicate what they are doing.

JacobT14 avatar Sep 02 '21 19:09 JacobT14

@JacobT14 No worries, my first hunch would be to take a look at the ExternalContextCreator as its how we currently wire up the RabbitMQ handlers to the NestJS pipeline https://github.com/golevelup/nestjs/blob/46e6f93f190db29bbc4348b1e0b6a686b67dbb30/packages/rabbitmq/src/rabbitmq.module.ts#L127

Nest has a few built in context types for HTTP, RPC, and GraphQL already so I definitely think it should be possible to extend what we're currently doing to give a much deeper integration that is specifically tailored to RabbitMQ

Unfortunately its not exactly a documented API, I had to do a bunch of source diving in the NestJS code base originally to figure out how to get this far

WonderPanda avatar Sep 02 '21 19:09 WonderPanda

I'm working on a multi-tenant app and would like to use a guard to inject information about the tenant in the execution context and access it in the handler using a param decorator. That works well using the built-in NestJS microservices module, but I had to switch to this library as it supports pub/sub. I just realized param decorators don't get executed at all. @JacobT14 how did you work around it?

andrucz avatar Sep 20 '22 21:09 andrucz

I've just encountered almost exactly the same use case as @andrucz. Multi-tenant app, need to inject tenant information into the context, but cannot get paramdecorators to work. Has anyone managed to work around this?

whoistobias avatar Nov 02 '22 15:11 whoistobias

I am thinking we can take advantage of consumer tags. I create a queue and register handlers myself. Ideally connection.createSubscriber() will give me the consumer tags, so I can map it to the tenant i am creating the queue for.

But it seems that the returned consumer tag is undefined for connection.createSubscriber(handler, mergedConfig, discoveredMethod.methodName); I will need investigate more.

Yeah i found my problem, i need add this connectionInitOptions: { wait: true }, to my RabbitMQConfig, when i dynamically setup rabbit mq

huantaoliu avatar Jan 18 '24 17:01 huantaoliu

@huantaoliu would you mind sharing some examples?

gal1419 avatar Feb 27 '24 17:02 gal1419