Concurrency exception in DbDistributedLockManager

Open davimorao opened this issue 1 year ago • 4 comments

Hello, I'm testing with Silverback 3.8.0 (the latest version) and I'm getting a DbUpdateConcurrencyException from the Microsoft.EntityFrameworkCore package.

Stack trace

    Silverback.Background.DbDistributedLockManager[27]
          Failed to send heartbeat for lock OutboxWorker (2f3cf9ce029d45a3ace0e49016cc87d2).
    Microsoft.EntityFrameworkCore.DbUpdateConcurrencyException: The database operation was expected to affect 1 row(s), but actually affected 0 row(s); data may have been modified or deleted since entities were loaded. See http://go.microsoft.com/fwlink/?LinkId=527962 for information on understanding and handling optimistic concurrency exceptions.
       at Microsoft.EntityFrameworkCore.Update.AffectedCountModificationCommandBatch.ThrowAggregateUpdateConcurrencyException(Int32 commandIndex, Int32 expectedRowsAffected, Int32 rowsAffected)
       at Microsoft.EntityFrameworkCore.Update.AffectedCountModificationCommandBatch.ConsumeResultSetWithPropagationAsync(Int32 commandIndex, RelationalDataReader reader, CancellationToken cancellationToken)
       at Microsoft.EntityFrameworkCore.Update.AffectedCountModificationCommandBatch.ConsumeAsync(RelationalDataReader reader, CancellationToken cancellationToken)
       at Microsoft.EntityFrameworkCore.Update.ReaderModificationCommandBatch.ExecuteAsync(IRelationalConnection connection, CancellationToken cancellationToken)
       at Microsoft.EntityFrameworkCore.Update.ReaderModificationCommandBatch.ExecuteAsync(IRelationalConnection connection, CancellationToken cancellationToken)
       at Microsoft.EntityFrameworkCore.Update.Internal.BatchExecutor.ExecuteAsync(IEnumerable`1 commandBatches, IRelationalConnection connection, CancellationToken cancellationToken)
       at Microsoft.EntityFrameworkCore.Update.Internal.BatchExecutor.ExecuteAsync(IEnumerable`1 commandBatches, IRelationalConnection connection, CancellationToken cancellationToken)
       at Microsoft.EntityFrameworkCore.Update.Internal.BatchExecutor.ExecuteAsync(IEnumerable`1 commandBatches, IRelationalConnection connection, CancellationToken cancellationToken)
       at Microsoft.EntityFrameworkCore.ChangeTracking.Internal.StateManager.SaveChangesAsync(IList`1 entriesToSave, CancellationToken cancellationToken)
       at Microsoft.EntityFrameworkCore.ChangeTracking.Internal.StateManager.SaveChangesAsync(StateManager stateManager, Boolean acceptAllChangesOnSuccess, CancellationToken cancellationToken)
       at Microsoft.EntityFrameworkCore.SqlServer.Storage.Internal.SqlServerExecutionStrategy.ExecuteAsync[TState,TResult](TState state, Func`4 operation, Func`4 verifySucceeded, CancellationToken cancellationToken)
       at Microsoft.EntityFrameworkCore.DbContext.SaveChangesAsync(Boolean acceptAllChangesOnSuccess, CancellationToken cancellationToken)
       at Microsoft.EntityFrameworkCore.DbContext.SaveChangesAsync(Boolean acceptAllChangesOnSuccess, CancellationToken cancellationToken)
       at Silverback.Background.DbDistributedLockManager.SendHeartbeatAsync(String resourceName, String uniqueId, IServiceProvider serviceProvider)
       at Silverback.Background.DbDistributedLockManager.SendHeartbeatAsync(DistributedLockSettings settings)
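
For context on the message itself: "expected to affect 1 row(s), but actually affected 0 row(s)" means the UPDATE issued by SaveChangesAsync no longer matched any row. Below is a minimal sketch of how a heartbeat update can end up matching nothing. It is a plain in-memory simulation, not Silverback's or EF Core's actual code: `LockRow`, `LockTable`, `Acquire`, and the `instance-a` / `instance-b` ids are hypothetical names made up for illustration, and the assumption that the update is filtered by the owner's unique id is mine.

```csharp
using System;
using System.Collections.Generic;

// Hypothetical stand-in for a row in a lock table (not Silverback's actual schema).
public record LockRow(string UniqueId, DateTime Heartbeat);

public class LockTable
{
    private readonly Dictionary<string, LockRow> _rows = new();

    // Inserts or overwrites the lock row, as a new owner taking over the lock would.
    public void Acquire(string name, string uniqueId) =>
        _rows[name] = new LockRow(uniqueId, DateTime.UtcNow);

    // Mimics "UPDATE ... WHERE Name = @name AND UniqueId = @uniqueId"
    // and returns the number of rows affected.
    public int SendHeartbeat(string name, string uniqueId)
    {
        if (_rows.TryGetValue(name, out var row) && row.UniqueId == uniqueId)
        {
            _rows[name] = row with { Heartbeat = DateTime.UtcNow };
            return 1; // the row still belongs to this owner
        }

        return 0; // the row was removed or now belongs to someone else
    }
}

public static class Demo
{
    public static void Main()
    {
        var table = new LockTable();

        table.Acquire("OutboxWorker", "instance-a");
        Console.WriteLine(table.SendHeartbeat("OutboxWorker", "instance-a")); // prints 1

        // Another instance replaces the lock row in the meantime.
        table.Acquire("OutboxWorker", "instance-b");
        Console.WriteLine(table.SendHeartbeat("OutboxWorker", "instance-a")); // prints 0
    }
}
```

In the real stack the zero-rows case surfaces as the DbUpdateConcurrencyException shown above rather than as a return value; whether a second instance is actually involved in my setup is something I have not verified.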

Package versions:

- .NET 6
- Silverback.Core Version="3.8.0"
- Silverback.Core.EntityFrameworkCore Version="3.0.1"
- Silverback.Core.Model Version="3.8.0"
- Silverback.Integration.Kafka Version="3.8.0"
- Microsoft.EntityFrameworkCore.SqlServer Version="6.0.10"
- Microsoft.EntityFrameworkCore.Tools Version="6.0.10"

Here is the relevant code:

Startup

    public static IServiceCollection AddSilverbackServices(this IServiceCollection services)
    {
        services.AddSilverback()
                .UseDbContext<KafkaManagementDbContext>()

                // Set up the lock manager using the database
                // to handle the distributed locks.
                // If this line is omitted the OutboundWorker will still
                // work without locking.
                .AddDbDistributedLockManager()

                .WithConnectionToMessageBroker(options =>
                    options.AddKafka()
                           .AddOutboxDatabaseTable()
                           .AddOutboxWorker())
                .AddEndpointsConfigurator<InboundEndpointsConfigurator>()
                .AddEndpointsConfigurator<OutboundEndpointsConfigurator>()
                .AddSingletonSubscriber<OrderSubscriber>();

        return services;
    }

InboundEndpointsConfigurator

    builder
        .AddKafkaEndpoints(endpoints => endpoints
            .Configure(config =>
            {
                config.BootstrapServers = "PLAINTEXT://localhost:9092";
            })
            .AddInbound(endpoint => endpoint
                .ConsumeFrom("order-events")
                .Configure(config =>
                {
                    config.GroupId = "order-consumer";
                })
                .OnError(policy => policy.Retry(3, TimeSpan.FromSeconds(1)))));

OutboundEndpointsConfigurator

    builder
        .AddKafkaEndpoints(endpoints => endpoints
            .Configure(config =>
            {
                config.BootstrapServers = "PLAINTEXT://localhost:9092";
            })
            .AddOutbound<OrderCommand>(endpoint => endpoint
                .ProduceTo("order-events")
                .ProduceToOutbox()));

OrderSubscriber

    public class OrderSubscriber
    {
        private readonly ILogger<OrderSubscriber> _logger;

        public OrderSubscriber(ILogger<OrderSubscriber> logger)
        {
            _logger = logger;
        }

        public void OnMessageReceived(OrderCommand message)
        {
            _logger.LogInformation($"Received Id: {message.Id}, CreatedAt: {message.CreatedAt:dd/MM/yyyy - HH:mm:ss}");
        }
    }

DbContext

    public class KafkaManagementDbContext : DbContext
    {
        public KafkaManagementDbContext(DbContextOptions options)
            : base(options)
        {
            this.Database.EnsureCreated();
        }

        public DbSet<OutboxMessage> Outbox { get; set; } = null!;
        public DbSet<InboundLogEntry> InboundMessages { get; set; } = null!;
        public DbSet<StoredOffset> StoredOffsets { get; set; } = null!;
        public DbSet<Lock> Locks { get; set; } = null!;

        protected override void OnModelCreating(ModelBuilder modelBuilder)
        {
            modelBuilder.Entity<InboundLogEntry>()
                        .HasKey(t => new { t.MessageId, t.ConsumerGroupName });
        }
    }

OrderCommand

    public class OrderCommand : IIntegrationCommand
    {
        public Guid Id { get; set; }
        public DateTime CreatedAt { get; set; }
    }

Publish

    await _publisher.PublishAsync(new OrderCommand { Id = Guid.NewGuid(), CreatedAt = DateTime.Now });
    await _dbContext.SaveChangesAsync();

davimorao • Oct 31 '22 21:10