Large BulkInsertOptimizedAsync does not cancel using CancellationToken

Open DerMagereStudent opened this issue 1 month ago • 4 comments

When inserting millions of records (event traces from BPIC17) using BulkInsertOptimizedAsync, I noticed that the operation does not cancel even though a CancellationToken is properly created and passed. Here is a minimal reproducible example:

Docker DB:

version: "3.8"

services:
  db:
    image: postgres:16-alpine
    container_name: bulk-insert-cancel-test-db
    restart: unless-stopped
    environment:
      POSTGRES_USER: postgres
      POSTGRES_PASSWORD: postgres
      POSTGRES_DB: db
    ports:
      - "45432:5432"

Code:

using System;
using System.Diagnostics;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;

using Microsoft.EntityFrameworkCore;

public class TableItem {
    public Guid Id { get; set; }
    public string Name { get; set; } = default!;
}

public class TestContext : DbContext {
    public DbSet<TableItem> Table { get; set; }

    protected override void OnConfiguring(DbContextOptionsBuilder optionsBuilder) {
        if (!optionsBuilder.IsConfigured) {
            var connectionString = "Host=localhost;Port=45432;Database=db;Username=postgres;Password=postgres";

            optionsBuilder.UseNpgsql(connectionString);
        }
    }

    protected override void OnModelCreating(ModelBuilder modelBuilder) {
        modelBuilder.Entity<TableItem>(entity => {
            entity.HasKey(e => e.Id);
            entity.Property(e => e.Id).ValueGeneratedNever();

            entity.Property(e => e.Name)
                .IsRequired()
                .HasMaxLength(256);
        });
    }
}

public static class ProgramBulkInsert {
    public static async Task Main(string[] args) {
        const int totalItems = 5_000_000;

        Console.WriteLine("Ensuring database is created...");
        await using (var setupContext = new TestContext()) {
            await setupContext.Database.EnsureCreatedAsync();
            await setupContext.Database.ExecuteSqlRawAsync($"TRUNCATE TABLE \"{nameof(TestContext.Table)}\";");
        }

        Console.WriteLine($"Generating {totalItems:N0} entities in memory...");
        var items = Enumerable.Range(1, totalItems)
            .Select(i => new TableItem {
                Id = Guid.CreateVersion7(),
                Name = $"Item #{i}"
            })
            .ToList();

        Console.WriteLine("Entities generated.");

        using var cts = new CancellationTokenSource();
        var token = cts.Token;

        var stopwatch = new Stopwatch();
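        _ = Task.Run(async () => {
            await Task.Delay(TimeSpan.FromSeconds(2));
            Console.WriteLine(">>> Requesting cancellation");
            // Start timing before Cancel(): Cancel() runs registered callbacks synchronously,
            // so starting afterwards could race with stopwatch.Stop() on the main flow.
            stopwatch.Start();
            cts.Cancel();
        });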

        Console.WriteLine("Starting BulkInsertOptimizedAsync with cancellation token...");

        try {
            await using var context = new TestContext();

            _ = await context.BulkInsertOptimizedAsync(items, token);

            stopwatch.Stop();

            Console.WriteLine("BulkInsertOptimizedAsync COMPLETED without throwing.");

            await using var verifyContext = new TestContext();
            var countInDb = await verifyContext.Table.LongCountAsync();
            Console.WriteLine($"Rows in DB after operation: {countInDb:N0}");
        }
        catch (OperationCanceledException) {
            stopwatch.Stop();
            Console.WriteLine("BulkInsertOptimizedAsync threw OperationCanceledException (was cancelled).");
            Console.WriteLine($"Elapsed since Cancel: {stopwatch.Elapsed}");
        }
        catch (Exception ex) {
            stopwatch.Stop();
            Console.WriteLine("BulkInsertOptimizedAsync threw some other exception:");
            Console.WriteLine(ex);
        }

        Console.WriteLine($"CancellationToken.IsCancellationRequested = {token.IsCancellationRequested}");
    }
}

Packages:

<ItemGroup>
  <PackageReference Include="Microsoft.EntityFrameworkCore" Version="10.0.0" />
  <PackageReference Include="Npgsql.EntityFrameworkCore.PostgreSQL" Version="10.0.0" />
  <PackageReference Include="Z.EntityFramework.Extensions.EFCore" Version="10.105.0" />
</ItemGroup>

DerMagereStudent, Nov 28 '25 20:11

Hello @DerMagereStudent ,

Indeed, it looks like we missed checking the cancellation token when using a BinaryImport strategy.

We are currently working on the fix.

Best Regards,

Jon

JonathanMagnan, Dec 01 '25 13:12

Hello @DerMagereStudent ,

A new version is now available.

We improved the code to better handle the CancellationToken when a bulk copy strategy is used. The code now checks the cancellation token every 1000 entities that have been read.

Let me know if everything now works as expected.

Best Regards,

Jon

JonathanMagnan, Dec 09 '25 23:12

Hello @JonathanMagnan ,

Thank you for the fix. My use case now cancels properly, because it inserts many small records, as in the example code.

So for my purposes, this issue is done.

However, knowing that the token is checked only every 1000th iteration, I can give a quick example where cancellation still does not work properly: inserting a few large BLOBs. I asked an LLM, which must not be named, to adapt my previous code, and the result confirmed it. Code below.

I'm wondering whether it actually makes sense to check the token only every 1000th iteration. The check is not heavy, thread-synchronized or locking work. It's just reading a volatile flag:

https://github.com/dotnet/runtime/blob/0fb17f7320fbaaa93e91c4c08b3a03175bb5ca15/src/libraries/System.Private.CoreLib/src/System/Threading/CancellationTokenSource.cs#L73

Instead of having an iteration counter and doing something like:

if (counter % 1000 == 0)
    token.ThrowIfCancellationRequested();

Checking every iteration should be very fast too and either way this check will probably vanish in the I/O noise.

Obviously I'm not an expert and may be missing something, but that's my opinion.
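
To illustrate the point, here is a minimal sketch, not the library's actual code. It assumes the write loop counts items from 1 and polls the token only when the count is a multiple of the interval. `CancellationCheckDemo` and `ItemsWrittenBeforeCancel` are hypothetical names, and the "write" is just a counter increment standing in for real I/O.

```csharp
using System;
using System.Threading;

public static class CancellationCheckDemo {
    // Simulates a write loop that polls the token once every `checkInterval` items.
    // Returns how many items were "written" before cancellation was observed.
    public static int ItemsWrittenBeforeCancel(int totalItems, int checkInterval, CancellationToken token) {
        var written = 0;
        try {
            for (var i = 1; i <= totalItems; i++) {
                if (i % checkInterval == 0)
                    token.ThrowIfCancellationRequested();

                written++; // stand-in for sending one entity to the server
            }
        }
        catch (OperationCanceledException) {
            // Cancellation was observed; stop writing.
        }
        return written;
    }

    public static void Main() {
        using var cts = new CancellationTokenSource();
        cts.Cancel(); // the token is already cancelled before the loop starts

        // 25 large records, check every 1000th item: the check is never reached.
        Console.WriteLine(ItemsWrittenBeforeCancel(25, 1000, cts.Token));        // 25
        // 25 large records, check every item: nothing is written.
        Console.WriteLine(ItemsWrittenBeforeCancel(25, 1, cts.Token));           // 0
        // Many small records, check every 1000th item: stops at the first check.
        Console.WriteLine(ItemsWrittenBeforeCancel(5_000_000, 1000, cts.Token)); // 999
    }
}
```

Under these assumptions, a batch smaller than the interval is written completely even if the token was cancelled before the first item, which matches what I see with the 25 large records below.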

Best Regards,

Marcel

25 records of 50 MB each:

using System;
using System.Diagnostics;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;

using Microsoft.EntityFrameworkCore;

public class TableItem {
    public Guid Id { get; set; }
    public string Name { get; set; } = default!;

    // Very large blob
    public byte[] Data { get; set; } = default!;
}

public class TestContext : DbContext {
    public DbSet<TableItem> Table { get; set; }

    protected override void OnConfiguring(DbContextOptionsBuilder optionsBuilder) {
        if (!optionsBuilder.IsConfigured) {
            var connectionString = "Host=localhost;Port=45432;Database=db;Username=postgres;Password=postgres";

            optionsBuilder.UseNpgsql(connectionString);
        }
    }

    protected override void OnModelCreating(ModelBuilder modelBuilder) {
        modelBuilder.Entity<TableItem>(entity => {
            entity.HasKey(e => e.Id);
            entity.Property(e => e.Id).ValueGeneratedNever();

            entity.Property(e => e.Name)
                .IsRequired()
                .HasMaxLength(256);

            // Map the large blob to PostgreSQL bytea
            entity.Property(e => e.Data)
                .IsRequired()
                .HasColumnType("bytea");
        });
    }
}

public static class ProgramBulkInsert {
    public static async Task Main(string[] args) {
        const int totalItems = 25;
        const int blobSizeBytes = 50 * 1024 * 1024; // 50 MB

        Console.WriteLine("Ensuring database is created...");
        await using (var setupContext = new TestContext()) {
            await setupContext.Database.EnsureCreatedAsync();

            // Truncate target table
            await setupContext.Database.ExecuteSqlRawAsync(
                $"TRUNCATE TABLE \"{nameof(TestContext.Table)}\";");
        }

        Console.WriteLine($"Generating {totalItems:N0} entities in memory with {blobSizeBytes / (1024 * 1024)} MB BLOBs...");

        var rnd = Random.Shared;
        var items = Enumerable.Range(1, totalItems)
            .Select(i => {
                var data = new byte[blobSizeBytes];
                rnd.NextBytes(data); // Fill with random data to avoid easy compression / optimization

                return new TableItem {
                    Id = Guid.CreateVersion7(),
                    Name = $"Item #{i}",
                    Data = data
                };
            })
            .ToList();

        Console.WriteLine("Entities generated.");

        // Create a CancellationTokenSource and cancel after 2 seconds
        using var cts = new CancellationTokenSource();
        var token = cts.Token;

        var stopwatch = new Stopwatch();
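        _ = Task.Run(async () => {
            await Task.Delay(TimeSpan.FromSeconds(2));
            Console.WriteLine(">>> Requesting cancellation");
            // Start timing before Cancel(): Cancel() runs registered callbacks synchronously,
            // so starting afterwards could race with stopwatch.Stop() on the main flow.
            stopwatch.Start();
            cts.Cancel();
        });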

        Console.WriteLine("Starting BulkInsertOptimizedAsync with cancellation token...");

        try {
            await using var context = new TestContext();

            // This is the call we want to prove does NOT honor the CancellationToken
            _ = await context.BulkInsertOptimizedAsync(items, token);

            stopwatch.Stop();

            Console.WriteLine("BulkInsertOptimizedAsync COMPLETED without throwing.");

            await using var verifyContext = new TestContext();
            var countInDb = await verifyContext.Table.LongCountAsync();
            Console.WriteLine($"Rows in DB after operation: {countInDb:N0}");
        }
        catch (OperationCanceledException) {
            stopwatch.Stop();
            Console.WriteLine("BulkInsertOptimizedAsync threw OperationCanceledException (was cancelled).");
            Console.WriteLine($"Elapsed since Cancel: {stopwatch.Elapsed}");
        }
        catch (Exception ex) {
            stopwatch.Stop();
            Console.WriteLine("BulkInsertOptimizedAsync threw some other exception:");
            Console.WriteLine(ex);
        }

        Console.WriteLine($"CancellationToken.IsCancellationRequested = {token.IsCancellationRequested}");
    }
}

DerMagereStudent, Dec 10 '25 00:12

This was indeed a big question on our side as well.

I will discuss it again with my developer.

Meanwhile, this is already a major improvement compared to before.

Best Regards,

Jon

JonathanMagnan, Dec 10 '25 01:12