Dapper-Plus icon indicating copy to clipboard operation
Dapper-Plus copied to clipboard

IndexOutOfRangeException on BulkMerge operations

Open ButaevSergey opened this issue 2 years ago • 1 comments

Im recieving a frequent (yet not constant) exception when trying to perform a BulkMerge operation (~30k records affected).

This is a stack trace im getting:

System.IndexOutOfRangeException
  HResult=0x80131508
  Message=There is no row at position 70.
  Source=System.Data.Common
  StackTrace:
   at System.Data.RBTree`1.GetNodeByIndex(Int32 userIndex)
   at .( , DbCommand )
   at .Execute(List`1 actions)
   at .(List`1 )
   at Z.BulkOperations.BulkOperation.Execute()
   at Z.BulkOperations.BulkOperation.BulkMerge()
   at Z.Dapper.Plus.DapperPlusAction.Execute()
   at Z.Dapper.Plus.DapperPlusActionSet`1..(Object )
   at System.Collections.Generic.List`1.ForEach(Action`1 action)
   at Z.Dapper.Plus.DapperPlusActionSet`1.BulkMerge[T](String mapperKey, IEnumerable`1 items, Func`2[] selectors)
   at Z.Dapper.Plus.DapperPlusActionSet`1.BulkMerge[T](IEnumerable`1 items, Func`2[] selectors)
   at X.Auto.Export.NpgsqlDataProvider.<>c__DisplayClass7_0.<UpsertModels>b__1(DapperPlusActionSet`1 x) in ...\NpgsqlDataProvider.cs:line 127
   at Z.Dapper.Plus.DapperPlusActionSet`1..()
   at System.Threading.Tasks.Task.InnerInvoke()
   at System.Threading.Tasks.Task.<>c.<.cctor>b__272_0(Object obj)
   at System.Threading.ExecutionContext.RunFromThreadPoolDispatchLoop(Thread threadPoolThread, ExecutionContext executionContext, ContextCallback callback, Object state)

  This exception was originally thrown at this call stack:
    [External Code]
    NpgsqlDataProvider.UpsertModels.AnonymousMethod__1(Z.Dapper.Plus.DapperPlusActionSet<object>) in NpgsqlDataProvider.cs
    [External Code]

Im performing multiple concurrent async read only operations to create an input for this BulkMerge via NpgSqlCommand.

This is the constructor with the global context mapping that gets instantiated extactly once (injecting via singleton).

public NpgsqlBatchDataProvider(IOptionsMonitor<Options> options, ILogger<NpgsqlBatchDataProvider> logger)
{
            _logger = logger;
            string connectionString = options?.CurrentValue?.DefaultConnection ?? throw new ArgumentNullException("connectionString");
            var builder = new NpgsqlConnectionStringBuilder(connectionString);
            builder.Multiplexing = false;
            _dataSrc = NpgsqlDataSource.Create(builder);
           
            DapperPlusManager.Entity<First>().Table("public.First").Key(x => x.Id).AutoMap();
            DapperPlusManager.Entity<Second>().Table("public.Second").Key(x => x.Id).ForeignKey(x => x.FirstId).Ignore(x => x.NameRus).AfterAction((kind, m) =>
            {
                if (m.Thirds != null)
                    foreach (var b in m.Thirds)
                        b.SecondId = m.Id;
            }).AutoMap();
            DapperPlusManager.Entity<Third>().Table("public.Third").Key(x => x.Id).Identity(x => x.Id).ForeignKey(x => x.SecondId).AfterAction((kind, b) =>
            {
                if (b.Fourths != null)
                    foreach (var m in b.Fourths)
                        m.ThirdId = b.Id;
                if (b.Fifths != null)
                    foreach (var p in b.Fifths)
                        p.ThirdId = b.Id;
            }).AutoMap();
            DapperPlusManager.Entity<Fifth>().Table("public.Fifth").Key(x => x.Id).Identity(x => x.Id).ForeignKey(x => x.ThirdId).AutoMap();
            DapperPlusManager.Entity<Fourth>().Table("public.Fourth").Key(x => x.Id).Identity(x => x.Id).ForeignKey(x => x.ThirdId).AfterAction((kind, m) =>
            {
                if (m.Sixths != null)
                    foreach (var opt in m.Sixths)
                        opt.FourthId = m.Id;
            }).AutoMap();
            DapperPlusManager.Entity<Sixth>().Table("public.Sixth").Key(x => new { x.FourthId, x.SixthCode })
                .ForeignKey(x => x.FourthId).ForeignKey(x => x.SixthCode).AutoMap();
}

Method that throws this exception:

public async Task UpsertModels(ICollection<Second> models, CancellationToken ct)
        {
            try
            {
                await using var conn = await _dataSrc.OpenConnectionAsync(ct);
                await using var transaction = await conn.BeginTransactionAsync(IsolationLevel.ReadUncommitted, ct);

                await transaction
                    .BulkActionAsync(x => x.BulkMerge(models).ThenBulkMerge(m => m.Thirds)
                        .AlsoBulkMerge(b => b.Fifths)
                        .ThenBulkMerge(b => b.Fourths).ThenBulkMerge(m => m.Sixths), ct);
                await transaction.CommitAsync(ct);
            }
            catch (Exception e)
            {

            }
        }

Framework: .NET 6 Packages:

  • Z.Dapper.Plus v.5.0.0
  • Npgsql v.7.0.1
  • Dapper v.2.0.123

Tampering with BatchSize helps but does not provide a stable solution in the long run. Right now im processing models sequentially (~1k records for each "Second" entity) which is substantially slower and far less optimal, yet it throws no exceptions.

ButaevSergey avatar Mar 08 '23 09:03 ButaevSergey

Hello @ButaevSergey ,

Thank you for reporting, my developer will look at it.

Best Regards,

Jon

JonathanMagnan avatar Mar 08 '23 16:03 JonathanMagnan