cqrs
cqrs copied to clipboard
feat: configurable pub sub for different transports
PR Checklist
Please check if your PR fulfills the following requirements:
- [ ] The commit message follows our guidelines: https://github.com/nestjs/nest/blob/master/CONTRIBUTING.md
- [ ] Tests for the changes have been added (for bug fixes / features)
- [ ] Docs have been added / updated (for bug fixes / features)
PR Type
What kind of change does this PR introduce?
[ ] Bugfix
[X] Feature
[ ] Code style update (formatting, local variables)
[ ] Refactoring (no functional changes, no api changes)
[ ] Build related changes
[ ] CI related changes
[X] Discussion
Mini dictionary
"a Message" - an Event, Query or Command
What is the current behavior?
Even though we are building a microservice ( let's say Kafka ), all of our Messages sent using EventBus, QueryBus or CommandBus are only propagated locally and none of them will end up in Kafka or other Pub-Sub.
In addition, if we'd like to listen to Messages we can do that using @EventPattern() but there is no way of binding received Messages to appropriate handlers.
NestJS is limited to DefaultPubSub classes for Events, Queries and Commands which propagate Messages using ObservableBus.
What is the new behavior?
Introduced a possibility of configuring PubSub with Transport ( Kafka, RabbitMQ etc. ) for Queries, Events and Commands separately and a way to propagate incoming Messages to local handlers. The intention is to write PubSub classes for each of existing Transports in @nestjs/microservice . For now I've done only Kafka pub-subs, but finishing the rest is just a manner of days.
How it works?
We used to import CqrsModule statically:
@Module({
imports: [
CqrsModule,
]
})
After the change we import the module dynamically:
import ( CqrsModule, KafkaCommandsPubSub } from '@nestjs/cqrs';
@Module({
imports: [
CqrsModule.forRoot({
commands: { // can be filled or empty, fallback is DefaultCommandsPubSub without client
pubSub: KafkaCommandsPubSub, // or RabbitMQCommandsPubSub, NatsCommandsPubSub
clientProvider: <provider here> // provider for kafka client ( useFactory, useExisting, useValue, useClass )
},
}),
]
})
Thanks to it, when we use EventBus, QueryBus or CommandBus all the Messages will automatically be sent to/received from Kafka or other Pub-Sub as configured.
In the same way we can configure "events" and "queries". Of course we can also pass nothing, then the bahaviour of the package won't change at all and will fallback to DefaultPubSub.
Introducing PropagationService
Furthermore, binding receiving messages to CommandHandlers, QueryHandlers and EventHandlers is not possible with the current implementation. Let's take a look at this code:
@EventPattern('users')
async handle(message: any): Promise<void> {
console.log(message);
}
Above code is going to log us a complex response from the PubSub server with lots of useless information. Of course it contains data sent from the separate service, but as stated above - without a possibility to fulfill this request by dispatching the request with EventBus, QueryBus or CommandBus ( data is a plain object and bus expects an instantiated dto ).
This is the reason behind creating PropagationService, which automatically detects whether the message was an Event, Query or Command, and propagates it to appropriate local handler by ObservableBus.
Service 1
@Controller()
class UsersController {
// constructor...
@Get('users')
async getUsers(): Promise<User[]> {
return this.queryBus.publish<GetUsersQuery, User[]>('users', new GetUsersQuery());
}
}
Service 2
import { PropagationService } from '@nestjs/cqrs';
class UsersTopic {
// constructor
@EventPattern('users')
async handle(message: any): Promise<any> {
return this.propagationService.propagate(message.value);
}
}
The above example will result in the fact that after entering the example.com/users url we will receive a list of users from Service 1. Under the hood it's going to dispatch the handler @QueryHandler(GetUsersQuery) . The use of PropagationService is optional and can be skipped if case anybody would want to handle incoming message himself.
Of course, the Query DTO itself must be defined separately in Service 1 and Service 2, as they are completely independent of each other.
Does this PR introduce a breaking change?
[X] Yes
[ ] No
Implementations of the EventBus.publish(), QueryBus.execute() and CommandBus.execute() methods have changed and now require the "pattern" parameter so that the Pub-Sub class knows where to send the event. If the pattern is empty, the event will be redirected to ObservableBus and propagated locally.
const pattern: string = 'users';
this.eventBus.publish(pattern, new UserHasRegistered());
To still be able to publish events locally, the publishLocally() and executeLocally() methods have been added accordingly.
Other information
I've worked on this solution as a part of my hobby project. At some point I thought this can be a great extension of CQRS package. I'd really like to know your thoughts - is this the road you want this package to follow. Let what do you think of it, I'm happy to continue working on this PR. It's stable currently and for now supports only Kafka. Later this week I'll try to prepare a repo with a working example on docker-compose with 2 independent services so you could test it yourself.
IN PROGRESS NOTE
I'm actually out of time currently so finishing this can take quite longer than I expected. If anybody is interested in helping with this PR please contact me. I'd really use some help ;)
This looks like a good approach, and my team is interested in contributing to this PR. It would be great to get some feedback from the maintainers (@kamilmysliwiec or others?) as to the direction this PR is heading before we sink resources into completing it.
Personally, I would rather go in the direction of not specifying a pattern when publishing, but rather tying the pattern to the DTO (perhaps using a metadata decorator, or a class property required in the the interface) so that the DTO could be defined separately and shared with both sides of the transaction for increased type safety and reduced reliance on strings.
Really cool approach, @aborodziuk! I have only one suggestion, that I can contribute if you want: would be cool if the option commands
on .forRoot()
method receive an array instead of an object, this way you can attach multiple pub/subs, based on the command itself. Example:
import ( CqrsModule, KafkaCommandsPubSub } from '@nestjs/cqrs';
import { MyCommand } from './command';
@Module({
imports: [
CqrsModule.forRoot({
commands: [
{
pubSub: KafkaCommandsPubSub,
clientProvider: <provider here>,
command: MyCommand,
},
],
}),
]
})
On this example, all commands will use DefaultCommandsPubSub
, except MyCommand
that would use Kafka.
@jmcdo29 This was the idea we talked about, not exactly like this because i think it would make more sense having an adaptor for the different buses which would provide more control over the flow of the action but this repo has some nice ideas regarding it, would be nice to see this going forward or at least some more "brainstorm"
Hey All, I was looking for a solution exactly like this. Using CqrsModule connected to RabbitMQ.
Unfortunately, I don't have the skills/ability to contribute on this topic so I'd like to kindly ask if you guys are planning to keep working on this topic - :) - or know an alternative for it, especially for Queries and Commands.
@aborodziuk Hi, what is the status of this?...
@kamilmysliwiec any feedback?
FWIW - I've moved entirely off the CQRS implementations from Nest, mostly because there doesn't seem to be any maintenance in this area, and the microservices approach is too "magic string" heavy for my tastes, so we built our own type-safe solution here: https://github.com/so-cheep/microservices. There are pre-built Nest adapters for it, and it's currently in production on a small/medium sized real-time application with a RabbitMQ transport. We also have some prototype work done on a NATS transport in a private project which we're hoping to open-source soon (as time allows)