
Bug: Exception on BinaryBulkInsertAsync - The WriteAsync method cannot be called when another write operation is pending.

Open tehmufifnman opened this issue 3 years ago • 16 comments

Bug Description

Getting an exception when trying to use BinaryBulkInsertAsync(): System.NotSupportedException: The WriteAsync method cannot be called when another write operation is pending. I am trying to insert 68,000+ objects; could this be the cause?

Exception Message:

      System.ObjectDisposedException: NpgsqlTransaction
       ---> Npgsql.NpgsqlException (0x80004005): Exception while writing to stream
       ---> System.NotSupportedException:  The WriteAsync method cannot be called when another write operation is pending.
         at System.Net.Security.SslStream.WriteAsyncInternal[TIOAdapter](TIOAdapter writeAdapter, ReadOnlyMemory`1 buffer)
         at System.Net.Security.SslStream.Write(Byte[] buffer, Int32 offset, Int32 count)
         at Npgsql.NpgsqlWriteBuffer.Flush(Boolean async, CancellationToken cancellationToken)
         at Npgsql.NpgsqlWriteBuffer.Flush(Boolean async, CancellationToken cancellationToken)
         at Npgsql.NpgsqlBinaryImporter.Cancel(Boolean async, CancellationToken cancellationToken)
         at Npgsql.NpgsqlBinaryImporter.CloseAsync(Boolean async, CancellationToken cancellationToken)
         at Npgsql.NpgsqlBinaryImporter.Close()
         at Npgsql.NpgsqlBinaryImporter.Dispose()
         at RepoDb.NpgsqlConnectionExtension.<>c__DisplayClass36_0`1.<<BinaryImportAsync>g__executeAsync|0>d.MoveNext()
      --- End of stack trace from previous location ---
         at RepoDb.NpgsqlConnectionExtension.TransactionalExecuteAsync[TResult](NpgsqlConnection connection, Func`1 executeAsync, NpgsqlTransaction transaction, CancellationToken cancellationToken)
         --- End of inner exception stack trace ---
         at Npgsql.NpgsqlTransaction.CheckDisposed()
         at Npgsql.NpgsqlTransaction.CheckReady()
         at Npgsql.NpgsqlTransaction.Rollback(Boolean async, CancellationToken cancellationToken)
         at RepoDb.NpgsqlConnectionExtension.TransactionalExecuteAsync[TResult](NpgsqlConnection connection, Func`1 executeAsync, NpgsqlTransaction transaction, CancellationToken cancellationToken)
         at RepoDb.NpgsqlConnectionExtension.TransactionalExecuteAsync[TResult](NpgsqlConnection connection, Func`1 executeAsync, NpgsqlTransaction transaction, CancellationToken cancellationToken)
         at RepoDb.NpgsqlConnectionExtension.BinaryImportAsync[TEntity](NpgsqlConnection connection, String tableName, IEnumerable`1 entities, IEnumerable`1 mappings, IEnumerable`1 dbFields, Nullable`1 bulkCopyTimeout, Nullable`1 batchSize, BulkImportIdentityBehavior identityBehavior, IDbSetting dbSetting, NpgsqlTransaction transaction, CancellationToken cancellationToken)
         at RepoDb.NpgsqlConnectionExtension.<>c__DisplayClass11_0`1.<<BinaryBulkInsertBaseAsync>b__2>d.MoveNext()

Schema and Model:

Please share with us the schema of the table (not the actual one); it could help us replicate the issue if necessary.

    internal const string CreateAuctionDataTable = @"
        CREATE UNLOGGED TABLE IF NOT EXISTS auction_data
        (
            auction_data_id uuid PRIMARY KEY default uuid_generate_v1(),
            auction_house_id int NOT NULL REFERENCES auction_house(auction_house_id) ON DELETE CASCADE,
            item int NULL,
            variant_id text NULL,
            is_level_variant boolean DEFAULT FALSE,
            pet_species int NULL,
            bid bigint NULL,
            buyout bigint NULL,
            quantity int NOT NULL
        );
    ";

[Map("auction_data")]
[UsedImplicitly(ImplicitUseTargetFlags.Members)]
public class AuctionData
{
    [Map("auction_data_id")]
    public Guid AuctionDataId { get; set; } = Guid.NewGuid();

    [Map("auction_house_id")]
    public long AuctionHouseId { get; set; }

    [Map("item")]
    public long? Item { get; set; }

    [Map("variant_id")]
    public string VariantId { get; set; }

    [Map("is_level_variant")]
    public bool IsLevelVariant { get; set; }

    [Map("pet_species")]
    public long? PetSpecies { get; set; }

    [Map("bid")]
    public long? Bid { get; set; }

    [Map("buyout")]
    public long? Buyout { get; set; }

    [Map("quantity")]
    public long Quantity { get; set; }
}

await connection.BinaryBulkInsertAsync(newAuctionData, pseudoTableType: BulkImportPseudoTableType.Physical, cancellationToken: cancellationToken);

Library Version:

        <PackageReference Include="RepoDb" Version="1.12.10-beta1" />
        <PackageReference Include="RepoDb.PostgreSql" Version="1.1.5-beta1" />
        <PackageReference Include="RepoDb.PostgreSql.BulkOperations" Version="0.0.7" />

tehmufifnman avatar Nov 28 '21 00:11 tehmufifnman

Thanks for reporting this one. It is important for us to capture this before going beta. Does this exception show when you simply call the BinaryBulkInsertAsync method passing the 68K rows? Or did you do anything prior to that?

mikependon avatar Nov 28 '21 21:11 mikependon

Correct - it throws the exception after a few seconds of trying to insert the 68K rows. If I provide a batch size up to 10K it works fine, but if I push the batch size much higher than 10K the error returns. It seems there is some kind of sweet spot.

I didn't do anything before this outside of the code that produces the data being inserted which doesn't touch RepoDB at all.
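For reference, the batched call looks roughly like this (a sketch; the batchSize parameter appears in the BinaryImportAsync signature in the stack trace above, the connection string is hypothetical, and newAuctionData is the list of AuctionData rows):

```csharp
using Npgsql;
using RepoDb;

// Hypothetical connection string; replace with your own.
await using var connection = new NpgsqlConnection("Host=localhost;Database=auctions;Username=app;Password=secret");
await connection.OpenAsync(cancellationToken);

// Asking RepoDB to split the 68K rows into batches of 10,000,
// which worked around the error at first.
await connection.BinaryBulkInsertAsync(
    newAuctionData,
    batchSize: 10000,
    pseudoTableType: BulkImportPseudoTableType.Physical,
    cancellationToken: cancellationToken);
```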

tehmufifnman avatar Nov 29 '21 00:11 tehmufifnman

Actually I take that back - a batch size any higher than 100 seems to now be producing the error =(

tehmufifnman avatar Nov 29 '21 01:11 tehmufifnman

Thanks. We will take a look and get back to you. This will be a part of our next release.

mikependon avatar Nov 29 '21 06:11 mikependon

Thank you!

tehmufifnman avatar Nov 29 '21 23:11 tehmufifnman

@mikependon hey friend! did you figure out what was going on with this and get it fixed in a subsequent release?

tehmufifnman avatar Jan 13 '22 03:01 tehmufifnman

It has not =(

Same error as of these package versions:

        <PackageReference Include="RepoDb" Version="1.12.10-beta4" />
        <PackageReference Include="RepoDb.PostgreSql" Version="1.1.5-beta2" />
        <PackageReference Include="RepoDb.PostgreSql.BulkOperations" Version="0.0.8" />

tehmufifnman avatar Jan 13 '22 05:01 tehmufifnman

This will be fixed before we do the first beta release of the PostgreSQL Bulk Operations. We are targeting next month (February 2022)

mikependon avatar Jan 13 '22 10:01 mikependon

Just checking in! =) Hows it coming? I am really anxious to try and use this!

tehmufifnman avatar Jan 25 '22 02:01 tehmufifnman

It is unfortunate, but we can't use your schema as-is: the table named auction_house is missing (it was not shared). But just FYI, we are working on this now. Will give you feedback soon.

mikependon avatar Jan 25 '22 22:01 mikependon

We spent time on this and cannot replicate your scenario in relation to the WriteAsync error.

Did you use the same connection when calling the BinaryBulkInsert method? If yes, we suggest opening a new connection for each call to that method. It might be that Npgsql does not allow calling the NpgsqlBinaryImporter.Write() method twice while an existing execution is still in progress.

FYI, we did encounter the Connection Busy issue when the BinaryBulkInsertAsync method was called twice within the same connection. (This is an expected exception.)
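The suggestion above can be sketched as follows: give each bulk-insert call its own connection so that no two binary imports share one NpgsqlConnection's write stream. (This is a sketch; ConnectionString is a hypothetical constant, and AuctionData is the model shown earlier in the thread.)

```csharp
using Npgsql;
using RepoDb;

// Each concurrent bulk insert gets a dedicated connection, avoiding
// two binary COPY imports writing on the same connection at once.
async Task BulkInsertAsync(IEnumerable<AuctionData> rows, CancellationToken ct)
{
    await using var connection = new NpgsqlConnection(ConnectionString); // hypothetical constant
    await connection.OpenAsync(ct);
    await connection.BinaryBulkInsertAsync(rows, cancellationToken: ct);
}
```

With Npgsql's built-in pooling, opening a connection per call is cheap: the physical connection is reused from the pool, but each NpgsqlConnection instance owns its own command stream for the duration of the import.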

mikependon avatar Jan 25 '22 22:01 mikependon

Here is a sample small project we played around with this scenario. Would you be able to help us replicate your issue using this small project?

Project: Issue988.zip

mikependon avatar Jan 25 '22 23:01 mikependon

I open a new connection every time I call the method (technically I pull one from pg_bouncer connection pool).

Could it have something to do with the size of the bulk inserts? My bulk inserts are quite large (~100,000 items in some cases), and batch size has no effect.

I also think there might be some confusion. I am not calling WriteAsync(). I am ONLY calling BinaryBulkInsertAsync, and only once at that. The WriteAsync error is an internal error being thrown by the RepoDB library.

tehmufifnman avatar Jan 30 '22 02:01 tehmufifnman

Within the small project we attached, we used 100K rows to bulk insert the data, with 5 threads running together. Basically, as mentioned above, what we encountered is the connection busy exception when the method is invoked on different threads (Tasks) within the same connection. We did not encounter the problem when each call was given a dedicated connection.

Here, we cannot replicate the exact exception you encountered. We are happy to help you; would you be able to help us replicate the problem using the small project we attached? We hope you can spend an hour just replicating it; we would then be happy to fix it for you.

On the other hand, we are not customizing any exception; we throw whatever exception the underlying driver encountered. The Npgsql bulk importer's WriteAsync method is what we use underneath, so calling BinaryBulkImport is effectively calling the WriteAsync method with only the valid mappings between the models and the DB schema.

mikependon avatar Jan 30 '22 07:01 mikependon

@tehmufifnman - we know that it is a bit cumbersome on your part, but we will be releasing in the next few days and we would like to capture this. Would you be able to give us a solution that replicates your scenario? Otherwise, this will not be a part of that release. Thanks a lot for your help.

mikependon avatar Feb 14 '22 16:02 mikependon

Unless this can be replicated, we will not issue a fix for this one.

mikependon avatar Feb 17 '22 23:02 mikependon