[Host] Consumer pipeline revamp
When per message scoping is enabled and an exception is thrown in the messaging pipeline; if a custom imeplementation of IConsumerErrorHandler is available, both the instnace of IConsumerErrorHandler and the retry function supplied to IConsumerErrorHandler.OnHandleErorr() are created under the original scope. This limits the functionality of the retry mechanism when transactions/scoped dependencies are in use.
An use case is when using a scope registered DbContext that experiences a transient error while in a transaction. Any subsequent retry will reuse the existing DbContent connection which is now in a failed state.
It would be ideal to only create the message scope when retry()/DoHandleInternal() is called, but the consumerInvoker instance that is made available to both IConsumerErrorHandler and IConsumerContext is dependent on the scope.
IConsumerContext is also dependent on a scoped instance of IMessasgeBus (MessageBusProxy).
Are you planning a PR improve this? If this something that is a priority for your project?
To be honest, there's not a lot that I can do to improve it without introducing breaking changes.
The basic flow at the moment is:
- create a
IMessageScope - create
MessageBusProxyusing scope - create
consumerInstanceusing scope - create
IConsumerContextusing scope withMesageBusProxy+consumerInstance - call
DoHandleInternal(), passing the scopedIConsumerContextandMessageBusProxy - on exception: call
DoHandleError()passing int the sameMessageBusProxy,IConsumerContextandIMessageScope - Check for an isntance of
IConsumerErrorHandlerand recreate the steps inDoHandleInternal()to be co-ordinated by theIConsumerErrorHandlerinstance.
Due to this implementation being wrapped by a single IMessageScope, the IConsumerErrorHandler and retry() instances executes under the same scope as the original execution.
Possible solutions:
- Run through steps 1 to 5 as above and on an error check if there is an instance of
IConsumerErrorHandleravailable. If so, create a new scope and executeOnHandleError()on the instance.IConsumerErrorHandlerallows for multiple retry executions but due to the wrapped scope, the exception will just be deferred to the second exexcution. - Remove the references to
MessageBusProxyandConsumerfromIConsumerContextand modify downstream to get the instances elsewhere. Move message scoping to happen inDoHandleInternal()/retry(). Scope is recreated as needed, but contracts are broken and contextual information may be missing. - Do not pass an instance of
IConsumerContexttoIConsumerErrorHandler. Move message scoping toDoHandleInternal()/retry(). Allows for multiple retry attempts as in 3; but breaking change onIConsumerErrorHandlerwith loss of context. - Extract the
retry()functionality fromIConsumerErrorHandlerand have the handler return a state indicating that a retry is required** (the handler could then implement its own jitter/back-off implementation. Breaking change onIConsumerErrorHandler. - Reimagine the pipeline where Resillience/Exception handling/Scopes/Request Response are just middleware. Allow for re-execution of child middleware. This is definitely the most extensible pattern, but requires a big refactor and has lots of breaking changes.
None of these options are ideal unfortunately. Do you have any other ideas/suggestions?
** it would be nice to optionally have an "abandon" state too, where a message can be sent directly to the DLQ instead of being served up by the service bus again (for non-transient exceptions).
When the interceptors where introduced they were running intentionally on the same scope as the message handling scope (I wanted the interceptors to have access to the ongoing/current message scope).
Later, when Error Handler interceptors were introduced I had the same thought if not to create a whole new scope and debated this some at the time, and came to the conclusion these consumer error handlers are like a try-catch wrapper with the option of simply repeating the try-catch block. So a retry would re-use the existing scope, and it felt that the created message scope dependencies could be re-used (in most cases) and its more efficient to re-use them. Therefore I had not went down the road to re-create the message scope upon retry(). Of course this could be an additional option, but first need debate about this and weigh the prons/cons and effort.
Now, in your case I do understand the current design might not work (DbContext), and you really want to re-run the whole message processing (from the point of creating the scope) without letting it count towards retry count on the ASB side, correct?
To solve your case (as another option):
- Option 6) We could let the ASB re-run the message at the transport level. In the ErrorHandler you could get some transport level extensions like we have for RabbitMq to return the message without incrementing the retry counter. Conversely, with this approach, you could have the ability to decide to DLQ the message without further ASB retries. I do understand there would be a delay related with re-scheduling the message on the transport level (not ideal).
- I do like the idea of 5) that you've proposed, but that would require more breaking changes and I wonder about the memory overhead that such a design adds (currently interceptor pipeline runs as an option if you have interceptors for a given consumer).
- The other options mentioned 1-4 that you mention would fall under let's recreate the scope (and dependencies) which perhaps could be done as an option (to recreate the scope or not, with default to not recreate the scope).
Also, I will give this some more thought and review the code as currently I have other things in my focus.
Please let me know.
My primary application is for applying retry policies (with back-off and jitter) on Azure infrastrucure after a transient failure. With this, it's not the incremented delivery count that is an issue, but rather that messages are sent back to the queue when they fail/lock on mass and end up being processed again in unison -locking once more. I could add an interceptor to apply a delay before returning it to the queue but that just doesn't feel quite right.
With regards to 5; I was intrigued, so I threw up a quick spike on implementation and to have a look at what the overhead would look like. The spike has various configurations, but the simple flow is to pass a message into a runner which steps through registered middleware with ad hoc dependency resolution. Each middleware instance passes the context on the next which allows for manipulation as/when required. This makes the implementation pretty flexible in terms of what could be supported (resillience, transactions, scopes, hybrid deserialization, message lifecycle, etc).
ScopedPipeline SImulates a repeated execution with a full rescope..
NoScopePipeline No resilience, no scoping but does include a transaction scope middleware.
OneThousandInterationNoopPipeline 1000 iterations of a no-op interceptor before calling the consumer (stack overhead of many interceptors).
MinPipeline No middleware other than the consumer host.
NoPipeline Same as MinPipeline but does not use a runner. A non-interceptor implementation.
| Method | Mean | Error | StdDev | Median | Allocated |
|---|---|---|---|---|---|
| ScopedPipeline | 181.13 us | 3.620 us | 5.074 us | 181.90 us | 25.95 KB |
| NoScopePipeline | 38.10 us | 2.117 us | 6.174 us | 37.40 us | 13.91 KB |
| OneThousandInterationNoopPipeline | 301.53 us | 5.041 us | 5.394 us | 301.30 us | 114.41 KB |
| MinPipeline | 29.03 us | 1.284 us | 3.726 us | 27.90 us | 5.91 KB |
| NoPipeline | 27.36 us | 1.588 us | 4.633 us | 26.90 us | 4 KB |
The samples are very rudamentory, and certainly would need refinement and bus integration, but do flesh out the idea. Of course, due to the breaking nature of the change, it may very well be too much too chew. The major causalty would be IConsumerErrorHandler but interceptors in general would be impacted if the full approach were to be taken (context manipulation).
@EtherZa could you update the spike link provided? Its currently broken.
I like the middleware approach 5), as an option the middleware could decide if to redo the scope (default = false). From your benchmark, there is overhead. The refined implementation could perhaps be more lean and perfomant. As to the breaking changes, I am on the fence if still this could go into the v3 as I had the intent to release this after some more features (it has been in the making for some time now).
Let me think about your requirements some more and get back to you.
As a developer, I want to have the ability to inject my custom transient error handling logic and decide if to retry the whole message processing pipeline with the option to re-create the message scope.
@zarusz sorry, the repo was marked as private. The link should be good to go now.
It is definitely a fundamental shift and while the additional stack does add overhead, I'll leave it to you to decide if the benefits outweigh it or not. The spike is quick and dirty and definitely could be optimised. The boxing/unboxing is just one area that would need to be made more efficient, but the principal is there.
Given the scope re-create feature is handled as part of #354, I suggest we leave the pipeline revamp exploration to another bigger milestone 4.0.0, once the 3.0.0 is released.