moleculer
Manual event ack
Is your feature request related to a problem? Please describe.
Currently events in Moleculer are always fire-and-forget, which is a relatively unsafe approach (especially when many of the underlying transporters support manual ACKs).
This would be very helpful when you are not only interested in the reception of the event by the consumer, but also the correct consumption.
For example, a `user.registered` event can cause the creation of an account for the user: if the account creation succeeds, we ACK the event; if it fails, the event is not ACK'ed and is eventually re-delivered to another subscriber.
Describe the solution you'd like
We could add an optional parameter to event handlers to change the ACK behavior from automatic to manual. If the transporter supports manual ACKs, it applies them accordingly; if not, the option is just a no-op. Additionally, we would need an `ack()` method on the event handler's `ctx`, so that the subscriber can acknowledge the processing of the event. Again, if the transporter doesn't support manual ACKs, this method becomes a no-op.
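A minimal sketch of what that opt-in API could look like; note that the `ackMode` option and `ctx.ack()` below are hypothetical and do not exist in Moleculer today:

```js
// accounts.service.js -- hypothetical API sketch, not part of Moleculer today
module.exports = {
    name: "accounts",
    events: {
        "user.registered": {
            ackMode: "manual", // hypothetical option: switch from automatic to manual ACK
            async handler(ctx) {
                // Create the account for the newly registered user.
                await this.createAccount(ctx.params);
                // ACK only after successful processing. If createAccount() throws,
                // the event is not ACK'ed and can be re-delivered to another subscriber.
                ctx.ack(); // hypothetical method; no-op if the transporter has no ACK support
            }
        }
    },
    methods: {
        async createAccount(user) {
            /* ... persist the account ... */
        }
    }
};
```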
I've seen that in the protocol document there is an in-progress section for an EventACK, which makes me think that this has been considered to some extent.
Describe alternatives you've considered
The closest alternative would be to use a task queue for each event type: we handle the event and create a task on a task queue. The obvious cons are the need for an additional task queue, the overall duplication of data/processing across systems, and the latency implications. I believe this is such a common scenario that it calls for a cleaner solution.
Additional context
References from NATS Streaming:
https://docs.nats.io/developing-with-nats-streaming/queues
https://docs.nats.io/developing-with-nats-streaming/acks
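For reference, a rough sketch of how manual ACKs look with the node-nats-streaming client (the cluster ID, client ID, subject and the createAccount() helper are illustrative):

```js
const stan = require("node-nats-streaming").connect("test-cluster", "accounts-service");

async function createAccount(user) { /* ... */ } // illustrative processing step

stan.on("connect", () => {
    const opts = stan.subscriptionOptions()
        .setManualAckMode(true)   // don't ACK automatically on delivery
        .setAckWait(30 * 1000)    // redeliver if not ACK'ed within 30s
        .setDurableName("accounts");

    // Queue group "accounts": each event goes to only one member of the group
    const sub = stan.subscribe("user.registered", "accounts", opts);

    sub.on("message", async (msg) => {
        try {
            await createAccount(JSON.parse(msg.getData()));
            msg.ack(); // ACK only after the event was actually processed
        } catch (err) {
            // No ACK: NATS Streaming redelivers the message after AckWait expires
        }
    });
});
```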
Why not use `ctx.call()` in this example (user registration) and get not only confirmation, but also the other features of the built-in middleware?
In DDD terms, the User would live in an Auth bounded context, and the Account from my example in another. Communication between bounded contexts is ideally done by means of events to reduce coupling.
As a further example, let's say you have to send an email to welcome the user and also add the user to the company CRM.
If using `ctx.call()` you would need to do so explicitly in the User event handler in the Auth bounded context, coupling that bounded context with many others (e.g. the Messaging BC, the CRM BC, etc.).
In practice the Auth bounded context doesn't need to know what happens after it publishes the event. Instead, it's the responsibility of the subscribers to define when the event was handled. You can follow the default behavior, which is ack on delivery (a very fast/scalable approach that works well for non-critical events), or you can manually trigger the ack after processing the event (not just receiving it).
Not all events are equal, but they are still events, even if the subscriber manages them in a reliable way, because what differentiates an event from a typical reply/response action is the intent of the caller.
For such tasks, I also want to see a built-in task queue in the Moleculer protocol and an internal service. Many things have already been implemented for it and are used for calls between services. The only things missing are threads and the logic to work with such requests, including deferred ones. I am considering integrating with https://github.com/breejs/bree which implements queuing logic on streams without using third-party databases.
I think that makes sense for a number of cases, but it's orthogonal to how you handle events reliably.
You might have an event that triggers a task, but if the task scheduling fails you'll be out of luck: that event would be lost, and your business process left in an incomplete state. This is especially important in event-based architectures where the state of the system is the events flowing through it.
Likewise, I tend to think in terms of task queues when dealing with scheduling (i.e. I want this done at time X) or when dealing with parallel processing (e.g. I want this [x1, x2, x3] done at the same time). But when dealing with a simple event that needs to trigger the execution of just one process right away, and with strong guarantees about its completion, it sounds like overkill to throw in a task queue (although, granted, it is possible).
Especially given that most of the message-based transports offer similar capabilities (e.g. NATS Streaming, RabbitMQ, AMQP, Kafka). And in the worst-case scenario, if you do attempt to manually trigger an ACK on a transport that doesn't support it, it has no effect whatsoever.
For all other scenarios, I'm 100% in agreement with you @intech. In fact, for those I tend to use Argo Workflows.
I believe event acknowledgement is a messaging infrastructure concern, only necessary to indicate to the infrastructure implementation that a consumer of an event actually received it when it was published while the consumer was active. In this case, Moleculer is the abstraction around the underlying infrastructure, and it does indeed ensure that published events are received by an available broker and, where possible, acknowledged.
In terms of DDD: In chapter 8 of Implementing DDD (Vaughn Vernon 2013), Vernon discusses Domain Events in a section called "Spreading the News to Remote Bounded Contexts". In this section he dives into "Messaging Infrastructure Consistency" and describes three basic ways to accomplish this.
- Domain model and messaging infrastructure share the same persistence store.
- Domain model persistence store and messaging persistence are controlled under a global XA transaction.
- Events are stored in a special storage area specifically for events. Event storage should belong to the bounded context and not to the messaging infrastructure. That is to say, it is your implementation's responsibility to maintain and provide historical access to events published by the context, not the messaging infrastructure's.
I believe event acknowledgment by the context within the Moleculer service is somewhat (though arguably not entirely) in line with option two which is essentially a "global two-phase commit" approach. Vernon of course includes this option so it isn't invalid but he does point out the disadvantages of this approach, namely that global transactions tend to be expensive and perform poorly, and concludes by stating that the third approach is used throughout the examples in the book.
I personally agree with Vernon here as I think it is the publishing context's responsibility to know and make known the events which have been published. This supports a firm decoupling from consumers, which are and always should be unknown to the publishing context. Consumers are then able to review published events and account for any which may have been missed, in addition to receiving events asynchronously as they are published.
Task queues, which have also been mentioned here, arguably do not solve this problem because again it relies on downstream consistency. In other words, the event has to make it to the downstream service/queue/context reliably in order for the solution to work. The third option provided by Vernon - local context event persistence - inverts the responsibility, making publishers responsible for the consistency of events leaving that context in a way that is entirely decoupled from downstream contexts. Events, for example, may be retrieved from upstream contexts over simple REST interfaces. (In the Moleculer world, upstream action calls.)
Ultimately, my opinion is that support for event acknowledgment within Moleculer, if determined to be a desirable feature, should only be included as an optional feature to support those who wish to go down this path (as indicated by the OP). I can see use cases and architectures where this may be desired (especially for simplicity or prototyping), but generally I believe that consistency within the system is the responsibility of each context and falls outside the responsibility of the Moleculer framework. Essentially, I would prefer to design my contexts such that they are aware of and attempt to resolve eventual-consistency issues which may have occurred due to lack of availability. This way, my downstream context - which knows which events from upstream contexts in my context map it is concerned with - can review stored events from upstream contexts and essentially "synchronize".
Side note: This is not to say that a Moleculer module which supports a type of context-local event persistence and publishing isn't warranted, as this may be very helpful by providing a DRY approach so that each context does not need to reinvent the wheel.
Thanks @campanalecp for such a thoughtful reply! I agree with your point of view and the approaches from the red book.
My point is that while already using a message queue such as the one provided by NATS Streaming, that message queue in itself can be considered your event store. Usually, communication between bounded contexts is done by means of integration events or domain event projections, with the objective of avoiding leaking internal abstractions and language to the world outside your bounded context. Those event projections can be considered a view or read model. That read model can be stored in an upstream data store (responsibility of the publisher to ensure reliable storage) and exposed through an OHS (API). Alternatively, it can be stored downstream (responsibility of the consumer), which is usually the recommended approach if you have ownership of the downstream BC. Finally, there's a third point of view in which the event stream itself is the read model. This is especially the case when dealing with event sourcing.
When you have direct access to the messaging / event store infrastructure, a common solution is to modify the consumer's current position in response to the successful or failed processing of an event.
E.g. you consume an event from the event store, the event store increments the consumer's current position in that stream, and the consumer persists the last successful position to a data store. If the operation associated with processing the event fails, either the consumer finds out after receiving the next event, or another process continuously checks for failed events. Both scenarios result in the consumer's current position being reset to the previous successful position, and the failed event being replayed.
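A rough sketch of that checkpointing pattern; the eventStore and positionStore interfaces here are made up purely for illustration:

```js
// Illustration only: eventStore and positionStore are hypothetical interfaces.
async function consume(stream, consumerId, eventStore, positionStore, handle) {
    // Last position this consumer processed successfully
    let position = await positionStore.load(stream, consumerId);

    eventStore.subscribeFrom(stream, position + 1, async (event) => {
        try {
            await handle(event);
            position = event.position;
            await positionStore.save(stream, consumerId, position); // persist the checkpoint
        } catch (err) {
            // Processing failed: rewind the consumer to the last good position
            // so the failed event (and everything after it) gets replayed.
            await eventStore.resetPosition(stream, consumerId, position);
        }
    });
}
```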
That works, but does require additional plumbing and direct access to the underlying messaging infrastructure.
The solution I proposed fulfills the same objective, with an even simpler approach by reusing what we already have in most message queues, without the need to expose the messaging infrastructure, and with sane defaults in case manual ACKs are not available (it becomes a NOOP and the responsibility goes back to the developer).
I personally don't consider it wrong for Moleculer to expose (abstracted) more of the standard functionality found in message queues, e.g. ACK handling, maybe even setting the starting point of consumption, or the ability to move the cursor to another sequence in the stream. If the capability is supported by the underlying transport, we provide a simple abstraction and let you use it. Otherwise, we can throw an error at event handler configuration time and no-op the operations.
Event acking is an old & eternal story. Every year it appears and disappears without a concrete design. The main problem is that everybody has a concept in mind which solves only their own problem, but mostly these concepts are not universal and not good for others.
What @juanchristensen proposed, using the NATS Streaming ACK, is the same idea. It can work only with NATS Streaming and only with `disableBalancer: true`. If you don't disable the balancer, NATS will always try to send the packet to the same node (which may have crashed).
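For context, this is roughly the broker configuration in question: the NATS Streaming (STAN) transporter with the built-in balancer disabled, so balancing is left to the transporter's queue groups (a minimal sketch, not a full config):

```js
// moleculer.config.js -- sketch: hand balancing over to NATS Streaming's queue groups
module.exports = {
    transporter: "STAN",   // NATS Streaming transporter
    disableBalancer: true  // let the transporter decide which node receives each event
};
```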
So a higher-level solution could be a built-in Moleculer event ACK mechanism which is independent of the transporter & balancer state. Here is my RFC about it: https://github.com/moleculerjs/rfcs/issues/2 (by the way, nobody responds to help solve the issues/questions). As you can see from the "Unresolved questions", handling acknowledgements is not a trivial thing, and I don't know of a solution that would be good for the vast majority.
On the other hand, if you add ack-handling logic on the producer side, it won't be loosely coupled, because the producer would need to know which ack/nack matters and whether the event needs to be resent or not.
This puzzlement is why I extended the middleware feature in Moleculer to support hooking all the main methods. E.g. if you want to store emitted events in a store/DB, you just define `emit` & `broadcast` hooks in a middleware, implement the storing logic and call `next` (which executes the original built-in logic), as in the sketch below.
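A small sketch of such a middleware, assuming Moleculer 0.14's middleware hook signatures; the persistEvent() helper is hypothetical:

```js
// eventstore.middleware.js -- store every emitted/broadcast event before sending it
module.exports = {
    name: "EventStore",

    // Wrap broker.emit()
    emit(next) {
        return async (eventName, payload, opts) => {
            await persistEvent("emit", eventName, payload); // hypothetical storage helper
            return next(eventName, payload, opts);          // run the original built-in logic
        };
    },

    // Wrap broker.broadcast()
    broadcast(next) {
        return async (eventName, payload, opts) => {
            await persistEvent("broadcast", eventName, payload);
            return next(eventName, payload, opts);
        };
    }
};

async function persistEvent(kind, eventName, payload) {
    /* write the event to a store/DB */
}
```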
@juanchristensen what you described about consumer positions, replaying, etc. is what Kafka does very well. It's perfect for event sourcing and you can use it. I know some companies that develop projects with Moleculer but use Kafka to transfer important events instead of the Moleculer event solution, and it works properly. They didn't try to force using only Moleculer for everything.
I mean, if there is a problem and there is a tool which covers it perfectly, then use it (alongside Moleculer). We shouldn't try to implement Kafka functionality in Moleculer, because it will never be as good as the real thing.
I agree @icebob , I wasn't suggesting that we implement Kafka-like functionality into Moleculer, just making a point that most messaging transports provide their idiomatic tools to handle reliable delivery and processing of messages. In NATS Streaming that is the capability to do manual ACKs. In Kafka it's the capability to keep track of the consumer offset within one consumer group. I'm pretty sure there's a similar ACK behavior in AMQP/RabbitMQ.
I missed the RFC (but did notice the ACK part on the spec), I'll take a deeper look and try to propose solutions on that thread, thanks for pointing it out.
I think there are at least two ways to approach this within Moleculer:
1. Creating an extension to the Moleculer protocol to handle ACKs, regardless of the underlying transport being used.
2. Extending the transporter interface so that ACK handling can be implemented in an idiomatic way per transporter, using the building blocks of the transporter itself.
What inclines me to the latter is the fact that many of the supported transporters have widely different guarantees and capabilities. If you reduce all the transporters to just a messaging infrastructure (because that's the common denominator), then you lose a lot of the individual benefits and are forced to implement those capabilities within Moleculer (aka the ACK RFC), or stop using that part of Moleculer in favor of using the transporter directly, as mentioned.
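To illustrate the second option, a hypothetical extension of the transporter interface might look like this; none of these methods or properties exist in Moleculer today:

```js
// Hypothetical capability flags on transporters -- illustration only
class NatsStreamingAckTransporter /* extends Transporters.Base */ {
    // Let the broker check support at event-handler registration time
    get capabilities() {
        return { manualAck: true };
    }

    // Called by the broker when ctx.ack() is invoked inside an event handler
    ackEvent(rawMessage) {
        rawMessage.ack(); // delegate to the transporter's native ACK
    }
}

class TcpAckTransporter /* extends Transporters.Base */ {
    get capabilities() {
        return { manualAck: false }; // broker can throw or warn when manual ACK is requested
    }

    ackEvent() {
        /* no-op */
    }
}
```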
If you need reliability and recoverability in the way events are handled in your system, then you should use a transport that gives you those guarantees. In my opinion, the better Moleculer is able to capture the developer's intent and requirements, the better it can help guide those decisions. E.g. if a developer tries to manually ACK an event while using a transporter that doesn't have that capability, Moleculer could throw an error telling them to switch to a reliable transport.
Otherwise it's like using UDP and building on the application side everything needed for reliability, instead of just using TCP. (reminds me of SCTP).
Protocol v4 (which is used in Moleculer 0.14) already supports event ACK, it's just not used yet: https://github.com/moleculer-framework/protocol/blob/master/4.0/PROTOCOL.md#eventack
Thank you, @juanchristensen for the kind response; really appreciate it!
Your explanation was very enlightening. I see how what you are proposing could be used to help provide more durable events and still follow Vernon's third point/practice. By providing a simple acknowledgement system around messaging, you can empower the upstream context to be in charge of its own events and their history (in lieu of an event-sourced system where read models are sourced from the stream itself), with the option of also tracking where any of that context's consumers are in the stream. Tracking would, of course, be generic in this case to avoid coupling, and simply provides a means to implement an API for downstream contexts to "pick up where they left off".
I also agree that it would make sense to support additional configurability over the messaging abstraction in the framework. While it may not be the responsibility of the framework to (opinionatedly) solve this problem for the developer, providing some features like this would make the problem much more approachable.
@icebob I also had no idea about the RFC repository. While I wish I could find the time to be more involved in this community, I will definitely take a look now that I know it's there.