CQRS with gochannel pubsub?

Open kashifsoofi opened this issue 2 years ago • 4 comments

Hey, first of all, thank you very much for this project.

I am trying to replicate the CQRS example (https://github.com/ThreeDotsLabs/watermill/blob/master/_examples/basic/5-cqrs-protobuf/main.go) with the GoChannel pub/sub instead of AMQP (RabbitMQ). I have hit a wall trying to figure out what to do, so any help would be much appreciated. My main.go is available here: https://github.com/message-board/identity-go/blob/commands-using-watermill/main.go

Scenario:

  • The client application calls an endpoint to create a user.
  • The endpoint sends a CreateUser command (using InProcess, as commands would be private to the service).
  • CreateUserHandler calls the application layer to validate business logic and persist the user (TODO).
  • CreateUserHandler publishes a UserCreated event. Events should be global, as other services should be able to subscribe to them. Eventually I would like to publish them to AMQP or another out-of-process message queue, but for now I just want to publish to another GoChannel pub/sub.

Thanks in advance

kashifsoofi avatar Aug 13 '21 21:08 kashifsoofi

Hey. Replacing the publisher and subscriber with GoChannel should be enough, as you did. Does it work for you, or are there any issues with it?

m110 avatar Aug 17 '21 20:08 m110

Hey,

Thanks for looking into this.

I tried to do as you suggested and replaced the subscriber with GoChannel, but I am getting compiler errors in main.go at line 78 (setting CommandsSubscriberConstructor) and line 92 (setting EventsSubscriberConstructor). Being a Go newbie doesn't help either :(

kashifsoofi avatar Aug 18 '21 09:08 kashifsoofi

Hey, try replacing the main function with this:


func main() {
	logger := watermill.NewStdLogger(false, false)
	cqrsMarshaler := cqrs.ProtobufMarshaler{}

	// Since we are using the Go channel implementation, we can drop the separate commandsPublisher,
	// commandsSubscriber and eventsPublisher from the AMQP example and use a single GoChannel in their place.
	ch := gochannel.NewGoChannel(gochannel.Config{Persistent: true}, logger)

	// CQRS is built on messages router. Detailed documentation: https://watermill.io/docs/messages-router/
	router, err := message.NewRouter(message.RouterConfig{}, logger)
	if err != nil {
		panic(err)
	}

	// Simple middleware which will recover panics from event or command handlers.
	// See the documentation for more about router middleware:
	// https://watermill.io/docs/messages-router/#middleware
	//
	// The available middleware is listed in message/router/middleware.
	router.AddMiddleware(middleware.Recoverer)

	// cqrs.Facade is a facade for the command and event buses and processors.
	// You can use the facade, or create the buses and processors manually (see cqrs.NewFacade for inspiration).
	cqrsFacade, err := cqrs.NewFacade(cqrs.FacadeConfig{
		GenerateCommandsTopic: func(commandName string) string {
			return commandName
		},
		CommandHandlers: func(cb *cqrs.CommandBus, eb *cqrs.EventBus) []cqrs.CommandHandler {
			return []cqrs.CommandHandler{
				BookRoomHandler{eb},
				OrderBeerHandler{eb},
			}
		},
		CommandsPublisher: ch, // <-- Here
		CommandsSubscriberConstructor: func(handlerName string) (message.Subscriber, error) {
			return ch, nil // <-- Here  
		},
		GenerateEventsTopic: func(eventName string) string {
			return "events"
		},
		EventHandlers: func(cb *cqrs.CommandBus, eb *cqrs.EventBus) []cqrs.EventHandler {
			return []cqrs.EventHandler{
				OrderBeerOnRoomBooked{cb},
				NewBookingsFinancialReport(),
			}
		},
		EventsPublisher: ch, // <-- Here
		EventsSubscriberConstructor: func(handlerName string) (message.Subscriber, error) {
			return ch, nil // <-- Here
		},
		Router:                router,
		CommandEventMarshaler: cqrsMarshaler,
		Logger:                logger,
	})
	if err != nil {
		panic(err)
	}

	// publish BookRoom commands every second to simulate incoming traffic
	go publishCommands(cqrsFacade.CommandBus())

	// processors are built on the router, so they start working once the router is running
	if err := router.Run(context.Background()); err != nil {
		panic(err)
	}
}

CommandsSubscriberConstructor was returning the RabbitMQ subscriber, so you just need to return the GoChannel instance instead. The same goes for EventsSubscriberConstructor.
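
In case it helps to see why the same `ch` can go in all four places: GoChannel has both Publish and Subscribe methods, and Go interfaces are satisfied implicitly, so one value can be used wherever a publisher or a subscriber is expected. Below is a rough, library-free sketch of that idea. The names `Publisher`, `Subscriber`, `InMemoryPubSub` and `RoundTrip` are made up for illustration and are much simpler than Watermill's real interfaces, so treat it as a toy model rather than how GoChannel is actually implemented.

```go
package main

import (
	"fmt"
	"sync"
)

// Publisher and Subscriber are simplified, hypothetical stand-ins for
// Watermill's message.Publisher and message.Subscriber interfaces.
type Publisher interface {
	Publish(topic string, payloads ...string) error
}

type Subscriber interface {
	Subscribe(topic string) (<-chan string, error)
}

// InMemoryPubSub is a toy in-process pub/sub (an assumption for this sketch,
// not Watermill code). It has both methods, so it satisfies both interfaces.
type InMemoryPubSub struct {
	mu   sync.Mutex
	subs map[string][]chan string
}

func NewInMemoryPubSub() *InMemoryPubSub {
	return &InMemoryPubSub{subs: make(map[string][]chan string)}
}

// Publish delivers every payload to every current subscriber of the topic.
func (p *InMemoryPubSub) Publish(topic string, payloads ...string) error {
	p.mu.Lock()
	defer p.mu.Unlock()
	for _, payload := range payloads {
		for _, ch := range p.subs[topic] {
			ch <- payload
		}
	}
	return nil
}

// Subscribe registers a new buffered channel for the topic and returns it.
func (p *InMemoryPubSub) Subscribe(topic string) (<-chan string, error) {
	p.mu.Lock()
	defer p.mu.Unlock()
	ch := make(chan string, 16) // small buffer so a few publishes do not block
	p.subs[topic] = append(p.subs[topic], ch)
	return ch, nil
}

// Compile-time checks: the build fails if either interface is not satisfied.
var (
	_ Publisher  = (*InMemoryPubSub)(nil)
	_ Subscriber = (*InMemoryPubSub)(nil)
)

// RoundTrip wires one pub/sub value in as both publisher and subscriber,
// publishes a single payload, and returns what the subscriber receives.
func RoundTrip(topic, payload string) (string, error) {
	ps := NewInMemoryPubSub()

	var pub Publisher = ps
	// Same shape as the subscriber constructors in the config above.
	newSubscriber := func(handlerName string) (Subscriber, error) {
		return ps, nil
	}

	sub, err := newSubscriber("example_handler")
	if err != nil {
		return "", err
	}
	msgs, err := sub.Subscribe(topic)
	if err != nil {
		return "", err
	}
	if err := pub.Publish(topic, payload); err != nil {
		return "", err
	}
	return <-msgs, nil
}

func main() {
	got, err := RoundTrip("events", "UserCreated")
	if err != nil {
		panic(err)
	}
	fmt.Println(got) // prints "UserCreated"
}
```

If you later want events to go through a separate pub/sub (or a real broker), the same pattern should apply: create a second instance and return that one from the events fields only.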

mirusky avatar Aug 30 '21 05:08 mirusky

Very nice, thank you very much @mirusky! I will try that.

kashifsoofi avatar Aug 30 '21 16:08 kashifsoofi