RepoDB icon indicating copy to clipboard operation
RepoDB copied to clipboard

Bug: BinaryImport - various exceptions when importing timestamp without time zone

Open DanielTuran opened this issue 7 months ago • 2 comments

Bug Description

When doing BinaryImport I'm occasionally getting an unhelpful exception. As discussed here: npgsql/issues/6128, the real exception is hidden by dispose exception

try {
	await connection.BinaryImportAsync(
		"RecordMonitoring.TraderRecord",
		traderRecords, //max 10k records
		s_traderRecordMapping,
		cancellationToken: cancellationToken);
}
catch (Exception ex) {
	logger.LogError(ex, "Error while saving {EntityName}", nameof(TraderRecord));
	throw;
}

Exception Message:

System.ObjectDisposedException: NpgsqlTransaction
       ---> Npgsql.NpgsqlException (0x80004005): Exception while writing to stream
       ---> System.NotSupportedException:  This method may not be called when another write operation is pending.
         at System.Net.Security.SslStream.WriteAsyncInternal[TIOAdapter](ReadOnlyMemory`1 buffer, CancellationToken cancellationToken)
         at System.Net.Security.SslStream.Write(Byte[] buffer, Int32 offset, Int32 count)
         at Npgsql.Internal.NpgsqlWriteBuffer.Flush(Boolean async, CancellationToken cancellationToken)
         at Npgsql.Internal.NpgsqlWriteBuffer.Flush(Boolean async, CancellationToken cancellationToken)
         at Npgsql.NpgsqlBinaryImporter.Cancel(Boolean async, CancellationToken cancellationToken)
         at Npgsql.NpgsqlBinaryImporter.CloseAsync(Boolean async, CancellationToken cancellationToken)
         at Npgsql.NpgsqlBinaryImporter.Close()
         at Npgsql.NpgsqlBinaryImporter.Dispose()
         at RepoDb.NpgsqlConnectionExtension.<>c__DisplayClass36_0`1.<<BinaryImportAsync>g__executeAsync|0>d.MoveNext()
      --- End of stack trace from previous location ---
         at RepoDb.NpgsqlConnectionExtension.TransactionalExecuteAsync[TResult](NpgsqlConnection connection, Func`1 executeAsync, NpgsqlTransaction transaction, CancellationToken cancellationToken)
         --- End of inner exception stack trace ---
         at Npgsql.ThrowHelper.ThrowObjectDisposedException(String objectName, Exception innerException)
         at Npgsql.NpgsqlTransaction.CheckDisposed()
         at Npgsql.NpgsqlTransaction.CheckReady()
         at Npgsql.NpgsqlTransaction.Rollback(Boolean async, CancellationToken cancellationToken)
         at RepoDb.NpgsqlConnectionExtension.TransactionalExecuteAsync[TResult](NpgsqlConnection connection, Func`1 executeAsync, NpgsqlTransaction transaction, CancellationToken cancellationToken)
         at RepoDb.NpgsqlConnectionExtension.TransactionalExecuteAsync[TResult](NpgsqlConnection connection, Func`1 executeAsync, NpgsqlTransaction transaction, CancellationToken cancellationToken)
         at RepoDb.NpgsqlConnectionExtension.BinaryImportAsync[TEntity](NpgsqlConnection connection, String tableName, IEnumerable`1 entities, IEnumerable`1 mappings, DbFieldCollection dbFields, Nullable`1 bulkCopyTimeout, Nullable`1 batchSize, BulkImportIdentityBehavior identityBehavior, IDbSetting dbSetting, NpgsqlTransaction transaction, CancellationToken cancellationToken)
         at RepoDb.NpgsqlConnectionExtension.BinaryImportAsync[TEntity](NpgsqlConnection connection, String tableName, IEnumerable`1 entities, IEnumerable`1 mappings, Nullable`1 bulkCopyTimeout, Nullable`1 batchSize, Boolean keepIdentity, NpgsqlTransaction transaction, CancellationToken cancellationToken)
         at MyCompany.MyProject.Monitoring.RecordSampler.Database.PgSqlRecordStore.Save(IReadOnlyList`1 traderRecords, CancellationToken cancellationToken) in /build/src/Monitoring/Record/RecordSampler/Database/PgSqlRecordStore.cs:line 27

Schema and Model:

CREATE TABLE IF NOT EXISTS "RecordMonitoring"."TraderRecord"
(
    "Id" bigint NOT NULL DEFAULT nextval('"RecordMonitoring"."TraderRecord_Id_seq"'::regclass),
    "TraderId" bigint NOT NULL,
    "Col" character varying(64) COLLATE pg_catalog."default" NOT NULL,
    "RecordUsd" numeric(26,6) NOT NULL,
    "Timestamp" timestamp without time zone NOT NULL DEFAULT (now() AT TIME ZONE 'UTC'::text),
    CONSTRAINT "PK_TraderRecord" PRIMARY KEY ("Id")
)


CREATE INDEX IF NOT EXISTS "IX_RecordMonitoring_TraderRecord_TraderId_Timestamp_Col"
    ON "RecordMonitoring"."TraderRecord" USING btree
    ("TraderId" ASC NULLS LAST, "Timestamp" ASC NULLS LAST, "Col" COLLATE pg_catalog."default" ASC NULLS LAST)
    TABLESPACE pg_default;
public sealed class TraderRecord
{
	public long Id { get; set; }

	public long TraderId { get; set; }

	public string Col { get; set; } = string.Empty;

	public decimal RecordUsd { get; set; }

	public DateTimeOffset Timestamp { get; set; }
}
	private static readonly IReadOnlyCollection<NpgsqlBulkInsertMapItem> s_traderRecordMapping = [
		new(nameof(TraderRecord.TraderId), nameof(TraderRecord.TraderId)),
		new(nameof(TraderRecord.Col), nameof(TraderRecord.Col)),
		new(nameof(TraderRecord.RecordUsd), nameof(TraderRecord.RecordUsd)),
		new(nameof(TraderRecord.Timestamp), nameof(TraderRecord.Timestamp)),
	];

Library Version:

RepoDb v1.13.1 RepoDb.PostgreSql v1.13.1 RepoDb.PostgreSql.BulkOperations v1.13.1 Npgsql: 9.0.3 PostgreSQL version: 14.17 (Azure Database for PostgreSQL flexible server)

EDIT. I'm randomly getting different exceptions when running the same integration tests, see comment below.

DanielTuran avatar Jun 05 '25 07:06 DanielTuran

When debugging it locally, I get this exception

Message: 
System.InvalidOperationException : Write for column 2 resolves to a different PostgreSQL type: OID 1184 than the first row resolved to (OID 1700). Please make sure to use clr types that resolve to the same PostgreSQL type across rows. Alternatively pass the same NpgsqlDbType or DataTypeName to ensure the PostgreSQL type ends up to be identical.

  Stack Trace: 
NpgsqlBinaryImporter.<Write>g__Core|26_0[T](Boolean async, T value, Nullable`1 npgsqlDbType, String dataTypeName, CancellationToken cancellationToken)
<<BinaryImportAsync>b__2>d.MoveNext()
--- End of stack trace from previous location ---
NpgsqlConnectionExtension.BinaryImportWriteAsync[TEntity](NpgsqlBinaryImporter importer, Func`1 moveNextAsync, Func`1 getCurrentAsync, Func`2 writeAsync, BulkImportIdentityBehavior identityBehavior, CancellationToken cancellationToken)
NpgsqlConnectionExtension.BinaryImportAsync[TEntity](NpgsqlBinaryImporter importer, String tableName, IEnumerable`1 entities, IEnumerable`1 mappings, Type entityType, BulkImportIdentityBehavior identityBehavior, CancellationToken cancellationToken)
<<BinaryImportAsync>g__executeAsync|0>d.MoveNext()
--- End of stack trace from previous location ---
NpgsqlConnectionExtension.TransactionalExecuteAsync[TResult](NpgsqlConnection connection, Func`1 executeAsync, NpgsqlTransaction transaction, CancellationToken cancellationToken)
NpgsqlConnectionExtension.TransactionalExecuteAsync[TResult](NpgsqlConnection connection, Func`1 executeAsync, NpgsqlTransaction transaction, CancellationToken cancellationToken)
NpgsqlConnectionExtension.TransactionalExecuteAsync[TResult](NpgsqlConnection connection, Func`1 executeAsync, NpgsqlTransaction transaction, CancellationToken cancellationToken)
NpgsqlConnectionExtension.BinaryImportAsync[TEntity](NpgsqlConnection connection, String tableName, IEnumerable`1 entities, IEnumerable`1 mappings, DbFieldCollection dbFields, Nullable`1 bulkCopyTimeout, Nullable`1 batchSize, BulkImportIdentityBehavior identityBehavior, IDbSetting dbSetting, NpgsqlTransaction transaction, CancellationToken cancellationToken)
NpgsqlConnectionExtension.BinaryImportAsync[TEntity](NpgsqlConnection connection, String tableName, IEnumerable`1 entities, IEnumerable`1 mappings, Nullable`1 bulkCopyTimeout, Nullable`1 batchSize, Boolean keepIdentity, NpgsqlTransaction transaction, CancellationToken cancellationToken)
PgSqlRecordStore.Save(IReadOnlyList`1 traderRecords, CancellationToken cancellationToken) line 27
PgSqlRecordStore.Save(IReadOnlyList`1 traderRecords, CancellationToken cancellationToken) line 36
PgSqlRecordStoreTests.PgSqlRecordStore_Can_Save() line 28

but only if I call it in a loop:

	[Fact]
	public async Task PgSqlRecordStore_Can_Save()
	{
		var recordStore = Services.GetRequiredService<PgSqlRecordStore>();


		for (int j = 0; j < 100; j++) {
			var equities = Enumerable.Range(1, 100_000).Select(i => new TraderReci {
				TraderId = i % 100 + 1,
				Col = "Col" + (i % 10),
				RecordUsd = i,
				Timestamp = MockUtcNow
			}).ToList();

			//act
			await recordStore.Save(equities); // fails at n-th interation
		}
	}

EDIT:

When running the test without debugger attached, I get this exception:, also on n-th interation of the for-looop

  System.ObjectDisposedException : NpgsqlTransaction
  ---- System.InvalidCastException : Unable to cast object of type 'System.DateTimeOffset' to type 'System.Decimal'.

Stack Trace: 
  ThrowHelper.ThrowObjectDisposedException(String objectName, Exception innerException)
  NpgsqlTransaction.CheckDisposed()
  NpgsqlTransaction.CheckReady()
  NpgsqlTransaction.Rollback(Boolean async, CancellationToken cancellationToken)
  NpgsqlConnectionExtension.TransactionalExecuteAsync[TResult](NpgsqlConnection connection, Func`1 executeAsync, NpgsqlTransaction transaction, CancellationToken cancellationToken)
  NpgsqlConnectionExtension.TransactionalExecuteAsync[TResult](NpgsqlConnection connection, Func`1 executeAsync, NpgsqlTransaction transaction, CancellationToken cancellationToken)
  NpgsqlConnectionExtension.BinaryImportAsync[TEntity](NpgsqlConnection connection, String tableName, IEnumerable`1 entities, IEnumerable`1 mappings, DbFieldCollection dbFields, Nullable`1 bulkCopyTimeout, Nullable`1 batchSize, BulkImportIdentityBehavior identityBehavior, IDbSetting dbSetting, NpgsqlTransaction transaction, CancellationToken cancellationToken)
  NpgsqlConnectionExtension.BinaryImportAsync[TEntity](NpgsqlConnection connection, String tableName, IEnumerable`1 entities, IEnumerable`1 mappings, Nullable`1 bulkCopyTimeout, Nullable`1 batchSize, Boolean keepIdentity, NpgsqlTransaction transaction, CancellationToken cancellationToken)
  PgSqlrecordStore.Save(IReadOnlyList`1 traderrecords, CancellationToken cancellationToken) line 27
  PgSqlrecordStore.Save(IReadOnlyList`1 traderrecords, CancellationToken cancellationToken) line 36
  PgSqlrecordStoreTests.PgSqlrecordStore_Can_Save() line 28
  PgSqlrecordStoreTests.PgSqlrecordStore_Can_Save() line 19
  --- End of stack trace from previous location ---
  ----- Inner Stack Trace -----
  NpgsqlParameter.Write(Boolean async, PgWriter writer, CancellationToken cancellationToken)
  NpgsqlBinaryImporter.<Write>g__Core|26_0[T](Boolean async, T value, Nullable`1 npgsqlDbType, String dataTypeName, CancellationToken cancellationToken)

When I change BinaryImportAsync to ImportAllAsync, I don't get the exception Unable to cast object of type 'System.DateTimeOffset' to type 'System.Decimal'. I haven't tried in production however

DanielTuran avatar Jun 05 '25 08:06 DanielTuran

And now the same code throws

---- System.InvalidCastException : Unable to cast object of type 'System.Decimal' to type 'System.String'.```

DanielTuran avatar Jun 05 '25 08:06 DanielTuran