cqrs icon indicating copy to clipboard operation
cqrs copied to clipboard

Feature Request: Efficient Event Publishing with NestJS and RabbitMQ

Open masterj3y opened this issue 8 months ago • 1 comments

Goal

I want to implement an event publisher in NestJS that can:

  1. Publish events internally to local event handlers.
  2. Publish events externally using RabbitMQ for microservices communication.

Current Implementation

I wrote the following custom event publisher that:

  • Scans for all IEventHandler implementations using ExplorerService.
  • Registers event handlers dynamically.
  • Routes events internally or publishes them to RabbitMQ based on metadata.

Here’s the current implementation of the RabbitMQEventPublisher:

import { AmqpConnection } from '@golevelup/nestjs-rabbitmq';
import { ModuleRef, ModulesContainer, Reflector } from '@nestjs/core';
import { AsyncContext, IEvent, IEventHandler, IEventPublisher } from '@nestjs/cqrs';
import { CqrsEventMetaData } from '../decorators/cqrs-event.decorator';
import { RMQ_EVENTS_EXCHANGE_NAME } from 'infrastructure/config';
import { Logger, Type } from '@nestjs/common';
import { ExplorerService } from '@nestjs/cqrs/dist/services/explorer.service';
import { InstanceWrapper } from '@nestjs/core/injector/instance-wrapper';
import { defaultEventIdProvider } from '../helpers/default-id-provder.helper';

export class RabbitMQEventPublisher<EventBase extends IEvent = IEvent>
  implements IEventPublisher<EventBase>
{
  private readonly logger = new Logger(RabbitMQEventPublisher.name);
  private readonly eventIdProvider = defaultEventIdProvider;
  private readonly eventHandlers = new Map<
    string,
    (eventBase: EventBase) => Promise<void>
  >();

  constructor(
    private readonly reflector: Reflector,
    modulesContainer: ModulesContainer,
    private readonly moduleRef: ModuleRef,
    explorerService: ExplorerService,
    private readonly amqpConnection: AmqpConnection,
  ) {
    const modules = [...modulesContainer.values()];
    explorerService
      .flatMap<IEventHandler>(modules, (instance) =>
        explorerService.filterByMetadataKey(instance, '__eventsHandler__'),
      )
      .forEach((handler) => this.registerHandler(handler));
  }

  private registerHandler(handler: InstanceWrapper<IEventHandler<EventBase>>) {
    const typeRef = handler.metatype as Type<IEventHandler<EventBase>>;
    const events = this.reflectEvents(typeRef);

    if (!events || events.length === 0) {
      this.logger.error(`No events found for handler: ${typeRef.name}`);
      return;
    }

    events.forEach((event) => {
      const eventId = this.eventIdProvider.getEventId(event);
      const boundHandler = this.bind(handler, eventId!);
      this.eventHandlers.set(event.name, boundHandler);
      this.logger.log(`Registered event handler for: ${event.name}`);
    });
  }

  private reflectEvents(handler: Type<IEventHandler<EventBase>>): Type<EventBase>[] {
    const events = Reflect.getMetadata('__eventsHandler__', handler) || [];
    if (events.length === 0) {
      this.logger.error(`No metadata found for handler: ${handler.name}`);
    }
    return events;
  }

  async publish<TEvent extends EventBase>(
    event: TEvent,
    _dispatcherContext?: unknown,
    _asyncContext?: AsyncContext,
  ) {
    const eventMetadata = this.reflector.get<CqrsEventMetaData>(
      'metadata',
      event.constructor,
    );

    const eventData = {
      message: event,
      event: event.constructor.name,
    };

    try {
      const handler = this.eventHandlers.get(event.constructor.name);

      if (!handler) {
        this.logger.error(`No handler found for event: ${event.constructor.name}`);
        return;
      }

      this.logger.log(`Executing handler for event: ${event.constructor.name}`);
      await handler(event);

      if (eventMetadata && eventMetadata.routingKeys.length > 0) {
        for (const routingKey of eventMetadata.routingKeys) {
          this.amqpConnection.publish(RMQ_EVENTS_EXCHANGE_NAME, routingKey, eventData);
        }
      }
    } catch (e) {
      this.logger.error(`Error publishing event <${eventData.event}>`, e);
    }
  }
}

Question

Is this the best way to retrieve event handlers dynamically, or is there an existing feature in NestJS CQRS that I should use instead?

  • I am currently scanning for IEventHandler instances manually using ExplorerService and ModulesContainer.
  • Does the NestJS CQRS module already provide a built-in way to retrieve event handlers dynamically without this manual lookup?

Would appreciate any insights or alternative approaches! 🚀

masterj3y avatar Feb 22 '25 11:02 masterj3y