cqrs
cqrs copied to clipboard
Feature Request: Efficient Event Publishing with NestJS and RabbitMQ
Goal
I want to implement an event publisher in NestJS that can:
- Publish events internally to local event handlers.
- Publish events externally using RabbitMQ for microservices communication.
Current Implementation
I wrote the following custom event publisher that:
- Scans for all
IEventHandlerimplementations usingExplorerService. - Registers event handlers dynamically.
- Routes events internally or publishes them to RabbitMQ based on metadata.
Here’s the current implementation of the RabbitMQEventPublisher:
import { AmqpConnection } from '@golevelup/nestjs-rabbitmq';
import { ModuleRef, ModulesContainer, Reflector } from '@nestjs/core';
import { AsyncContext, IEvent, IEventHandler, IEventPublisher } from '@nestjs/cqrs';
import { CqrsEventMetaData } from '../decorators/cqrs-event.decorator';
import { RMQ_EVENTS_EXCHANGE_NAME } from 'infrastructure/config';
import { Logger, Type } from '@nestjs/common';
import { ExplorerService } from '@nestjs/cqrs/dist/services/explorer.service';
import { InstanceWrapper } from '@nestjs/core/injector/instance-wrapper';
import { defaultEventIdProvider } from '../helpers/default-id-provder.helper';
export class RabbitMQEventPublisher<EventBase extends IEvent = IEvent>
implements IEventPublisher<EventBase>
{
private readonly logger = new Logger(RabbitMQEventPublisher.name);
private readonly eventIdProvider = defaultEventIdProvider;
private readonly eventHandlers = new Map<
string,
(eventBase: EventBase) => Promise<void>
>();
constructor(
private readonly reflector: Reflector,
modulesContainer: ModulesContainer,
private readonly moduleRef: ModuleRef,
explorerService: ExplorerService,
private readonly amqpConnection: AmqpConnection,
) {
const modules = [...modulesContainer.values()];
explorerService
.flatMap<IEventHandler>(modules, (instance) =>
explorerService.filterByMetadataKey(instance, '__eventsHandler__'),
)
.forEach((handler) => this.registerHandler(handler));
}
private registerHandler(handler: InstanceWrapper<IEventHandler<EventBase>>) {
const typeRef = handler.metatype as Type<IEventHandler<EventBase>>;
const events = this.reflectEvents(typeRef);
if (!events || events.length === 0) {
this.logger.error(`No events found for handler: ${typeRef.name}`);
return;
}
events.forEach((event) => {
const eventId = this.eventIdProvider.getEventId(event);
const boundHandler = this.bind(handler, eventId!);
this.eventHandlers.set(event.name, boundHandler);
this.logger.log(`Registered event handler for: ${event.name}`);
});
}
private reflectEvents(handler: Type<IEventHandler<EventBase>>): Type<EventBase>[] {
const events = Reflect.getMetadata('__eventsHandler__', handler) || [];
if (events.length === 0) {
this.logger.error(`No metadata found for handler: ${handler.name}`);
}
return events;
}
async publish<TEvent extends EventBase>(
event: TEvent,
_dispatcherContext?: unknown,
_asyncContext?: AsyncContext,
) {
const eventMetadata = this.reflector.get<CqrsEventMetaData>(
'metadata',
event.constructor,
);
const eventData = {
message: event,
event: event.constructor.name,
};
try {
const handler = this.eventHandlers.get(event.constructor.name);
if (!handler) {
this.logger.error(`No handler found for event: ${event.constructor.name}`);
return;
}
this.logger.log(`Executing handler for event: ${event.constructor.name}`);
await handler(event);
if (eventMetadata && eventMetadata.routingKeys.length > 0) {
for (const routingKey of eventMetadata.routingKeys) {
this.amqpConnection.publish(RMQ_EVENTS_EXCHANGE_NAME, routingKey, eventData);
}
}
} catch (e) {
this.logger.error(`Error publishing event <${eventData.event}>`, e);
}
}
}
Question
Is this the best way to retrieve event handlers dynamically, or is there an existing feature in NestJS CQRS that I should use instead?
- I am currently scanning for IEventHandler instances manually using ExplorerService and ModulesContainer.
- Does the NestJS CQRS module already provide a built-in way to retrieve event handlers dynamically without this manual lookup?
Would appreciate any insights or alternative approaches! 🚀