ecotone-dev
ecotone-dev copied to clipboard
Proposal: apply `Before`, `Around` and `After` intreceptors on message processors instead of message handlers
Description
This is a WIP proposal to refactor interceptors and message processors.
Before, Around and After intreceptors are all about in process method invocation and not about messaging, while BeforeSend is about messaging, so I am not talking about this last one in this proposal.
The proposal is to apply Before, Around and After to Message Processors instead of Message Handlers. A message processor can be a user space method invocation or an internal Message Processor like all aggregate flow services.
This PR introduces a new interface named RealMessageProcessor which will replace the actual MessageProcessor interface. This interface has only one method: process(Message $message): ?Message
This PR also introduces MessageProcessorActivator which would replace RequestReplyProducer but will only handle messages instead of mixed return types.
This PR includes only a refactoring of the ModellingHandlerModule with the new components. If we agree on the design, we could easily change all modules.
The goal of this proposal are:
- better design : there is actually a lot of piping around interceptors that is hard to follow. I think because they are not about messaging but method invocation
- performance: the execution is more straight forward, I think it could impact performance. It will also positively impact compilation time as there will be less components to compile.
- call stack: the call stack will be a lot simpler
In the end, for example for user space command handler, there would be a single Message Handler registered with a ChainMessageProcessor that will handle all Before, Around, After interceptors and the actual method invocation before returning to the Message handler.
WDYT ?
Maybe PR should only have one reason for change: fixing an issue with the after interceptor?
Interceptors should apply to method calls, not just message handlers (as AOP functionality)
@jlabedo I can't review now, but I can leave a comment to explain reasoning behind different Interceptors.
So what interceptors are about is related to what type of interceptor we want to use. https://docs.ecotone.tech/modelling/extending-messaging-middlewares/interceptors
Before and After Interceptors -
Are meant to hook in into Message flow. For example before does not have access to the invocated method at all, because it does happen before we even get into the invoked object.
This allows for example for filtering out messages, or stopping invocation before even Aggregate is loaded.
Besides that, in this situation Message can be enriched with details (for example we may add userId to the payload, as Command can be an simple array at this point - is not deserialized to Command Class yet)
In general sense we can view them as features optimized for guarding, filtering and enriching.
Presend Interceptors
This is special type of Before Interceptor. The best option to view the use case for that, is having Asynchronous Command Handler. If we would use an Before Interceptor with Async Endpoint, Message would land first in the Message Channel and then after it would be consumed it would trigger Before Interceptor.
This is far from ideal for scenarios where we want to validate if this Message even make sense. So for things like structure validation, or even authentication we want to validate those before they will land in Async Message Channel.
This way we can ensure we don't push Messages to the Queue which make no sense, and only the correct Messages that we know we want to process will go there.
Around Interceptors -
Those are about Method Invocation. This is the moment just before we execute intercepted object, and this is the moment when we actually deserialized the payload to given Command/Query/Event class (Before that Ecotone just carry whatever was given in the payload, it can be json/array for example).
The previous interceptors are not able to do action before method execution and after method exection (as they are not even aware of the executed object).
So if we would do intercept Aggregate, we would get access to the Aggregate itself and Command. It make it really easy to build stuff like, call getUserId and compare it with current executor id, if not in match then this Aggregate does not belong to the executor, so he can't modify it.
This is also perfect fit for intercepting Gateways like Command Bus, as we can for example start transaction before Gateway execution and finish it after, or just rollback on error.
To sum things up: It may be not obvious to reason about all those stuff, because what we are used to, is just Middleware pattern available in other Frameworks, and Interceptors are much more than that. Therefore, I hope above helps to understand that Interceptors are not only about Method Execution. In fact only one Interceptor is about method invocation, the rest of them are actually about the Message flow.
@jlabedo let me know then if this change make sense then, and if we should continue with the PR. If so, then happy to review.
Thanks @dgafka for your explanations.
We agree on Presend interceptors (they relate to messaging) and Around interceptors (they relate to method invocation).
When I think of messaging in my proposal, it is really about channels and the possibility made by ecotone to switch a channel implementation from direct to asynchronous, and that's what makes the power of Ecotone 🚀 :)
Regarding Before and After interceptors, the question is: would you change the channel between a Before interceptor and its endpoint to something else than a DirectChannel ? It is not implemented now, but would the feature be desirable ?
Well, I think It could, but this is already implemented differently with Internal handlers and output channels : you can see them as Before or After interceptors and they relate to messaging as you can easily switch from a synchronous to an asynchronous channel.
But if Before and After interceptors are always meant to be run in the same process than the endpoint, we can make it clear in the design and drop the messaging channel abstraction for Before and After, which would be useless, and I think ecotone code would be easier to understand (and more performant).
Maybe PR should only have one reason for change: fixing an issue with the after interceptor?
@lifinsky , this PR is not meant to fix the issue with after interceptors precedence, just to change the design of interceptors with the exact same specifications for the user. But if we decide to get after interceptors out of around interceptors, I think this new design would make the work a lot easier.
But if Before and After interceptors are always meant to be run in the same process than the endpoint, we can make it clear in the design and drop the messaging channel abstraction for Before and After, which would be useless, and I think ecotone code would be easier to understand (and more performant).
Yes, that's correct they will be only used in synchronous scenarios, before and after are meant to work like this by design. So ye I see your point and it make sense :)
So if we can refactor to simplify it (possible shorten the stack too), yet keep the behaviour they provide, then we can go for that
This PR introduces a new interface named RealMessageProcessor which will replace the actual MessageProcessor interface. This interface has only one method: process(Message $message): ?Message
Why would we replace MessageProcessor?
This PR also introduces MessageProcessorActivator which would replace RequestReplyProducer but will only handle messages instead of mixed return types.
RequestReplyProducer is used as foundation for communication between Endpoints, and if I understood correcty the aim of the PR is to handle synchronous Before/After/Around Interceptors an the execution to not use Messaging.
However other parts will still use Messaging therefore will be in need for functionality which RequestReplyProducer provides.
Can you elaborate why would we replace that and how does it related to the PR?
better design : there is actually a lot of piping around interceptors that is hard to follow. I think because they are not about messaging but method invocation
Agreed, we can think of it like this, due it's synchronous nature.
performance: the execution is more straight forward, I think it could impact performance. It will also positively impact compilation time as there will be less components to compile.
Ok make sense.
call stack: the call stack will be a lot simpler
Sure, maybe it will be easier for Ecotone users to understand what's happening during sync scenarios.
In the end, for example for user space command handler, there would be a single Message Handler registered with a ChainMessageProcessor that will handle all Before, Around, After interceptors and the actual method invocation before returning to the Message handler.
I see, make sense. Thank you sharing your thoughts on that, it gave me higher perspective on how we can look on this. To sum it up in context of interceptors:
- For places where async/sync communication is possible we continue using Message Channels. This way we can easily continue switching the code from async to sync on design level.
- For places where sync communication is the the only possible road, we still continue to use Messages, however we won't be using Message Channels. We will have dedicated Message Processor that will handle intercepting and actual (intercepted) object invocation.
Let me me know if above statements are true or is there something more to add :)
This PR introduces a new interface named RealMessageProcessor which will replace the actual MessageProcessor interface. This interface has only one method: process(Message $message): ?Message
Why would we replace MessageProcessor?
It is made just for the transition, the RealMessageProcessor interface will become MessageProcessor, with only one function taking a Message add returning a Message or null. I may change the name from process to execute, it is clearer that it is not pure and can run side effects.
This PR also introduces MessageProcessorActivator which would replace RequestReplyProducer but will only handle messages instead of mixed return types.
RequestReplyProduceris used as foundation for communication between Endpoints, and if I understood correcty the aim of the PR is to handle synchronous Before/After/Around Interceptors an the execution to not use Messaging. However other parts will still use Messaging therefore will be in need for functionality whichRequestReplyProducerprovides.Can you elaborate why would we replace that and how does it related to the PR?
Same as above, MessageProcessorActivator has the same functionality as RequestReplyProducer but it does not take care of transforming mixed return types to a Message. So this is just for the transition.
I see, make sense. Thank you sharing your thoughts on that, it gave me higher perspective on how we can look on this. To sum it up in context of interceptors:
- For places where async/sync communication is possible we continue using Message Channels. This way we can easily continue switching the code from async to sync on design level.
- For places where sync communication is the the only possible road, we still continue to use Messages, however we won't be using Message Channels. We will have dedicated Message Processor that will handle intercepting and actual (intercepted) object invocation.
Let me me know if above statements are true or is there something more to add :)
This is correct. The actual behavior is a bit tricky, the goal of this PR is to get the exact same userland behavior. Then, with better understanding, we could change it in the future if necessary.
Here is my understanding of how interceptors are hooked in the case of an aggregate command handler:
- before interceptors
- chained handlers (Identifier mapping, load aggregate, call aggregate processor, etc.)
- around interceptors
- intercepted method
- after interceptors
- chained handlers (publish events, etc.)
An existing piece of code that shows interceptors do not fit with message channels 😄 :
private function checkIfInterceptorIsCorrect(MethodInterceptor $methodInterceptor): void
{
if ($methodInterceptor->getMessageHandler()->getEndpointId()) {
throw ConfigurationException::create("Interceptor {$methodInterceptor} should not contain EndpointId");
}
if ($methodInterceptor->getMessageHandler()->getInputMessageChannelName()) {
throw ConfigurationException::create("Interceptor {$methodInterceptor} should not contain input channel. Interceptor is wired by endpoint id");
}
if ($methodInterceptor->getMessageHandler()->getOutputMessageChannelName()) {
throw ConfigurationException::create("Interceptor {$methodInterceptor} should not contain output channel. Interceptor is wired by endpoint id");
}
}
Hello guys, I am having strange behavior in the actual implementation that is hard to maintain with the new implementation. This is the code under test :
// packages/Ecotone/tests/Messaging/Fixture/InterceptedBridge/BridgeExample.php
final class BridgeExample
{
#[ServiceActivator('bridgeExample', outputChannelName: 'bridgeSum')]
public function result(int $result): int
{
return $result;
}
#[ServiceActivator('bridgeSum')]
public function sum(int $amount): int
{
return $amount + 1;
}
#[Around(precedence: 0, pointcut: BridgeExample::class . '::result')]
public function multiply(MethodInvocation $methodInvocation): int
{
return $methodInvocation->proceed()->getPayload() * 3;
}
}
The actual test expects to return 6 when sending 1 to bridgeExample. The flow is:
- Around (multiply)
| ::result(1) = 1
| ::sum(1) + 1 = 2
-> proceed(2) * 3 = 6
Two things to note on this code:
- the output channel is executed inside the around interceptor. Why not, but then around interceptors are related to messaging in a way. This is not obvious to me.
- Looking at the around interceptor code, why would you expect to receive a
Messageas the return value of theproceedinvocation ($methodInvocation->proceed()->getPayload()) ?
Finally, the chain is different if you add a dumb After interceptor:
final class BridgeExample
{
#[ServiceActivator('bridgeExample', outputChannelName: 'bridgeSum')]
public function result(int $result): int
{
return $result;
}
#[ServiceActivator('bridgeSum')]
public function sum(int $amount): int
{
return $amount + 1;
}
#[Around(precedence: 0, pointcut: BridgeExample::class . '::result')]
public function multiply(MethodInvocation $methodInvocation): int
{
return $methodInvocation->proceed()->getPayload() * 3;
}
#[After(pointcut: BridgeExample::class . '::result')]
public function after(int $amount): void
{
$test = $amount;
}
}
Then the result becomes 4 instead of 6. Here is the chain with this new After interceptor:
- Around (multiply):
| result(1) = 1
| after(1) = 1
-> proceed(1) * 3 = 3
- sum(3) + 1 = 4
In this case the output channel is executed outside the around interceptor...
So I propose to keep this last behaviour, meaning:
- First run the ServiceActivator (bridgeExample) with all its interceptors (before, around, after)
- Then send the result to outputChannel
WDYT @dgafka ?
- Around (multiply): 1 * 3
- After: 3
- outputChannelName (bridgeSum: 3 + 1
Hello Guys,
I finally realized that aggregates have another behavior. I added tests in #359 to explain it better. Aggregates have the "correct" behavior in the sense that after interceptors are called outside of around interceptors.
So I propose to keep this behavior for all invocations:
- It is the more natural behavior to me
- It's way simpler to write with the new proposal implementation
This would be a breaking change:
- for code relying on after calls being inside around interception (but why would you do that ?)
- for code relying on the return value of the
proceedmethod : in the case of aggregates it is the aggregate itself (we keep the same logic) but for non aggregates it is aMessage. In the latter case the return value would be changed with the new implementation to the return value of the intercepted method invocation.
It would be nice if we agree with this BC @dgafka : the implementation will be a LOT easier as it is more natural, but I will have to change a lot of unit tests (we may discuss after if we keep them or not, some are testing implementation details).
To recap, the invocation interception would be like this:
- before interceptors
- internal message processors (load aggregate, identifier resolution, ...)
- aggregate method invocation (with eventual around interceptors)
- after interceptors
- internal message processors (save aggregate, publish events, ...)
@jlabedo Important to load and save aggregate inside transactional around interceptor.
@jlabedo Important to load and save aggregate inside transactional around interceptor.
Right, those are handled on gateways interceptors, upper in the chain.
Gateway:
- gateway before interceptors
- around interceptors begin (transaction begins here)
- internal gateway processor sends message to service activator (see below)
- around interceptors end (transaction ends here)
- after interceptors
Service activator processing inside the around interceptors of gateway:
- before interceptors
- internal message processors (load aggregate, identifier resolution, ...)
- aggregate method invocation (with eventual around interceptors)
- after interceptors
- internal message processors (save aggregate, publish events, ...)
So everything is in the transaction boundary
@jlabedo I will have more time to look on this later this week. For now, what I can leave feed for thoughts, because I just remind myself of more advanced scenarios.
In case of Gateways around with naturally wrap everything that is happening inside and all the flows that happens after.
- gateway (start & begin transaction)
- command handler (start)
- event handler (start)
- event handler (end)
- command handler (end)
- gateway (end & commit transaction)
Now this actually have a valid use case to track sub-flows when we around intercept Endpoints (not gateways) too We may trigger an Transaction in Command Handler and expect subflows to be within transaction too
class OrderService
{
#[DbalTransaction]
#[CommandHandler('order.register', 'orderService')]
public function order(string $orderName): void
{
throw new InvalidArgumentException('exception');
}
- command handler (start & begin transcation)
- event handler (start)
- event handler (end)
- command handler (end & end transaction)
The same use case is used for Tracing. Where we hook in for tracing and use context
- command handler (start and begin upper context)
- event handler (start and begin lower context)
- event handler (end and end lower context)
- command handler (end & end upper context)
This allows us to actually create meaningful traces that shows parent-child relations in OpenTelemetry. Correct me if I am wrong, but with this proposal we would lose this, as those wouldn't really be children anymore, but same level actions e.g.
- command handler (start and begin upper context)
- command handler (end & end upper context)
- event handler (start and begin upper context)
- event handler (end and end upper context)
@dgafka , I agree and changed my mind during the night because of what @lifinsky said :)
Just to clarify first, on main, the event handlers are NOT handled inside the command handler around interceptors if you are using the WithEvents feature (see #361 for test case). If you are using the event bus directly in the command handler, yes, those are dispatched immediately inside the command handler, so also in the around interceptor.
Today, the around interceptor is around the CallAggregateService, so you will intercept the call, eventually any event handlers called inside the command handler. Then transaction ends, and then the SaveAggregateService is called (outside the transaction)
So I came to the same conclusion that having around interceptors just around the method call is not really useful. Around interceptors should intercept the whole message processing chain to be useful. So I propose this chain (it could easily be changed):
- Before interceptors
- START around
- internal handlers (load, etc.)
- call aggregate
- (1) After interceptors (so we can add headers before saving for instance)
- internal handlers (save, publish events)
- END around
- (2) After interceptors (so we can be sure the transaction ended)
I hesitate between (1) and (2) for After interceptors. But think I would go for (2) as it seems more natural and it is the actual behavior for aggregates. But we could easily change this behavior, so let me know what you think.
Finally, to answer @dgafka question, we will keep the same calling structure, also for tracing:
- gateway (start & begin transaction)
- command handler (start)
- event handler (start)
- event handler (end)
- command handler (end)
- gateway (end & commit transaction)
Hello guys, tests are passing with minimal changes (in tests 😄). There still need some cleaning, this is still a proof of concept.
Regarding performance, there is a 15% speed boost in HttpStackBenchmark, I expect more when all components are moved to this new implementation. I expect compilation time to be improved, because less components are registered in the container (but we don't have a benchmark for that).
Regarding final behavior, the only difference is gateways and service after interceptors that are called outside of around interceptors (for aggregates, this was the existing behaviour)
I will clean it a bit more to explain how things are working here
I will clean it a bit more to explain how things are working here
Ok, will review after you post your final summary :)
So here are the new abstractions introduced:
MessageProcessor
interface MessageProcessor
{
public function process(Message $message): ?Message;
}
This is the major building block of the messaging system. It can transform a message and/or execute side effects. A lot of existing internal classes did have this signature but now the abstraction is clear.
MethodInvocationProvider
interface MethodInvocationProvider
{
public function getMethodInvocation(Message $message): MethodInvocation;
}
It takes a message and builds a MethodInvocation. This provides an abstraction for retrieving the "object to invoke on" from message (for aggregates)
ResultToMessageConverter
interface ResultToMessageConverter
{
public function convertToMessage(Message $requestMessage, mixed $result): ?Message;
}
This is where we transform the result of a MethodInvocation to a Message. It can change headers, payload or passthrough request message. This is typically injected in a MethodInvocationProcessor.
RequestReplyProducer
It handles executing a MessageProcessor and resolving the channel to send the result message to. The message building logic is now handled inside message processor and the splitting handler is moved to a dedicated class (SplitterHandler).
SplitterHandler
The splitter handler cannot respond to the reply channel defined in the message anymore, this can only be done in RequestReplyProducer. So an output channel is required for the splitter handler. This seems natural, as a splitter should be an intermediary message handler, not the final endpoint.
MessageProcessorActivatorBuilder and ServiceActivatorBuilder
It would have been too much refactoring to change all ServiceActivatorBuilder instances to the new MessageProcessorActivatorBuilder, so ServiceActivatorBuilder is kept and use MessageProcessorActivatorBuilder internally. The incompatibility comes from the changeHeaders and shouldPassThroughMessage properties of ServiceActivatorBuilder that are now handled by the message processor and not the message handler itself.
The two builders have the same purpose of defining a RequestReplyProducer
Interception logic
The interception logic is handled in 4 places:
GatewayProxyBuilderfor all 3 interceptors typesMessageProcessorActivatorBuilderfor after and before interceptorsInterceptedMessageProcessorBuilderinterfaces that can hookAroundinterceptors in their method call definitions.MessagingSystemConfigurationforPresendinterceptors (not moved)
In general very nice design and idea with this refactor. The code is more readable now and feels like allows to work with higher level of abstraction without bothering with lower ones. What I mean by that is when I was refactoring the parts with Invocation, I was not really dealing with Messaging. Therefore it's like working on given level of abstraction without the need of analysis the other (of course to some degree).
Few thoughts on the PR :)
MethodInvocationProviderinterface felt a bit over abstraction here, and created code duplication along the way. I've improved MethodInvoker, so it can be used in those scenarios instead. There is also an LazyMethodInvoker when we do not have yet access to invoked object (aggregates).- MethodInvoker will take care also of binding the interceptors, as we've moved it out of Messaging Config, therefore we now need a single place where this automatically hooked in. This will ensure that will happen for all Message Processors using of the Invoker.
- I've not managed to refactor all places, so some code is still using
MethodInvocationProvider