Large BulkInsertOptimizedAsync does not cancel using CancellationToken
When inserting millions of records using BulkInsertOptimizedAsync (event traces from BPIC17), I noticed that the operation does not cancel even though a CancellationToken is created and passed correctly. Here is a minimal reproducible example:
Docker DB:
version: "3.8"
services:
  db:
    image: postgres:16-alpine
    container_name: bulk-insert-cancel-test-db
    restart: unless-stopped
    environment:
      POSTGRES_USER: postgres
      POSTGRES_PASSWORD: postgres
      POSTGRES_DB: db
    ports:
      - "45432:5432"
Code:
using System;
using System.Diagnostics;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;
using Microsoft.EntityFrameworkCore;
public class TableItem {
public Guid Id { get; set; }
public string Name { get; set; } = default!;
}
public class TestContext : DbContext {
public DbSet<TableItem> Table { get; set; }
protected override void OnConfiguring(DbContextOptionsBuilder optionsBuilder) {
if (!optionsBuilder.IsConfigured) {
var connectionString = "Host=localhost;Port=45432;Database=db;Username=postgres;Password=postgres";
optionsBuilder.UseNpgsql(connectionString);
}
}
protected override void OnModelCreating(ModelBuilder modelBuilder) {
modelBuilder.Entity<TableItem>(entity => {
entity.HasKey(e => e.Id);
entity.Property(e => e.Id).ValueGeneratedNever();
entity.Property(e => e.Name)
.IsRequired()
.HasMaxLength(256);
});
}
}
public static class ProgramBulkInsert {
public static async Task Main(string[] args) {
const int totalItems = 5_000_000;
Console.WriteLine("Ensuring database is created...");
await using (var setupContext = new TestContext()) {
await setupContext.Database.EnsureCreatedAsync();
await setupContext.Database.ExecuteSqlRawAsync($"TRUNCATE TABLE \"{nameof(TestContext.Table)}\";");
}
Console.WriteLine($"Generating {totalItems:N0} entities in memory...");
var items = Enumerable.Range(1, totalItems)
.Select(i => new TableItem {
Id = Guid.CreateVersion7(),
Name = $"Item #{i}"
})
.ToList();
Console.WriteLine("Entities generated.");
using var cts = new CancellationTokenSource();
var token = cts.Token;
var stopwatch = new Stopwatch();
_ = Task.Run(async () => {
await Task.Delay(TimeSpan.FromSeconds(2));
Console.WriteLine(">>> Requesting cancellation");
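// Start timing before Cancel(): Cancel() runs registered callbacks synchronously and may return late
stopwatch.Start();
cts.Cancel();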
});
Console.WriteLine("Starting BulkInsertOptimizedAsync with cancellation token...");
try {
await using var context = new TestContext();
_ = await context.BulkInsertOptimizedAsync(items, token);
stopwatch.Stop();
Console.WriteLine("BulkInsertOptimizedAsync COMPLETED without throwing.");
await using var verifyContext = new TestContext();
var countInDb = await verifyContext.Table.LongCountAsync();
Console.WriteLine($"Rows in DB after operation: {countInDb:N0}");
}
catch (OperationCanceledException) {
stopwatch.Stop();
Console.WriteLine("BulkInsertOptimizedAsync threw OperationCanceledException (was cancelled).");
Console.WriteLine($"Elapsed since Cancel: {stopwatch.Elapsed}");
}
catch (Exception ex) {
stopwatch.Stop();
Console.WriteLine("BulkInsertOptimizedAsync threw some other exception:");
Console.WriteLine(ex);
}
Console.WriteLine($"CancellationToken.IsCancellationRequested = {token.IsCancellationRequested}");
}
}
Packages:
<ItemGroup>
<PackageReference Include="Microsoft.EntityFrameworkCore" Version="10.0.0" />
<PackageReference Include="Npgsql.EntityFrameworkCore.PostgreSQL" Version="10.0.0" />
<PackageReference Include="Z.EntityFramework.Extensions.EFCore" Version="10.105.0" />
</ItemGroup>
Hello @DerMagereStudent ,
Indeed, it looks like we missed checking the cancellation token when using a BinaryImport strategy.
We are currently working on the fix.
Best Regards,
Jon
Hello @DerMagereStudent ,
A new version is now available.
We improved the code to better handle the CancellationToken when a bulk copy strategy is used. The code now checks the cancellation token every 1000 entities that have been read.
Let me know if everything works as expected now.
Best Regards,
Jon
Hello @JonathanMagnan ,
Thank you for the fix. My use case now cancels properly because many small records are inserted, as in the example code.
So for my purpose this ticket/issue is done.
But knowing that the token is checked only every 1000th iteration lets me construct a quick example where cancellation still does not work properly: inserting a few large BLOBs. I asked an LLM who must not be named to adapt my previous code, and the assumption was confirmed. Code below.
I'm wondering whether it actually makes sense to check the token only every 1000th iteration. The check is not heavy, thread-synchronized or locked work. It's just checking a volatile flag:
https://github.com/dotnet/runtime/blob/0fb17f7320fbaaa93e91c4c08b3a03175bb5ca15/src/libraries/System.Private.CoreLib/src/System/Threading/CancellationTokenSource.cs#L73
Instead of keeping an iteration counter and doing something like:
if (counter % 1000 == 0)
token.ThrowIfCancellationRequested();
Checking on every iteration should be very fast too, and either way the check will probably vanish in the I/O noise.
Obviously I'm not an expert and may be missing something, but that's my opinion.
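To illustrate the difference, here is a minimal sketch that does not use the library at all. WithCancellation and CountRowsRead are hypothetical helpers written only for this comparison, and the placement of the check (before each row is yielded) is an assumption, not the library's actual implementation. It counts how many rows are still read when cancellation is requested after the third row:

```csharp
using System;
using System.Collections.Generic;
using System.Linq;
using System.Threading;

public static class CancellationCheckSketch {
    // Hypothetical helper: yields rows and checks the token once every `checkInterval` rows.
    public static IEnumerable<T> WithCancellation<T>(
        IEnumerable<T> source, CancellationToken token, int checkInterval) {
        var counter = 0;
        foreach (var item in source) {
            if (counter % checkInterval == 0)
                token.ThrowIfCancellationRequested();
            counter++;
            yield return item;
        }
    }

    // Reads up to `totalRows` rows, requests cancellation after `cancelAfter` rows,
    // and returns how many rows were read before the enumeration stopped.
    public static int CountRowsRead(int totalRows, int cancelAfter, int checkInterval) {
        using var cts = new CancellationTokenSource();
        var read = 0;
        try {
            foreach (var _ in WithCancellation(Enumerable.Range(0, totalRows), cts.Token, checkInterval)) {
                read++;
                if (read == cancelAfter)
                    cts.Cancel();
            }
        }
        catch (OperationCanceledException) {
            // Expected once a check observes the cancelled token.
        }
        return read;
    }

    public static void Main() {
        // Check every 1000th row: with only 25 rows the check never fires again.
        Console.WriteLine(CountRowsRead(totalRows: 25, cancelAfter: 3, checkInterval: 1000)); // prints 25
        // Check every row: enumeration stops right after the cancellation request.
        Console.WriteLine(CountRowsRead(totalRows: 25, cancelAfter: 3, checkInterval: 1));    // prints 3
    }
}
```

With 25 rows of 50 MB each, the first variant would still push all remaining rows through after the cancel request, which matches what the BLOB example below shows.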
Best Regards,
Marcel
25 records of 50 MB each:
using System;
using System.Diagnostics;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;
using Microsoft.EntityFrameworkCore;
public class TableItem {
public Guid Id { get; set; }
public string Name { get; set; } = default!;
// Very large blob
public byte[] Data { get; set; } = default!;
}
public class TestContext : DbContext {
public DbSet<TableItem> Table { get; set; }
protected override void OnConfiguring(DbContextOptionsBuilder optionsBuilder) {
if (!optionsBuilder.IsConfigured) {
var connectionString = "Host=localhost;Port=45432;Database=db;Username=postgres;Password=postgres";
optionsBuilder.UseNpgsql(connectionString);
}
}
protected override void OnModelCreating(ModelBuilder modelBuilder) {
modelBuilder.Entity<TableItem>(entity => {
entity.HasKey(e => e.Id);
entity.Property(e => e.Id).ValueGeneratedNever();
entity.Property(e => e.Name)
.IsRequired()
.HasMaxLength(256);
// Map the large blob to PostgreSQL bytea
entity.Property(e => e.Data)
.IsRequired()
.HasColumnType("bytea");
});
}
}
public static class ProgramBulkInsert {
public static async Task Main(string[] args) {
const int totalItems = 25;
const int blobSizeBytes = 50 * 1024 * 1024; // 50 MB
Console.WriteLine("Ensuring database is created...");
await using (var setupContext = new TestContext()) {
await setupContext.Database.EnsureCreatedAsync();
// Truncate target table
await setupContext.Database.ExecuteSqlRawAsync(
$"TRUNCATE TABLE \"{nameof(TestContext.Table)}\";");
}
Console.WriteLine($"Generating {totalItems:N0} entities in memory with {blobSizeBytes / (1024 * 1024)} MB BLOBs...");
var rnd = Random.Shared;
var items = Enumerable.Range(1, totalItems)
.Select(i => {
var data = new byte[blobSizeBytes];
rnd.NextBytes(data); // Fill with random data to avoid easy compression / optimization
return new TableItem {
Id = Guid.CreateVersion7(),
Name = $"Item #{i}",
Data = data
};
})
.ToList();
Console.WriteLine("Entities generated.");
// Create a CancellationTokenSource and cancel after 2 seconds
using var cts = new CancellationTokenSource();
var token = cts.Token;
var stopwatch = new Stopwatch();
_ = Task.Run(async () => {
await Task.Delay(TimeSpan.FromSeconds(2));
Console.WriteLine(">>> Requesting cancellation");
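// Start timing before Cancel(): Cancel() runs registered callbacks synchronously and may return late
stopwatch.Start();
cts.Cancel();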
});
Console.WriteLine("Starting BulkInsertOptimizedAsync with cancellation token...");
try {
await using var context = new TestContext();
// This is the call we want to prove does NOT honor the CancellationToken
_ = await context.BulkInsertOptimizedAsync(items, token);
stopwatch.Stop();
Console.WriteLine("BulkInsertOptimizedAsync COMPLETED without throwing.");
await using var verifyContext = new TestContext();
var countInDb = await verifyContext.Table.LongCountAsync();
Console.WriteLine($"Rows in DB after operation: {countInDb:N0}");
}
catch (OperationCanceledException) {
stopwatch.Stop();
Console.WriteLine("BulkInsertOptimizedAsync threw OperationCanceledException (was cancelled).");
Console.WriteLine($"Elapsed since Cancel: {stopwatch.Elapsed}");
}
catch (Exception ex) {
stopwatch.Stop();
Console.WriteLine("BulkInsertOptimizedAsync threw some other exception:");
Console.WriteLine(ex);
}
Console.WriteLine($"CancellationToken.IsCancellationRequested = {token.IsCancellationRequested}");
}
}
This was indeed a big question on our side as well.
I will discuss it again with my developer.
Meanwhile, this is already a major improvement compared to before.
Best Regards,
Jon