watermill
watermill copied to clipboard
CQRS with gochannel pubsub?
Hey, First of all thank you very much for this project.
Now I am trying to replicate cqrs example https://github.com/ThreeDotsLabs/watermill/blob/master/_examples/basic/5-cqrs-protobuf/main.go but with gochannel pubsub instead of amqp (rabbit mq). I am hitting a wall trying to figure out what to do, any help would be much appreciated main.go is available here (https://github.com/message-board/identity-go/blob/commands-using-watermill/main.go).
Scenario:
- Client application calls endpoint to create user
- Endpoint sends a command CreateUser (using InProcess as commands would be private to the service)
- CreateUserHandler call Application layer to validate business logic & persist user (TODO)
- CreateUserHandler publishes UserCreated event - these should be global as other services should be able to subscribe to those and eventually I would like to be able to publish it to amqp or any other out of process message queue but for now I want to just publish to another gochannel pubsub.
Thanks in advance
Hey. Replacing the publisher and subscriber with GoChannel should be enough, as you did. Does it work for you, or are there any issues with it?
Hey,
Thanks for looking into this.
I tried to do as you suggested replace subscriber with GoChannel but I am getting compiler errors at Line 78 setting CommandsSubscriberConstructor and Line 92 setting EventsSubscriberConstructor (being a Go newbie doesn't help either :() in main.go file.
Hey try to replace the main function with that:
func main() {
logger := watermill.NewStdLogger(false, false)
cqrsMarshaler := cqrs.ProtobufMarshaler{}
// Since we are using the go channel implementation we could remove commandsPublisher, commandsSubscriber and eventsPublisher, to be simple.
// And then we need to replace the commandsPublisher, commandsSubscriber and eventsPublisher with the channel
ch := gochannel.NewGoChannel(gochannel.Config{Persistent: true}, logger)
// CQRS is built on messages router. Detailed documentation: https://watermill.io/docs/messages-router/
router, err := message.NewRouter(message.RouterConfig{}, logger)
if err != nil {
panic(err)
}
// Simple middleware which will recover panics from event or command handlers.
// More about router middlewares you can find in the documentation:
// https://watermill.io/docs/messages-router/#middleware
//
// List of available middlewares you can find in message/router/middleware.
router.AddMiddleware(middleware.Recoverer)
// cqrs.Facade is facade for Command and Event buses and processors.
// You can use facade, or create buses and processors manually (you can inspire with cqrs.NewFacade)
cqrsFacade, err := cqrs.NewFacade(cqrs.FacadeConfig{
GenerateCommandsTopic: func(commandName string) string {
return commandName
},
CommandHandlers: func(cb *cqrs.CommandBus, eb *cqrs.EventBus) []cqrs.CommandHandler {
return []cqrs.CommandHandler{
BookRoomHandler{eb},
OrderBeerHandler{eb},
}
},
CommandsPublisher: ch, // <-- Here
CommandsSubscriberConstructor: func(handlerName string) (message.Subscriber, error) {
return ch, nil // <-- Here
},
GenerateEventsTopic: func(eventName string) string {
return "events"
},
EventHandlers: func(cb *cqrs.CommandBus, eb *cqrs.EventBus) []cqrs.EventHandler {
return []cqrs.EventHandler{
OrderBeerOnRoomBooked{cb},
NewBookingsFinancialReport(),
}
},
EventsPublisher: ch, // <-- Here
EventsSubscriberConstructor: func(handlerName string) (message.Subscriber, error) {
return ch, nil // <-- Here
},
Router: router,
CommandEventMarshaler: cqrsMarshaler,
Logger: logger,
})
if err != nil {
panic(err)
}
// publish BookRoom commands every second to simulate incoming traffic
go publishCommands(cqrsFacade.CommandBus())
// processors are based on router, so they will work when router will start
if err := router.Run(context.Background()); err != nil {
panic(err)
}
}
The CommandsSubscriberConstructor
was using the RabbitMQ implementation so you just need to change the return to Go channel implementation, the same case in EventsSubscriberConstructor
.
Very nice, thank you very much @mirusky I will try that