Rebus
Infinite Retries and 2nd-level retry feature not working reliably as Rebus processing instances increase
I've encountered a problem running Rebus across tens of machines, related to RetryCount (and FailFast too). Since the ErrorTracker is in-memory, the count is local to each instance, and when running tens of processor instances the retry-count safeguard is no longer respected.
In my scenario, processing happens in bursts: hundreds of thousands of events arrive sparsely, a few times a day. We use a queue as a buffer to absorb the ingestion load and autoscale the underlying infrastructure to handle the processing spikes. Quite basic. In this context, an external dependency got slower and started failing after a timeout; this affected only a small subset of the messages.
We expected to be shielded from this kind of error by the Retry feature of Rebus, in particular by using 2nd-level retries: if a message fails 5 times, defer it by moving it to a separate system, implemented by handling IFailed<T> messages.
What we actually observed was that these 'slow' messages kept being processed many more times than expected, remaining in the queue 'forever', like an infinite retry, and never hitting the IFailed<T> handler.
We tried adding the exception to FailFast to exit at the first retry, but nothing changed, although it helped pinpoint the issue :)
What happened was:
- Machine01: Exception 1 on MessageId:A
- Machine02: Exception 1 on MessageId:A
- Machine03: Exception 1 on MessageId:A
- [...]
- Machine01: Exception 2 on MessageId:A
- Machine02: Exception 2 on MessageId:A
- [...]
- Machine01: Exception 3 on MessageId:A
- [...]
- Machine01: Exception 1 on MessageId:A
- Machine02: Exception 1 on MessageId:A
Message processing takes 2 minutes to fail, and with more than 10 instances it's likely that the per-instance error tracking has expired by the time an instance sees the same message a second time. With FailFast, the chance of seeing a 2nd-level retry rose, but not by much, since Azure Service Bus appears to use a sort of round-robin to distribute abandoned messages across clients as long as the queue is not empty.
The issue here is that a mechanism that is built to 'defend' against unhandled/unexpected errors is not actually defending much: messages are retried forever and 2nd-level handling is never hit.
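The effect can be reproduced with a toy model. The sketch below is not Rebus code: it assumes strict round-robin redelivery and replaces the tracker's time-based expiry with a hypothetical "expiry in deliveries" (`expiryTicks`). All names are made up for illustration.

```csharp
using System;

// Toy model (not Rebus code): a broker redelivers one always-failing message to
// instances in round-robin order. Each instance keeps its own in-memory error
// count, which is forgotten if the instance has not seen the message recently.
public static class RetryCountSimulation
{
    // Returns how many deliveries happen before some instance's LOCAL count
    // reaches maxRetries, or -1 if that never happens within maxDeliveries.
    public static int DeliveriesUntilLocalLimit(
        int instances, int maxRetries, int expiryTicks, int maxDeliveries)
    {
        var counts = new int[instances];
        var lastSeen = new int[instances];

        for (var delivery = 1; delivery <= maxDeliveries; delivery++)
        {
            var i = (delivery - 1) % instances; // round-robin redelivery

            // Stand-in for tracker expiry: drop the local count if too many
            // deliveries went elsewhere since this instance last saw the message.
            if (counts[i] > 0 && delivery - lastSeen[i] > expiryTicks)
                counts[i] = 0;

            counts[i]++;            // the handler fails again on this instance
            lastSeen[i] = delivery;

            if (counts[i] >= maxRetries)
                return delivery;
        }

        return -1; // the local safeguard never triggered
    }

    // A broker-side delivery count is shared by all instances, so the limit
    // is reached after exactly maxRetries deliveries.
    public static int DeliveriesUntilBrokerLimit(int maxRetries) => maxRetries;
}
```

In this model, `DeliveriesUntilLocalLimit(1, 5, 10, 1000)` hits the limit on the 5th delivery, 3 instances need 13 deliveries, and 12 instances never hit it (the gap between two visits to the same instance exceeds the expiry), which matches the "infinite retry" observed above.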
My mitigation has been to add an IHandleMessages decorator in my container that, in case of an exception in Handle(T), creates a custom IFailed and calls IHandleMessages<IFailed<T>> (quite ugly, but it has worked flawlessly so far!).
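A minimal sketch of what such a decorator could look like follows. To keep it self-contained, `IHandleMessages<T>` and `IFailed<T>` are declared here as simplified stand-ins; the real Rebus interfaces have more members, and `LocalFailed<T>`, `FailoverDecorator<T>` and `DelegateHandler<T>` are hypothetical names, not the code actually used in the mitigation.

```csharp
using System;
using System.Collections.Generic;
using System.Threading.Tasks;

// Simplified stand-ins for illustration only (NOT the real Rebus definitions).
public interface IHandleMessages<in T> { Task Handle(T message); }
public interface IFailed<out T>
{
    T Message { get; }
    IEnumerable<Exception> Exceptions { get; }
}

// Hypothetical custom IFailed<T> carrying the exception that was just caught.
public sealed class LocalFailed<T> : IFailed<T>
{
    public LocalFailed(T message, Exception exception)
    {
        Message = message;
        Exceptions = new[] { exception };
    }

    public T Message { get; }
    public IEnumerable<Exception> Exceptions { get; }
}

// Decorator: if the inner handler throws, hand the message straight to the
// IFailed<T> handler on the same instance, within the same delivery.
public sealed class FailoverDecorator<T> : IHandleMessages<T>
{
    private readonly IHandleMessages<T> _inner;
    private readonly IHandleMessages<IFailed<T>> _failedHandler;

    public FailoverDecorator(IHandleMessages<T> inner, IHandleMessages<IFailed<T>> failedHandler)
    {
        _inner = inner;
        _failedHandler = failedHandler;
    }

    public async Task Handle(T message)
    {
        try
        {
            await _inner.Handle(message);
        }
        catch (Exception ex)
        {
            // If this call throws too, the exception propagates and the
            // transport's normal redelivery applies.
            await _failedHandler.Handle(new LocalFailed<T>(message, ex));
        }
    }
}

// Small helper so handlers can be written as lambdas in examples and tests.
public sealed class DelegateHandler<T> : IHandleMessages<T>
{
    private readonly Func<T, Task> _handle;
    public DelegateHandler(Func<T, Task> handle) => _handle = handle;
    public Task Handle(T message) => _handle(message);
}
```

The trade-off of this approach is that the failure handler no longer depends on any instance remembering earlier failures, at the cost of bypassing the configured retry count entirely.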
I'm now looking into fixing this in Rebus, either by using some extension point and/or by making a PR, but I can't find a decent angle, in particular for supporting 2nd-level retries. Some transports have a 'DeliveryCount' concept persisted on the broker side (ASB and SQS both do), which I'd like to use for distributed counting of retries. The SQL transport can be extended too, which is particularly easy when Lease is used.
The basic idea is to redesign the 'Retry' feature to support an optional transport-provided retry count, and to revisit ErrorTracker to make it async so that a distributed tracker can eventually be implemented (not required). The main change is around 2nd-level retry handling: call Handle(IFailed<>) immediately after Handle(T), guaranteeing that at least one exception is part of IFailed<>; fall back to Handle(IFailed<>) on the next handling of the message (same as now), but using the transport-provided count instead of the local exception count.
The hardest part is supporting FailFast behaviour, interpreted as avoiding re-handling the original message once a FailFast exception has been caught: handling IFailed<> immediately after the message supports this as long as the IFailed handling doesn't fail. If it fails, either the ErrorTracker is distributed, or we're lucky enough to end up on the same processor instance, or we process the original message again (same as now). There are quite a lot of interface changes in ErrorTracker, ErrorHandler and Retry (maybe ErrorHandler can be avoided, not sure).
If you approve the concept above, my main blocker is how to properly handle 2 messages in the same TransactionContext: the 'original' and the 'failed'. Any idea?
Pseudo-code follows, any comment is appreciated in particular if you see an approach to implement this initially as an extension, so I can iterate and live-test without building custom versions of Rebus.
Process(...)
{
    // CHANGE1: make the ErrorTracker interface easily extendable for a distributed implementation.
    // - Use async, expecting I/O for exception persistence, instead of the current sync interface
    // - Pass in the TransactionContext so that wider decisions can be made in extensions
    var knownEx = await _errorTracker.GetMessageExceptions(context);
    // use the transport-provided count if any, otherwise the tracker's
    // TODO: move logic into ErrorContext, here for snippet readability
    var retriesSoFar = transportMessage.DeliveryCount ?? knownEx.Count();
    var errorContext = new ErrorContext(transactionContext, knownEx, retriesSoFar);
    // if we already have at least one 'error' tracked (in-memory or distributed) and we should handle this as an error, do it.
    // otherwise, since with no known error we can't bail out without a reason, we Handle the message one more time
    if (errorContext.Exceptions.Count() > 0 && (errorContext.RetryCount > _settings.MaxRetries || _settings.IsFailFast(errorContext.Exceptions.Last())))
    {
        await _handleError(incomingContext, errorContext);
    }
    else
    {
        try
        {
            await next();
            await _errorTracker.Cleanup(errorContext);
        }
        catch (Exception ex)
        {
            // this increments RetryCount by one, independently of DeliveryCount, and adds the exception to the ErrorContext list
            await _errorTracker.TrackException(errorContext, ex);
            if (errorContext.RetryCount > _settings.MaxRetries || _settings.IsFailFast(ex))
                await _handleError(incomingContext, errorContext);
        }
    }
}
async Task _handleError(...)
{
    // if SecondLevelRetries are enabled, handle it _immediately_, not deferred to the next delivery.
    // ISSUE: this means that in the "same" TransactionContext, 2 messages are handled. Can this be supported? Which changes are required?
    // This is required to ensure that _at least one_ exception is present in the IFailed.
    // If IFailed<> is deferred to the next message handling, we lose this guarantee, as the other instance may not have a 'known exception' unless the errorTracker is distributed.
    // That is particularly impactful for FailFast exceptions, which are a way for the client to request no retries; retries would still happen if the message is re-processed by another instance.
    // By handling the IFailed<> immediately after as part of the MessageHandling pipeline,
    if (_settings.SecondLevelRetriesEnabled && (errorContext.RetryCount < _settings.MaxRetries * 2))
    {
        var failedMessage = _createFailed(message, errorContext);
        // not sure at which level to fork this and how to manage the TransactionContext ...
        // - same transaction, replacing the message
        // - nested transaction with a checkpoint made before handling the 'success' message, executing only the 'onRollback' added _after_ the next() call, discarding the 'onCommit' added _after_
        // - others?
        var nestedPipeline = _forkPipelineForFailure(incomingContext, errorContext);
        try
        {
            await nestedPipeline.Process(); // process the new failedMessage _after_ the original
            await _errorTracker.Cleanup(errorContext);
            return;
        }
        catch (Exception ex)
        {
            await _errorTracker.TrackSecondLevelException(errorContext, ex);
            if (errorContext.RetryCount < _settings.MaxRetries * 2)
                return;
        }
    }
    // we should always 'poison' the original message. The Reason/Description may point to the last error in Handle(IFailed<>)
    // In case of SecondLevelRetry, we're guaranteed to have both the last known message exception and the last 2nd-level retry exception in the ErrorContext
    await _errorHandler.HandlePoisonMessage(message, errorContext);
    await _errorTracker.Cleanup(errorContext);
}
Thanks for bringing this up! This is a rather complicated issue, which I have always optimistically ignored, and which seems to have been "good enough" for most scenarios.
But I can definitely see how elastic instance management can jab a stick in the wheel for Rebus' built-in in-mem error tracking, so it would definitely be nice to address this somehow.
Since what you describe sounds rather finicky (clever, but finicky 😅) – and I would be a little bit afraid that it could interfere with other transports – I'd like you to consider another idea! 🙂
How about (from Rebus 7) we make IErrorTracker async for some of its methods, namely those that register an exception and retrieve the exception details?
This would allow the Azure Service Bus transport to support a "distributed mode" where it registers its own implementation of IErrorTracker. This implementation would require access to a storage account with a blob container, and it would differ from the current in-mem implementation by saving caught exceptions as lines appended to an append-only blob named after the input queue/message ID. It would then, obviously, check the ASB message's delivery count in the HasFailedTooManyTimes method.
It would be an opt-in thing, requiring additional configuration that could look somewhat like this:
services.AddRebus(
    configure => configure
        .Transport(t => t.UseAzureServiceBus(...)
            .EnableDistributedErrorTracking(storageAccount, "rebus_errors"))
        .(...)
);
Oh yeah and blobs are actually a super-neat way to collect this type of information I think, as it's super cheap storage and can be configured to automatically delete blobs that haven't been accessed for a month or so (via Azure's built-in blob lifecycle management settings).
Yeah, most of the time applications don't scale like this: I've been using Rebus for a few years (thanks!) and never 'found' this limitation :)
I do agree that supporting a DistributedErrorTracker is a better overall design, and I absolutely agree that my code is finicky (and thanks for choosing a kind word :) ).
The proposed solution has a few caveats imho:
- The responsibilities of tracking and policy evaluation should be split too in v7. HasFailedTooManyTimes could be moved to IRetryStrategy, removing the tracker implementation's dependency on SimpleRetrySettings. TrackException could take a final:bool flag to mark that the given MessageId is done, with the decision made elsewhere.
- The tracker should be used in such a way that the 'ErrorState' is accessed once per message and kept in the message context until the current transaction is completed. The current interface is hard to use that way, since access is assumed to be cheap, happens in multiple places, and only the MessageId is passed as a parameter (which would be fine for storage only). A context-aware pattern is needed to ensure performance. My fear is that error tracking may become a performance bottleneck and overshadow the message handling cost if the access pattern is not revised too.
- EnableDistributedErrorTracking is not tied to ASB or to the transport. Any transport in Rebus would benefit from it in a high-scale scenario. I see it more as an option next to the retry strategy: o.SimpleRetryStrategy().WithDistributedErrorTracking().UseAzureStorage(cs). S3, SQL, Redis and other technologies can be used for tracking independently of the transport. I see it as similar to 'DataBus', and maybe those implementations for offloading a data blob can be reused/extended for the DistributedErrorTracking.
I'd still like you to consider a midway solution as an improvement over the current default implementation, where the RetryStrategy can be made more reliable even without distributed error tracking. Not perfect, but better imho. DistributedErrorTracking can be added on top to make it 'perfect' if needed. The full list of past exceptions is a nice-to-have, at least for me: most of the time the last one is informative enough, since the earlier errors are likely the same, and in any case that knowledge is covered by application logs. Besides, even now it is not the full list, only the exceptions the current instance has seen.
Some transports provide a DeliveryCount that can be used without distributed tracking to get a more stable result, at the expense of the full list of exceptions in the IFailed<> or in the poison message's ErrorDescription, while guaranteeing that the last exception is present. This would require poison message handling to happen right after Handle() once the threshold has been reached, not at the next Receive() of the same message.
That aside, if in v7 IErrorTracker is made async-friendly and its interface is revisited to split tracking and policy decisions, possibly moving the latter to IRetryStrategy, I think that would solve this issue and offer extension points to brew different strategies if needed.
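As a rough illustration of that split, the sketch below separates an async tracker (storage only) from a policy that prefers a transport-provided delivery count. Everything here is hypothetical: `IAsyncErrorTracker`, `InMemoryErrorTracker` and `DeliveryCountRetryPolicy` are made-up names, not Rebus types, and the `>=` threshold is an assumption.

```csharp
using System;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Threading.Tasks;

// Hypothetical tracker: only stores and retrieves exceptions, makes no decisions.
// Async so that an implementation backed by blobs, SQL, Redis, etc. is possible.
public interface IAsyncErrorTracker
{
    Task TrackAsync(string messageId, Exception exception);
    Task<IReadOnlyList<Exception>> GetExceptionsAsync(string messageId);
    Task CleanUpAsync(string messageId);
}

// In-memory implementation, equivalent in spirit to per-instance tracking.
public sealed class InMemoryErrorTracker : IAsyncErrorTracker
{
    private readonly ConcurrentDictionary<string, List<Exception>> _errors =
        new ConcurrentDictionary<string, List<Exception>>();

    public Task TrackAsync(string messageId, Exception exception)
    {
        var list = _errors.GetOrAdd(messageId, _ => new List<Exception>());
        lock (list) list.Add(exception);
        return Task.CompletedTask;
    }

    public Task<IReadOnlyList<Exception>> GetExceptionsAsync(string messageId)
    {
        if (!_errors.TryGetValue(messageId, out var list))
            return Task.FromResult<IReadOnlyList<Exception>>(Array.Empty<Exception>());
        lock (list) return Task.FromResult<IReadOnlyList<Exception>>(list.ToArray());
    }

    public Task CleanUpAsync(string messageId)
    {
        _errors.TryRemove(messageId, out _);
        return Task.CompletedTask;
    }
}

// Hypothetical policy: the decision lives here, not in the tracker.
public sealed class DeliveryCountRetryPolicy
{
    private readonly int _maxDeliveryAttempts;
    private readonly Func<Exception, bool> _isFailFast;

    public DeliveryCountRetryPolicy(int maxDeliveryAttempts, Func<Exception, bool> isFailFast)
    {
        _maxDeliveryAttempts = maxDeliveryAttempts;
        _isFailFast = isFailFast;
    }

    public bool ShouldDeadLetter(int? transportDeliveryCount, IReadOnlyList<Exception> tracked)
    {
        // Never give up without at least one known exception to report.
        if (tracked.Count == 0) return false;

        // Prefer the broker-side count; fall back to the locally tracked count.
        var attempts = transportDeliveryCount ?? tracked.Count;
        return attempts >= _maxDeliveryAttempts || _isFailFast(tracked[tracked.Count - 1]);
    }
}
```

With this shape, a transport that exposes a delivery count gets a stable limit across instances, while the tracker can be swapped for a distributed one without touching the policy.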
@mookid8000 I've hacked together the 'finicky' :) version here and it is behaving fine in test envs so far.
The main issue is that the immediate 2nd-level retry, which is necessary to handle IFailed<> in case of FailFast without distributed tracking, executes the whole pipeline twice, including message deserialization and saga steps. I'm not sure how dangerous this can be. So this is indeed convoluted code and may not play nicely with some applications, in particular, I fear, with UnitOfWork patterns that leverage the Rebus transaction scope.
After coding this, I still think it would be best if poisoning the message happened immediately after message processing when it is the last try, without waiting for another receive(), even when/if distributed tracking is present. FailFast would also behave more nicely, as it would poison the message immediately (as expected) rather than "if/when I process it on the same instance".
2nd-level retries, instead, may indeed require DistributedErrorTracking to work reliably over many instances, in particular when FailFast is also used. I'm not confident that my solution is supportable in this case.
The count and sequence of 1st-level and 2nd-level retries remain as fuzzy as they are now, even with my implementation. Only when used in combination with a transport DeliveryCount does this effectively guarantee that the message is poisoned, since the sequence Handle(T) -> Handle(IFailed<T>) -> Poison is performed in a single Receive().
I'm not sure this guarantee is worth the added complexity, since DistributedErrorTracking would solve the problem anyway.
This will be fixed in Rebus 8.
Rebus 8 switches the order of operations in its retry step, making it feasible to track failed delivery attempts in a database/Azure Blob Storage/etc.