Experimental MessageBus component

Open m110 opened this issue 2 years ago • 0 comments

Context

Apart from low-level Publisher and Subscriber interfaces, Watermill users can use either Router or the CQRS component.

After considering how we were using Watermill over the years and talking to other people in the community, it seems to me that:

  • Router is still a low-level API compared to the CQRS component.
    • It makes users responsible for marshaling and unmarshaling, which is often repetitive.
    • It's a nice API for routing messages from one Pub/Sub to another with little processing. However, event-driven applications rarely use such semantics (hence the weirdly named NoPublisherHandlerFunc we've added quite late).
  • The CQRS component addresses the issues above but seems quite complex for what most people need.
    • Separating commands and events is a rare use case. It seems that commands come most often from synchronous APIs, like HTTP or gRPC handlers. You can use just events or commands if you want, but it makes the APIs a bit weird overall (like CommandOrEvent in multiple places).
    • The Facade API doesn't provide a great experience. It's not modular, has a massive config, and mixes all concepts (publishers and subscribers, commands and events). It makes you provide a publisher even if you don't need one.
    • In theory, the EventHandlers constructor makes it possible to pass dependencies to the handlers. But when using the component, we've been injecting dependencies on another level anyway, not using it.

I feel we could use a high-level API that's more pleasant to use in event-driven services than the Router and is easier to grasp than the CQRS Facade.

One example: it's not trivial right now to run handlers within consumer groups, even though it's a super popular use case. (Two different handlers within the same service consuming the same event type.)

Proposal

  • Add a MessageBus component (name to be discussed).
  • It works with typed messages (structs) and doesn't differentiate between commands and events.
  • Leverage generics to get rid of the weird NewEvent methods.
  • Keep the API closer to that of Router: prefer an AddHandler method over creating handler structs.
  • Decouple Publisher as a separate struct.
  • Replace gogo with golang/protobuf in the default Protobuf marshaler.
  • Add context.Context to Marshal to let users extend metadata with things like correlation ID.
  • Replace "event name" with "message type".
  • Change the default message type to the struct name instead of the fully qualified struct name.
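
To make the last few points concrete, here is a minimal, stdlib-only sketch of a context-aware Marshal that uses the short struct name as the message type. Everything in it is an assumption for illustration: `Message` is a simplified stand-in for Watermill's message, and the `message_type` and `correlation_id` metadata keys, `correlationIDKey`, and `marshalExample` are hypothetical names, not part of the proposal.

```go
package main

import (
	"context"
	"encoding/json"
	"fmt"
	"strings"
)

// Message is a simplified stand-in for a Pub/Sub message (hypothetical).
type Message struct {
	Metadata map[string]string
	Payload  []byte
}

type ExampleEvent struct {
	ID string `json:"id"`
}

// correlationIDKey is a hypothetical context key used only in this sketch.
type correlationIDKey struct{}

// fullyQualifiedName returns the package-qualified type name,
// with any leading pointer marker removed.
func fullyQualifiedName(v any) string {
	return strings.TrimPrefix(fmt.Sprintf("%T", v), "*")
}

// messageType returns only the struct name, dropping the package prefix.
func messageType(v any) string {
	name := fullyQualifiedName(v)
	return name[strings.LastIndex(name, ".")+1:]
}

// Marshal receives a context so metadata can be extended with
// request-scoped values such as a correlation ID.
func Marshal(ctx context.Context, v any) (*Message, error) {
	payload, err := json.Marshal(v)
	if err != nil {
		return nil, err
	}
	msg := &Message{
		Metadata: map[string]string{"message_type": messageType(v)},
		Payload:  payload,
	}
	if id, ok := ctx.Value(correlationIDKey{}).(string); ok {
		msg.Metadata["correlation_id"] = id
	}
	return msg, nil
}

// marshalExample marshals one event with a correlation ID in the context.
func marshalExample() (*Message, error) {
	ctx := context.WithValue(context.Background(), correlationIDKey{}, "req-123")
	return Marshal(ctx, ExampleEvent{ID: "42"})
}

func main() {
	msg, err := marshalExample()
	if err != nil {
		panic(err)
	}
	fmt.Println(msg.Metadata["message_type"], msg.Metadata["correlation_id"], string(msg.Payload))
}
```

A short message type keeps topic names and metadata readable, at the cost of possible collisions between same-named structs in different packages.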

API Proposals

Option 1

Note:

  • We need bus.NewMessageHandler because Go methods can't have their own type parameters.
  • bus.NewMessageHandler is generic, but the type parameter is inferred from the handler func passed as the argument, so callers don't have to spell it out.

messageBus, err := bus.NewMessageBus(router, generateTopic, subscriberConstructor, watermill.NopLogger{})

err = messageBus.AddHandler(bus.NewMessageHandler(
	"testHandler",
	func(ctx context.Context, event ExampleEvent) error {
		fmt.Println("Handling", event.ID)
		return nil
	},
	marshaler,
	watermill.NopLogger{},
))
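
The following stdlib-only sketch shows why Option 1 needs a generic constructor function rather than a generic method, and how the type parameter is inferred. It is an assumption-laden illustration, not the proposed implementation: `MessageBus`, `MessageHandler`, `Dispatch`, and `runExample` are hypothetical, JSON is hard-coded in place of the marshaler argument, and the logger is omitted.

```go
package main

import (
	"context"
	"encoding/json"
	"fmt"
)

type ExampleEvent struct {
	ID string `json:"id"`
}

// MessageHandler is the non-generic form that the bus can store (hypothetical).
type MessageHandler struct {
	Name   string
	Handle func(ctx context.Context, payload []byte) error
}

// NewMessageHandler is a generic function; T is inferred from the
// signature of the handler func, so callers never write it explicitly.
func NewMessageHandler[T any](name string, handler func(ctx context.Context, msg T) error) MessageHandler {
	return MessageHandler{
		Name: name,
		Handle: func(ctx context.Context, payload []byte) error {
			var msg T
			if err := json.Unmarshal(payload, &msg); err != nil {
				return fmt.Errorf("unmarshal %T: %w", msg, err)
			}
			return handler(ctx, msg)
		},
	}
}

// MessageBus is a toy bus that keeps handlers in memory (hypothetical).
type MessageBus struct {
	handlers []MessageHandler
}

// AddHandler is a plain method. Go does not allow a method to declare
// its own type parameters, so it accepts the already-wrapped handler.
func (b *MessageBus) AddHandler(h MessageHandler) {
	b.handlers = append(b.handlers, h)
}

// Dispatch passes the raw payload to every registered handler.
func (b *MessageBus) Dispatch(ctx context.Context, payload []byte) error {
	for _, h := range b.handlers {
		if err := h.Handle(ctx, payload); err != nil {
			return fmt.Errorf("handler %s: %w", h.Name, err)
		}
	}
	return nil
}

// runExample registers one typed handler and dispatches one JSON payload.
func runExample() ([]string, error) {
	var handled []string
	bus := &MessageBus{}
	bus.AddHandler(NewMessageHandler("testHandler", func(ctx context.Context, event ExampleEvent) error {
		handled = append(handled, event.ID)
		return nil
	}))
	err := bus.Dispatch(context.Background(), []byte(`{"id":"42"}`))
	return handled, err
}

func main() {
	handled, err := runExample()
	if err != nil {
		panic(err)
	}
	fmt.Println("Handled", handled)
}
```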

Option 2

messageBus, err := bus.NewMessageBus(router, bus.JSONMarshaler{}, generateTopic, subscriberConstructor, watermill.NopLogger{})

err = bus.AddHandler(
	messageBus,
	"testHandler",
	func(ctx context.Context, event ExampleEvent) error {
		fmt.Println("Handling", event.ID)
		return nil
	},
)
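
A comparable stdlib-only sketch of Option 2 follows. Here the bus owns the marshaler and a package-level generic function registers handlers, taking the bus as its first argument. The `Marshaler` interface, the simplified `NewMessageBus` signature, `Dispatch`, and `runExample` are assumptions made for illustration, and the duplicate-name check is an invented detail.

```go
package main

import (
	"context"
	"encoding/json"
	"errors"
	"fmt"
)

type ExampleEvent struct {
	ID string `json:"id"`
}

// Marshaler is a reduced, hypothetical marshaler interface.
type Marshaler interface {
	Unmarshal(payload []byte, v any) error
}

// JSONMarshaler decodes payloads with encoding/json.
type JSONMarshaler struct{}

func (JSONMarshaler) Unmarshal(payload []byte, v any) error {
	return json.Unmarshal(payload, v)
}

// MessageBus owns the marshaler, so handlers don't receive one individually.
type MessageBus struct {
	marshaler Marshaler
	handlers  map[string]func(ctx context.Context, payload []byte) error
}

func NewMessageBus(m Marshaler) *MessageBus {
	return &MessageBus{
		marshaler: m,
		handlers:  map[string]func(context.Context, []byte) error{},
	}
}

// AddHandler is a package-level generic function because a method on
// MessageBus could not introduce the type parameter T.
func AddHandler[T any](b *MessageBus, name string, handler func(ctx context.Context, msg T) error) error {
	if _, exists := b.handlers[name]; exists {
		return errors.New("handler already registered: " + name)
	}
	b.handlers[name] = func(ctx context.Context, payload []byte) error {
		var msg T
		if err := b.marshaler.Unmarshal(payload, &msg); err != nil {
			return err
		}
		return handler(ctx, msg)
	}
	return nil
}

// Dispatch passes the raw payload to every registered handler.
func (b *MessageBus) Dispatch(ctx context.Context, payload []byte) error {
	for name, h := range b.handlers {
		if err := h(ctx, payload); err != nil {
			return fmt.Errorf("handler %s: %w", name, err)
		}
	}
	return nil
}

// runExample registers a handler, tries to register the same name again,
// and dispatches one payload.
func runExample() (handledID string, duplicateRejected bool, err error) {
	bus := NewMessageBus(JSONMarshaler{})
	handler := func(ctx context.Context, event ExampleEvent) error {
		handledID = event.ID
		return nil
	}
	if err = AddHandler(bus, "testHandler", handler); err != nil {
		return "", false, err
	}
	duplicateRejected = AddHandler(bus, "testHandler", handler) != nil
	err = bus.Dispatch(context.Background(), []byte(`{"id":"42"}`))
	return handledID, duplicateRejected, err
}

func main() {
	id, dup, err := runExample()
	if err != nil {
		panic(err)
	}
	fmt.Println("Handled", id, "duplicate rejected:", dup)
}
```

Compared with Option 1, the call site is shorter because the marshaler and logger are configured once on the bus, but the registration no longer reads like a Router method call.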

Option 3

Decouple consumer groups handling from generic handlers.

err = router.AddConsumerGroupNoPublisherHandler(
	"testHandler",
	subscriberConstructor,
	message.NewGenericHandler(func(ctx context.Context, event ExampleEvent) error {
		fmt.Println("Handling", event.ID)
		return nil
	}, marshaler, logger),
)
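
The consumer group part of Option 3 can be sketched on its own with a toy in-memory broker. This is an illustration under stated assumptions, not Watermill code: `Broker`, `Subscriber`, `SubscriberConstructor`, `AddConsumerGroupHandler`, and `runExample` are hypothetical, the handler name doubles as the consumer group name, and payloads are plain strings to keep the focus on delivery semantics.

```go
package main

import "fmt"

// Broker is a toy Pub/Sub: every consumer group receives each message once,
// delivered to one of its members in round-robin order.
type Broker struct {
	groups map[string][]func(payload string)
	next   map[string]int
}

func NewBroker() *Broker {
	return &Broker{
		groups: map[string][]func(payload string){},
		next:   map[string]int{},
	}
}

// Subscribe adds a member to the given consumer group.
func (b *Broker) Subscribe(group string, fn func(payload string)) {
	b.groups[group] = append(b.groups[group], fn)
}

// Publish delivers the payload to exactly one member of each group.
func (b *Broker) Publish(payload string) {
	for group, members := range b.groups {
		i := b.next[group] % len(members)
		b.next[group]++
		members[i](payload)
	}
}

// Subscriber is bound to a single consumer group.
type Subscriber struct {
	broker *Broker
	group  string
}

func (s Subscriber) Subscribe(fn func(payload string)) {
	s.broker.Subscribe(s.group, fn)
}

// SubscriberConstructor builds a subscriber for a given handler name.
type SubscriberConstructor func(handlerName string) Subscriber

// AddConsumerGroupHandler creates a dedicated subscriber per handler,
// so handlers with different names each see every message.
func AddConsumerGroupHandler(name string, newSub SubscriberConstructor, fn func(payload string)) {
	newSub(name).Subscribe(fn)
}

// runExample wires two handlers (one of them with two replicas) to the
// same events and returns how many events each handler name processed.
func runExample() map[string]int {
	broker := NewBroker()
	newSub := func(handlerName string) Subscriber {
		return Subscriber{broker: broker, group: handlerName}
	}
	counts := map[string]int{}
	count := func(name string) func(string) {
		return func(string) { counts[name]++ }
	}
	AddConsumerGroupHandler("sendEmail", newSub, count("sendEmail"))
	AddConsumerGroupHandler("sendEmail", newSub, count("sendEmail")) // second replica
	AddConsumerGroupHandler("updateReadModel", newSub, count("updateReadModel"))

	broker.Publish("event-1")
	broker.Publish("event-2")
	return counts
}

func main() {
	fmt.Println(runExample())
}
```

Both handler names process each of the two events once, and the two sendEmail replicas share the work instead of duplicating it.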

m110 avatar Apr 18 '22 16:04 m110