Concurrency exception in DbDistributedLockManager

Open davimorao opened this issue 2 years ago • 4 comments

Hello, I'm testing with the latest version of Silverback (3.8.0) and I'm getting a DbUpdateConcurrencyException from the Microsoft.EntityFrameworkCore package.

Stack trace

    Silverback.Background.DbDistributedLockManager[27]
          Failed to send heartbeat for lock OutboxWorker (2f3cf9ce029d45a3ace0e49016cc87d2).
    Microsoft.EntityFrameworkCore.DbUpdateConcurrencyException: The database operation was expected to affect 1 row(s), but actually affected 0 row(s); data may have been modified or deleted since entities were loaded. See http://go.microsoft.com/fwlink/?LinkId=527962 for information on understanding and handling optimistic concurrency exceptions.
       at Microsoft.EntityFrameworkCore.Update.AffectedCountModificationCommandBatch.ThrowAggregateUpdateConcurrencyException(Int32 commandIndex, Int32 expectedRowsAffected, Int32 rowsAffected)
       at Microsoft.EntityFrameworkCore.Update.AffectedCountModificationCommandBatch.ConsumeResultSetWithPropagationAsync(Int32 commandIndex, RelationalDataReader reader, CancellationToken cancellationToken)
       at Microsoft.EntityFrameworkCore.Update.AffectedCountModificationCommandBatch.ConsumeAsync(RelationalDataReader reader, CancellationToken cancellationToken)
       at Microsoft.EntityFrameworkCore.Update.ReaderModificationCommandBatch.ExecuteAsync(IRelationalConnection connection, CancellationToken cancellationToken)
       at Microsoft.EntityFrameworkCore.Update.ReaderModificationCommandBatch.ExecuteAsync(IRelationalConnection connection, CancellationToken cancellationToken)
       at Microsoft.EntityFrameworkCore.Update.Internal.BatchExecutor.ExecuteAsync(IEnumerable`1 commandBatches, IRelationalConnection connection, CancellationToken cancellationToken)
       at Microsoft.EntityFrameworkCore.Update.Internal.BatchExecutor.ExecuteAsync(IEnumerable`1 commandBatches, IRelationalConnection connection, CancellationToken cancellationToken)
       at Microsoft.EntityFrameworkCore.Update.Internal.BatchExecutor.ExecuteAsync(IEnumerable`1 commandBatches, IRelationalConnection connection, CancellationToken cancellationToken)
       at Microsoft.EntityFrameworkCore.ChangeTracking.Internal.StateManager.SaveChangesAsync(IList`1 entriesToSave, CancellationToken cancellationToken)
       at Microsoft.EntityFrameworkCore.ChangeTracking.Internal.StateManager.SaveChangesAsync(StateManager stateManager, Boolean acceptAllChangesOnSuccess, CancellationToken cancellationToken)
       at Microsoft.EntityFrameworkCore.SqlServer.Storage.Internal.SqlServerExecutionStrategy.ExecuteAsync[TState,TResult](TState state, Func`4 operation, Func`4 verifySucceeded, CancellationToken cancellationToken)
       at Microsoft.EntityFrameworkCore.DbContext.SaveChangesAsync(Boolean acceptAllChangesOnSuccess, CancellationToken cancellationToken)
       at Microsoft.EntityFrameworkCore.DbContext.SaveChangesAsync(Boolean acceptAllChangesOnSuccess, CancellationToken cancellationToken)
       at Silverback.Background.DbDistributedLockManager.SendHeartbeatAsync(String resourceName, String uniqueId, IServiceProvider serviceProvider)
       at Silverback.Background.DbDistributedLockManager.SendHeartbeatAsync(DistributedLockSettings settings)

Package versions:

    .NET 6
    Silverback.Core Version="3.8.0"
    Silverback.Core.EntityFrameworkCore Version="3.0.1"
    Silverback.Core.Model Version="3.8.0"
    Silverback.Integration.Kafka Version="3.8.0"
    Microsoft.EntityFrameworkCore.SqlServer Version="6.0.10"
    Microsoft.EntityFrameworkCore.Tools Version="6.0.10"

Here's how the code is:

Startup

    public static IServiceCollection AddSilverbackServices(this IServiceCollection services)
    {
        services.AddSilverback()
                .UseDbContext<KafkaManagementDbContext>()

                // Setup the lock manager using the database
                // to handle the distributed locks.
                // If this line is omitted the OutboundWorker will still
                // work without locking. 
                .AddDbDistributedLockManager()

                .WithConnectionToMessageBroker(options =>
                    options.AddKafka()
                           .AddOutboxDatabaseTable()
                           .AddOutboxWorker())
                .AddEndpointsConfigurator<InboundEndpointsConfigurator>()
                .AddEndpointsConfigurator<OutboundEndpointsConfigurator>()
                .AddSingletonSubscriber<OrderSubscriber>();

        return services;
    }

InboundEndpointsConfigurator

    builder
        .AddKafkaEndpoints(endpoints => endpoints
            .Configure(config =>
            {
                config.BootstrapServers = "PLAINTEXT://localhost:9092";
            })
            .AddInbound(endpoint => endpoint
                .ConsumeFrom("order-events")
                .Configure(config =>
                {
                    config.GroupId = "order-consumer";
                })
                .OnError(policy => policy.Retry(3, TimeSpan.FromSeconds(1)))));

OutboundEndpointsConfigurator

    builder.AddKafkaEndpoints(endpoints => endpoints
        .Configure(config =>
        {
            config.BootstrapServers = "PLAINTEXT://localhost:9092";
        })
        .AddOutbound<OrderCommand>(endpoint => endpoint
            .ProduceTo("order-events")
            .ProduceToOutbox()));

OrderSubscriber

    public class OrderSubscriber
    {
        private readonly ILogger<OrderSubscriber> _logger;

        public OrderSubscriber(ILogger<OrderSubscriber> logger)
        {
            _logger = logger;
        }

        public void OnMessageReceived(OrderCommand message)
        {
            _logger.LogInformation($"Received Id: {message.Id}, CreatedAt: {message.CreatedAt:dd/MM/yyyy - HH:mm:ss}");
        }
    }

DbContext

    public class KafkaManagementDbContext : DbContext
    {
        public KafkaManagementDbContext(DbContextOptions options)
            : base(options)
        {
            this.Database.EnsureCreated();
        }

        public DbSet<OutboxMessage> Outbox { get; set; } = null!;
        public DbSet<InboundLogEntry> InboundMessages { get; set; } = null!;
        public DbSet<StoredOffset> StoredOffsets { get; set; } = null!;
        public DbSet<Lock> Locks { get; set; } = null!;

        protected override void OnModelCreating(ModelBuilder modelBuilder)
        {
            modelBuilder.Entity<InboundLogEntry>()
                        .HasKey(t => new { t.MessageId, t.ConsumerGroupName });
        }
    }

OrderCommand

    public class OrderCommand : IIntegrationCommand
    {
        public Guid Id { get; set; }
        public DateTime CreatedAt { get; set; }
    }

Publish

    await _publisher.PublishAsync(new OrderCommand { Id = Guid.NewGuid(), CreatedAt = DateTime.Now });
    await _dbContext.SaveChangesAsync();

davimorao, Oct 31 '22 21:10

This is unfortunately a known issue (but thank you very much for writing the issue, so it stays here as documentation for other users).

It doesn't affect the functionality, since the heartbeat is updated frequently and failing to do that every once in a while will not cause any issue.

To be honest, I'm not very happy with the current design of the entire database integration and the implementation of the distributed lock. I'm reimplementing those parts from scratch in the upcoming major version, to target the specific database engines and take advantage of their individual features. This is why I didn't invest in trying to further improve the current implementation.

In my opinion, those errors can just be ignored. Or did you observe any related issue (besides the ugly error in the log)?

BEagle1984, Nov 01 '22 06:11
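
The point that an occasional failed heartbeat is harmless can be illustrated with a small model. This is only a sketch under stated assumptions: `HeartbeatLock`, its members, and the 5-second timeout are hypothetical names and numbers chosen for illustration, not Silverback's actual implementation or defaults. The idea is that a lock is considered lost only after the owner has been silent for longer than a timeout, so one skipped update is absorbed as long as the next one arrives in time.

```csharp
using System;

// Hypothetical model of a heartbeat-based lock, for illustration only.
// It is NOT Silverback's implementation; names and numbers are assumptions.
public sealed class HeartbeatLock
{
    private readonly TimeSpan _heartbeatTimeout;
    private DateTime _lastHeartbeat;

    public HeartbeatLock(DateTime acquiredAt, TimeSpan heartbeatTimeout)
    {
        _lastHeartbeat = acquiredAt;
        _heartbeatTimeout = heartbeatTimeout;
    }

    // Called by the lock owner at a regular interval.
    public void SendHeartbeat(DateTime now) => _lastHeartbeat = now;

    // Another instance may take over only once the owner has been
    // silent for longer than the timeout.
    public bool IsExpired(DateTime now) => now - _lastHeartbeat > _heartbeatTimeout;
}

public static class HeartbeatDemo
{
    public static void Main()
    {
        var start = new DateTime(2022, 11, 1, 6, 0, 0, DateTimeKind.Utc);
        var heldLock = new HeartbeatLock(start, TimeSpan.FromSeconds(5));

        heldLock.SendHeartbeat(start.AddSeconds(1));

        // The heartbeat due at +2s fails (for example with a
        // DbUpdateConcurrencyException) and is simply skipped.
        // At +3s the owner has been silent for 2s, below the 5s timeout.
        Console.WriteLine(heldLock.IsExpired(start.AddSeconds(3))); // False

        // The next heartbeat succeeds, then the owner stops entirely.
        heldLock.SendHeartbeat(start.AddSeconds(3));

        // At +10s the owner has been silent for 7s, above the timeout.
        Console.WriteLine(heldLock.IsExpired(start.AddSeconds(10))); // True
    }
}
```

In this model a single failure matters only if the timeout is shorter than roughly two heartbeat intervals.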

@BEagle1984, good ?

I tried to suppress this log event, but now it is logged as FATAL:

    silverbackBuilder
        .WithLogLevels(configurator => configurator
            .SetLogLevel(CoreLogEvents.FailedToSendDistributedLockHeartbeat.EventId, LogLevel.None));

[11:18:05 FTL] Failed to send heartbeat for lock cd-ms-sample-api (0d5ff1c6202f4cbfa6f0d45dcd658116).
Microsoft.EntityFrameworkCore.DbUpdateConcurrencyException: The database operation was expected to affect 1 row(s), but actually affected 0 row(s); data may have been modified or deleted since entities were loaded. See http://go.microsoft.com/fwlink/?LinkId=527962 for information on understanding and handling optimistic concurrency exceptions.

Without the SetLogLevel call, it is logged as ERROR:

[11:19:58 ERR] Failed to send heartbeat for lock cd-ms-sample-api (c075cf801ae3456288f8a01337038cc8).
Microsoft.EntityFrameworkCore.DbUpdateConcurrencyException: The database operation was expected to affect 1 row(s), but actually affected 0 row(s); data may have been modified or deleted since entities were loaded. See http://go.microsoft.com/fwlink/?LinkId=527962 for information on understanding and handling optimistic concurrency exceptions.

alefcarlos, Feb 03 '23 14:02

I'm not sure about the behavior of the None level. Have you tried with an actual level like Information or Debug?

BEagle1984, Feb 03 '23 14:02

Debug/Information is working as expected.

alefcarlos, Feb 03 '23 14:02
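
For reference, the workaround confirmed in the last two comments amounts to mapping the event to an actual level instead of `LogLevel.None`. A minimal sketch, assuming the same `silverbackBuilder` instance as in the earlier snippet. Whether the entry then disappears from the output depends on the minimum level configured for the application's logger, which the thread does not state.

```csharp
// Same call as in the failing attempt, but with LogLevel.Debug instead of LogLevel.None.
// The heartbeat failure is then written as a Debug entry; it is hidden only if the
// logger's minimum level is above Debug (an assumption about the host configuration).
silverbackBuilder
    .WithLogLevels(configurator => configurator
        .SetLogLevel(CoreLogEvents.FailedToSendDistributedLockHeartbeat.EventId, LogLevel.Debug));
```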