Concurrency exception in DbDistributedLockManager
Hello, I'm testing with the latest version of Silverback (3.8.0) and I'm getting a DbUpdateConcurrencyException from the Microsoft.EntityFrameworkCore package.
Stack trace
Silverback.Background.DbDistributedLockManager[27]
Failed to send heartbeat for lock OutboxWorker (2f3cf9ce029d45a3ace0e49016cc87d2).
Microsoft.EntityFrameworkCore.DbUpdateConcurrencyException: The database operation was expected to affect 1 row(s), but actually affected 0 row(s); data may have been modified or deleted since entities were loaded. See http://go.microsoft.com/fwlink/?LinkId=527962 for information on understanding and handling optimistic concurrency exceptions.
at Microsoft.EntityFrameworkCore.Update.AffectedCountModificationCommandBatch.ThrowAggregateUpdateConcurrencyException(Int32 commandIndex, Int32 expectedRowsAffected, Int32 rowsAffected)
at Microsoft.EntityFrameworkCore.Update.AffectedCountModificationCommandBatch.ConsumeResultSetWithPropagationAsync(Int32 commandIndex, RelationalDataReader reader, CancellationToken cancellationToken)
at Microsoft.EntityFrameworkCore.Update.AffectedCountModificationCommandBatch.ConsumeAsync(RelationalDataReader reader, CancellationToken cancellationToken)
at Microsoft.EntityFrameworkCore.Update.ReaderModificationCommandBatch.ExecuteAsync(IRelationalConnection connection, CancellationToken cancellationToken)
at Microsoft.EntityFrameworkCore.Update.ReaderModificationCommandBatch.ExecuteAsync(IRelationalConnection connection, CancellationToken cancellationToken)
at Microsoft.EntityFrameworkCore.Update.Internal.BatchExecutor.ExecuteAsync(IEnumerable`1 commandBatches, IRelationalConnection connection, CancellationToken cancellationToken)
at Microsoft.EntityFrameworkCore.Update.Internal.BatchExecutor.ExecuteAsync(IEnumerable`1 commandBatches, IRelationalConnection connection, CancellationToken cancellationToken)
at Microsoft.EntityFrameworkCore.Update.Internal.BatchExecutor.ExecuteAsync(IEnumerable`1 commandBatches, IRelationalConnection connection, CancellationToken cancellationToken)
at Microsoft.EntityFrameworkCore.ChangeTracking.Internal.StateManager.SaveChangesAsync(IList`1 entriesToSave, CancellationToken cancellationToken)
at Microsoft.EntityFrameworkCore.ChangeTracking.Internal.StateManager.SaveChangesAsync(StateManager stateManager, Boolean acceptAllChangesOnSuccess, CancellationToken cancellationToken)
at Microsoft.EntityFrameworkCore.SqlServer.Storage.Internal.SqlServerExecutionStrategy.ExecuteAsync[TState,TResult](TState state, Func`4 operation, Func`4 verifySucceeded, CancellationToken cancellationToken)
at Microsoft.EntityFrameworkCore.DbContext.SaveChangesAsync(Boolean acceptAllChangesOnSuccess, CancellationToken cancellationToken)
at Microsoft.EntityFrameworkCore.DbContext.SaveChangesAsync(Boolean acceptAllChangesOnSuccess, CancellationToken cancellationToken)
at Silverback.Background.DbDistributedLockManager.SendHeartbeatAsync(String resourceName, String uniqueId, IServiceProvider serviceProvider)
at Silverback.Background.DbDistributedLockManager.SendHeartbeatAsync(DistributedLockSettings settings)
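For readers unfamiliar with this error, the sketch below shows the general mechanism the exception message describes: an update that expects to touch one row finds none because the row was removed or replaced in the meantime. It is only an illustration under stated assumptions and is not Silverback's or EF Core's actual implementation. `LockRow`, `FakeLockTable`, and `HeartbeatDemo` are hypothetical names, and an in-memory dictionary stands in for the locks table.

```csharp
using System;
using System.Collections.Generic;

// Hypothetical stand-in for a row in a locks table (not Silverback's entity).
public sealed record LockRow(string Name, string UniqueId, DateTime Heartbeat);

// Hypothetical in-memory "table" that reports affected rows like an UPDATE would.
public sealed class FakeLockTable
{
    private readonly Dictionary<string, LockRow> _rows = new();

    public void Insert(LockRow row) => _rows[row.Name] = row;

    public void Delete(string name) => _rows.Remove(name);

    // Imitates an UPDATE ... WHERE statement and returns the affected row count.
    public int UpdateHeartbeat(string name, string uniqueId, DateTime now)
    {
        if (!_rows.TryGetValue(name, out var row) || row.UniqueId != uniqueId)
            return 0; // the row is gone or now belongs to another holder

        _rows[name] = row with { Heartbeat = now };
        return 1;
    }
}

public static class HeartbeatDemo
{
    // Throws when the update touches no row, similar in spirit to the
    // affected-row check reported in the stack trace above.
    public static void SendHeartbeat(FakeLockTable table, string name, string uniqueId)
    {
        int affected = table.UpdateHeartbeat(name, uniqueId, DateTime.UtcNow);
        if (affected != 1)
            throw new InvalidOperationException(
                $"Expected to affect 1 row(s), but actually affected {affected} row(s).");
    }

    public static void Main()
    {
        var table = new FakeLockTable();
        table.Insert(new LockRow("OutboxWorker", "holder-a", DateTime.UtcNow));

        // Succeeds: the row still exists and belongs to this holder.
        SendHeartbeat(table, "OutboxWorker", "holder-a");

        // Another actor removes the row between two heartbeats.
        table.Delete("OutboxWorker");

        try
        {
            SendHeartbeat(table, "OutboxWorker", "holder-a");
        }
        catch (InvalidOperationException ex)
        {
            Console.WriteLine(ex.Message);
        }
    }
}
```

Whether the lock row in this report was actually deleted, taken over by another instance, or changed in some other way is not known from the trace alone; the sketch only shows why a zero affected-row count surfaces as an exception.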
Package versions:
.NET 6
Silverback.Core Version="3.8.0"
Silverback.Core.EntityFrameworkCore Version="3.0.1"
Silverback.Core.Model Version="3.8.0"
Silverback.Integration.Kafka Version="3.8.0"
Microsoft.EntityFrameworkCore.SqlServer Version="6.0.10"
Microsoft.EntityFrameworkCore.Tools Version="6.0.10"
Here is the relevant code:
Startup
public static IServiceCollection AddSilverbackServices(this IServiceCollection services)
{
    services.AddSilverback()
        .UseDbContext<KafkaManagementDbContext>()
        // Setup the lock manager using the database
        // to handle the distributed locks.
        // If this line is omitted the OutboundWorker will still
        // work without locking.
        .AddDbDistributedLockManager()
        .WithConnectionToMessageBroker(options =>
            options.AddKafka()
                .AddOutboxDatabaseTable()
                .AddOutboxWorker())
        .AddEndpointsConfigurator<InboundEndpointsConfigurator>()
        .AddEndpointsConfigurator<OutboundEndpointsConfigurator>()
        .AddSingletonSubscriber<OrderSubscriber>();
    return services;
}
InboundEndpointsConfigurator
builder
    .AddKafkaEndpoints(endpoints => endpoints
        .Configure(config =>
        {
            config.BootstrapServers = "PLAINTEXT://localhost:9092";
        })
        .AddInbound(endpoint => endpoint
            .ConsumeFrom("order-events")
            .Configure(config =>
            {
                config.GroupId = "order-consumer";
            })
            .OnError(policy => policy.Retry(3, TimeSpan.FromSeconds(1)))));
OutboundEndpointsConfigurator
builder
    .AddKafkaEndpoints(endpoints => endpoints
        .Configure(config =>
        {
            config.BootstrapServers = "PLAINTEXT://localhost:9092";
        })
        .AddOutbound<OrderCommand>(endpoint => endpoint
            .ProduceTo("order-events")
            .ProduceToOutbox()));
OrderSubscriber
`public class OrderSubscriber
{
    private readonly ILogger<OrderSubscriber> _logger;

    public OrderSubscriber(ILogger<OrderSubscriber> logger)
    {
        _logger = logger;
    }

    public void OnMessageReceived(OrderCommand message)
    {
        _logger.LogInformation($"Received Id: {message.Id}, CreatedAt: {message.CreatedAt:dd/MM/yyyy - HH:mm:ss}");
    }
}`
DbContext
`public class KafkaManagementDbContext : DbContext
{
    public KafkaManagementDbContext(DbContextOptions options)
        : base(options)
    {
        this.Database.EnsureCreated();
    }

    public DbSet<OutboxMessage> Outbox { get; set; } = null!;
    public DbSet<InboundLogEntry> InboundMessages { get; set; } = null!;
    public DbSet<StoredOffset> StoredOffsets { get; set; } = null!;
    public DbSet<Lock> Locks { get; set; } = null!;

    protected override void OnModelCreating(ModelBuilder modelBuilder)
    {
        modelBuilder.Entity<InboundLogEntry>()
            .HasKey(t => new { t.MessageId, t.ConsumerGroupName });
    }
}`
OrderCommand
public class OrderCommand : IIntegrationCommand
{
    public Guid Id { get; set; }
    public DateTime CreatedAt { get; set; }
}
Publish
await _publisher.PublishAsync(new OrderCommand { Id = Guid.NewGuid(), CreatedAt = DateTime.Now });
await _dbContext.SaveChangesAsync();