Brighter
Outbox Sweeper effectively ignores the distributed lock due to the sweeping being done on a background thread
Describe the bug
In the timed outbox sweeper we use a locking provider:
var lockId = _distributedLock.ObtainLockAsync(LockingResourceName, CancellationToken.None).Result;
if (lockId != null)
{
    s_logger.LogInformation("Outbox Sweeper looking for unsent messages");
    var scope = _serviceScopeFactory.CreateScope();
    try
    {
        IAmACommandProcessor commandProcessor = scope.ServiceProvider.GetService<IAmACommandProcessor>();
        var outBoxSweeper = new OutboxSweeper(
            millisecondsSinceSent: _options.MinimumMessageAge,
            commandProcessor: commandProcessor,
            _options.BatchSize,
            _options.UseBulk,
            _options.Args);
        if (_options.UseBulk)
            outBoxSweeper.SweepAsyncOutbox();
        else
            outBoxSweeper.Sweep();
    }
    catch (Exception e)
    {
        s_logger.LogError(e, "Error while sweeping the outbox.");
        throw;
    }
    finally
    {
        _distributedLock.ReleaseLockAsync(LockingResourceName, lockId, CancellationToken.None).Wait();
        scope.Dispose();
    }
}
...
However, inside the External Bus Service we do the following:
if (useAsync)
{
    if (!HasAsyncOutbox())
        throw new InvalidOperationException("No async outbox defined.");
    Task.Run(() => BackgroundDispatchUsingAsync(amountToClear, minimumAge, useBulk, args), CancellationToken.None);
}
else
{
    if (!HasOutbox())
        throw new InvalidOperationException("No outbox defined.");
    Task.Run(() => BackgroundDispatchUsingSync(amountToClear, minimumAge, args));
}
...
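This means the Timed Sweeper service releases the lock without any assurance that the sweep has finished. The dispatch is handed to Task.Run and never awaited, so the sweep call can return, and the finally block can release the lock, while the background dispatch is still running. Another instance can then obtain the lock and start sweeping the same messages.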
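
To illustrate the ordering problem outside of Brighter, here is a minimal self-contained sketch. Everything in it is hypothetical: SweeperLockSketch, RunFireAndForget and RunAwaitedAsync are made-up names, the "lock" is just an entry in an event list, and the awaited variant is only one possible direction for a fix, not a proposal for how Brighter should implement it.

```csharp
using System;
using System.Collections.Generic;
using System.Threading;
using System.Threading.Tasks;

// Hypothetical stand-ins for illustration only; none of these are Brighter types.
public static class SweeperLockSketch
{
    // Mirrors the reported pattern: work is started with Task.Run and not awaited,
    // so the finally block (the "lock release") does not wait for it.
    public static List<string> RunFireAndForget()
    {
        var events = new List<string>();
        var lockReleased = new ManualResetEventSlim(false);
        var sweepDone = new ManualResetEventSlim(false);

        try
        {
            Task.Run(() =>
            {
                // The simulated sweep waits for the release only to make the
                // problematic ordering reproducible in this demo.
                lockReleased.Wait();
                lock (events) events.Add("sweep finished");
                sweepDone.Set();
            });
        }
        finally
        {
            lock (events) events.Add("lock released");
            lockReleased.Set();
        }

        sweepDone.Wait(); // let the background work complete before reading the events
        return events;
    }

    // One possible alternative: the caller awaits the dispatch task,
    // so the finally block only runs once the sweep has completed.
    public static async Task<List<string>> RunAwaitedAsync()
    {
        var events = new List<string>();
        try
        {
            await Task.Run(async () =>
            {
                await Task.Delay(50); // simulated dispatch work
                lock (events) events.Add("sweep finished");
            });
        }
        finally
        {
            lock (events) events.Add("lock released");
        }
        return events;
    }

    public static async Task Main()
    {
        Console.WriteLine(string.Join(" -> ", RunFireAndForget()));
        Console.WriteLine(string.Join(" -> ", await RunAwaitedAsync()));
    }
}
```

In the first variant the release is recorded before the sweep finishes; in the second the sweep finishes first. In the real sweeper, the equivalent change would presumably require the dispatch task to be surfaced to the caller so it can be awaited before ReleaseLockAsync is called.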