watermill
watermill copied to clipboard
Experimental MessageBus component
Context
Apart from low-level Publisher
and Subscriber
interfaces, Watermill users can use either Router
or the CQRS component.
After considering how we were using Watermill over the years and talking to other people in the community, it seems to me that:
-
Router
is still a low-level API compared to the CQRS component.- It makes users responsible for marshaling and unmarshaling, often repetitive.
- It's a nice API for routing messages from one Pub/Sub to another with little processing. However, event-driven applications rarely use such semantics (hence the weirdly named
NoPublisherHandlerFunc
we've added quite late).
- The
CQRS
component addresses the issues above but seems to be quite complex for what most people use.- Separating commands and events is a rare use case. It seems that commands come most often from synchronous APIs, like HTTP or gRPC handlers. You can use just events or commands if you want, but it makes the APIs a bit weird overall (like
CommandOrEvent
in multiple places). - The Facade API doesn't provide a great experience. It's not modular, has a massive config, and mixes all concepts (publishers and subscribers, commands and queries). It makes you provide a publisher even if you don't need one.
- In theory, the
EventHandlers
constructor makes it possible to pass dependencies to the handlers. But when using the component, we've been injecting dependencies on another level anyway, not using it.
- Separating commands and events is a rare use case. It seems that commands come most often from synchronous APIs, like HTTP or gRPC handlers. You can use just events or commands if you want, but it makes the APIs a bit weird overall (like
I feel we could use a high-level API that's more pleasant to use in event-driven services than the Router and is easier to grasp than the CQRS Facade.
One example: it's not trivial right now to run handlers within consumer groups, even though it's a super popular use case. (Two different handlers within the same service consuming the same event type.)
Proposal
- Add a
MessageBus
component (name to be discussed). - It works with typed messages (structs) and doesn't differentiate between commands and events.
- Leverage generics to get rid of the weird
NewEvent
methods. - Keep the API closer to that of
Router
.AddHandler
method over creating structs. - Decouple
Publisher
as a separate struct. - Replace
gogo
withgolang/protobuf
in the default Protobuf marshaler. - Add
context.Context
toMarshal
to let users extend metadata with things like correlation ID. - Replace "event name" with "message type".
- Changed default message type to struct name instead of full qualified struct name.
API Proposals
Option 1
Note:
- We need
bus.NewMessageHandler
because methods can't be generic. -
bus.NewMessageHandler
is generic but the type is inferred from the handler func passed as the argument.
messageBus, err := bus.NewMessageBus(router, generateTopic, subscriberConstructor, watermill.NopLogger{})
err = messageBus.AddHandler(bus.NewMessageHandler(
"testHandler",
func(ctx context.Context, event ExampleEvent) error {
fmt.Println("Handling", event.ID)
return nil
},
marshaler,
watermill.NopLogger{},
))
Option 2
messageBus, err := bus.NewMessageBus(router, bus.JSONMarshaler{}, generateTopic, subscriberConstructor, watermill.NopLogger{})
err = bus.AddHandler(
messageBus,
"testHandler",
func(ctx context.Context, event ExampleEvent) error {
fmt.Println("Handling", event.ID)
return nil
},
)
Option 3
Decouple consumer groups handling from generic handlers.
err = router.AddConsumerGroupNoPublisherHandler(
"testHandler",
subscriberConstructor,
message.NewGenericHandler(func(ctx context.Context, event ExampleEvent) error {
fmt.Println("Handling", event.ID)
return nil
}, marshaler, logger),
)