Define specific client in MessagePattern decorator (microservices)

Open frct1 opened this issue 1 year ago • 7 comments

Is there an existing issue that is already proposing this?

  • [X] I have searched the existing issues

Is your feature request related to a problem? Please describe it

Hello. We need to keep connections to multiple NATS servers open at the same time. Each server belongs to a specific service, and producers on different servers may publish to the same subject. I can see that @MessagePattern accepts a second argument of type Transport | symbol.

Describe the solution you'd like

The approach below would give developers more flexibility:

ClientsModule.register([
  {
    name: FIRST_SERVICE_CLIENT, // Symbol
    transport: Transport.NATS,
    options: {
      url: `nats://${process.env.FIRST_NATS_HOST}`,
      name: 'FIRST_SERVICE_LISTENER',
      headers: { 'x-version': '1.0.0' },
      serializer: new OutboundResponseExternalSerializer(),
      deserializer: new InboundMessageExternalDeserializer(),
    },
  },
  {
    name: SECOND_SERVICE_CLIENT, // Symbol
    transport: Transport.NATS,
    options: {
      url: `nats://${process.env.SECOND_NATS_HOST}`,
      name: 'SECOND_SERVICE_LISTENER',
      headers: { 'x-version': '1.0.0' },
      serializer: new OutboundResponseExternalSerializer(),
      deserializer: new InboundMessageExternalDeserializer(),
    },
  },
]),

async onApplicationBootstrap() {
  await this.first_client.connect();
  await this.second_client.connect();
}

@MessagePattern('message_title', FIRST_SERVICE_CLIENT) // Symbol is passed here
async act_on_message_from_first_service() {}

@MessagePattern('message_title', SECOND_SERVICE_CLIENT) // Symbol is passed here
async act_on_message_from_second_service() {}

Teachability, documentation, adoption, migration strategy

No migration steps

What is the motivation / use case for changing the behavior?

Better flexibility for microservices

frct1 avatar Mar 19 '23 16:03 frct1

UPD: Yes, when connecting our backend to multiple NATS servers in main.ts, we can specify multiple microservices like this:

app.connectMicroservice({
  transport: Transport.NATS,
  options: {
    servers: [`nats://${process.env.FIRST_NATS_HOST}`],
    serializer: new OutboundResponseExternalSerializer(),
    deserializer: new InboundMessageExternalDeserializer(),
  },
});
app.connectMicroservice({
  transport: Transport.NATS,
  options: {
    servers: [`nats://${process.env.SECOND_NATS_HOST}`],
    serializer: new OutboundResponseExternalSerializer(),
    deserializer: new InboundMessageExternalDeserializer(),
  },
});

For better flexibility, there should be a way to give each microservice a client ID or unique name that is registered in DI (as with ClientsModule) and can be used in the @MessagePattern or @EventPattern decorators to separate data streams that arrive over the same transport type.

frct1 avatar Mar 19 '23 16:03 frct1

Would you like to create a PR for this feature?

kamilmysliwiec avatar Mar 20 '23 07:03 kamilmysliwiec

I guess so. I'm not sure what the most declarative way to define the key for a client would be. Would the same kind of definition used in ClientsModule be more useful?

frct1 avatar Mar 23 '23 08:03 frct1

I have a similar issue where I have two MQTT brokers and I would like to have some context about which broker the message came from.

The concept of named transports would be useful here along with some way of getting that information via a decorator.

petergledhillinclusive avatar Apr 18 '23 15:04 petergledhillinclusive

I found a temporary way to solve it. It seems to work, but the code isn't pretty:

import { mixin } from '@nestjs/common';
import { ServerNats } from '@nestjs/microservices';
import { EVENTS_CLIENT, SCHEDULER_CLIENT } from './symbols';

// Class decorator that tags a server strategy with a transport ID,
// so that handlers declared with the same symbol can be matched to it.
export const NatsCustomStrategy = (transport_symbol: symbol) => {
  return function <T extends { new (...args: any[]): any }>(constructor: T) {
    // Subclass the decorated server and set its transportId to the given symbol.
    const new_class = class extends constructor {
      transportId = transport_symbol;
    };
    return mixin(new_class);
  };
};

@NatsCustomStrategy(SCHEDULER_CLIENT)
export class NatsSchedulerClient extends ServerNats {}

@NatsCustomStrategy(EVENTS_CLIENT)
export class NatsEventsClient extends ServerNats {}

Depending on your transport, you can replace extends ServerNats with the appropriate server class for your case and it should work. If you are interested, we can put together a pull request for this feature.
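
To illustrate why tagging the server with a transportId makes the second argument of @MessagePattern take effect, here is a dependency-free sketch of the matching rule. It is a simplified model, not NestJS source code: Handler, ServerLike, and handlersFor are hypothetical names, and it assumes a handler is bound to a server when either side leaves the transport unspecified or both IDs are equal.

```typescript
// Simplified, dependency-free model of transport-scoped handler binding.
// All names here are hypothetical; this is not NestJS source code.

type TransportId = symbol | number;

interface Handler {
  pattern: string;
  transport?: TransportId; // second argument of @MessagePattern, if any
  name: string;
}

interface ServerLike {
  transportId?: TransportId; // set by a decorator such as NatsCustomStrategy
}

// Assumed rule: bind when either side is unspecified, or when both IDs match.
function handlersFor(server: ServerLike, handlers: Handler[]): Handler[] {
  return handlers.filter(
    (h) =>
      h.transport === undefined ||
      server.transportId === undefined ||
      h.transport === server.transportId,
  );
}

const SCHEDULER_CLIENT = Symbol('SCHEDULER_CLIENT');
const EVENTS_CLIENT = Symbol('EVENTS_CLIENT');

const handlers: Handler[] = [
  { pattern: 'message_title', transport: SCHEDULER_CLIENT, name: 'fromScheduler' },
  { pattern: 'message_title', transport: EVENTS_CLIENT, name: 'fromEvents' },
  { pattern: 'health', name: 'shared' }, // no transport: bound to every server
];

// The scheduler server receives its own handler plus the untagged one.
const schedulerHandlers = handlersFor({ transportId: SCHEDULER_CLIENT }, handlers);
console.log(schedulerHandlers.map((h) => h.name));
```

Under this model, two handlers can share the pattern 'message_title' without colliding, because each one is only registered on the server whose transportId matches its symbol.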

frct1 avatar Apr 18 '23 15:04 frct1

I have the same issue with the Kafka client. I want to consume identically named topics from two different Kafka clusters. The two topics live on separate brokers (not replicas of each other) and serve different purposes. In this case, @MessagePattern('topic') is applied to both of them, although I want different handling logic (post-processing) for each. So there needs to be a way to distinguish them by client ID.

8471919 avatar Apr 09 '24 07:04 8471919

I'm interested in this feature as well. My use case is distributing one message to all instances subscribed to the same topic. I have one consumer that uses a fixed group ID, and I need another consumer with a randomUUID group ID so that every instance receives the same message. I want to use this for WebSockets when I scale my application horizontally. How can we raise the priority of this?
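
The two group IDs described here could be derived along the following lines. This is only a sketch: the buildGroupIds helper, the 'orders-service' base name, and the '-broadcast-' naming scheme are hypothetical, and it assumes the usual Kafka behaviour that consumers sharing a group ID split the messages between them while each distinct group receives every message.

```typescript
import { randomUUID } from 'node:crypto';

interface GroupIds {
  shared: string; // same on every instance: one instance handles each message
  broadcast: string; // unique per instance: every instance sees each message
}

// Hypothetical helper: derives both group IDs from one base name.
function buildGroupIds(baseName: string): GroupIds {
  return {
    shared: baseName,
    broadcast: `${baseName}-broadcast-${randomUUID()}`,
  };
}

const groupIds = buildGroupIds('orders-service');
console.log(groupIds.shared, groupIds.broadcast);
```

Each ID would then be passed to its own consumer configuration. Routing the two consumers to different handlers for the same topic is exactly the part that needs the named-transport feature requested in this issue.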

KostyaZgara avatar Apr 17 '24 14:04 KostyaZgara