
Feature: Introduce the BulkInsert in RepoDb.PostgreSql.

Open mikependon opened this issue 5 years ago • 14 comments

In the initial release of RepoDb.PostgreSql, bulk-insert is not yet supported. This feature will allow users to perform actual bulk-insert operations when connecting to a PostgreSql DB.

See below.

using (var connection = new NpgsqlConnection(Database.ConnectionString))
{
	connection.BulkInsert<Entity>(entities);
}

The InsertAll operation was already introduced in the initial release, but it is a batch operation (packed statements), not a bulk operation.
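For comparison, a minimal sketch of the existing batch operation, mirroring the snippet above (the batchSize value here is illustrative):

using (var connection = new NpgsqlConnection(Database.ConnectionString))
{
	// Packs the INSERT statements into batched commands; each row still
	// travels as an ordinary parameterized statement, unlike a true bulk copy.
	connection.InsertAll<Entity>(entities, batchSize: 100);
}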

The author will add more information on this story soon.

mikependon avatar Jan 18 '20 13:01 mikependon

This would be of great benefit to us. Thank you!

githubfanster avatar Jul 17 '20 04:07 githubfanster

I've tried testing this new PostgreSql BinaryBulkInsert where I was previously using InsertAll, and I'm getting a strange and not very helpful exception.

The exception is just "Can't change NpgsqlDbType from Date to Integer", with no further info. Particularly strange, as I'm not using any 'Date' fields; the closest I've got to something like that is a "timestamp with time zone" field mapped to a NodaTime "Instant" in code.

An otherwise identical "InsertAll" works perfectly.

I wasn't sure if I should comment here or open a new issue, sorry if I got it wrong!

csutcliff avatar Oct 23 '21 00:10 csutcliff

Hi mate, thanks for testing the API. We are happy that somebody from the community (like you) is testing its integrity. Many thanks for that in advance! 😃

The full feature will most likely be completed soon, and we will issue a beta. But we would like to capture your exception as early as now, since the development of this is currently our active focus.

Would you be able to share your model and table schema?

PS: It is important to note that this is not yet at the RC stage (nor even at beta), so expect some minor bugs.

mikependon avatar Oct 23 '21 06:10 mikependon

There are some resolvers built on top of the Bulk Operations that resolve the .NET CLR type into its equivalent NpgsqlDbType before proceeding with the actual operation against the DB. We suspect that this issue might have been caused by such a resolver.
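A hypothetical sketch (not RepoDb's actual resolver) of how such a lookup can go wrong when it meets a CLR type it does not know about:

using System;
using NpgsqlTypes;

// Hypothetical illustration only: a CLR-to-NpgsqlDbType map with no entry
// for a type (e.g. NodaTime's Instant) can fall back to the wrong member,
// later surfacing as "Can't change NpgsqlDbType from X to Y" on write.
static NpgsqlDbType ResolveNpgsqlDbType(Type type)
{
	if (type == typeof(int)) return NpgsqlDbType.Integer;
	if (type == typeof(DateTime)) return NpgsqlDbType.TimestampTz;
	// ... more known mappings ...
	return NpgsqlDbType.Date; // an arbitrary fallback, purely for illustration
}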

Though, we strongly recommend double-checking the properties of your model to see whether any of them is mapped to a Date/Timestamp-based field in your underlying table (and vice versa).

Of course, we can shed further light and help you on this once the model and schema are shared here.

mikependon avatar Oct 23 '21 08:10 mikependon

Thanks for the reply Mike. I'm aware that this is very fresh alpha code; I'm just running some performance testing at the moment, as I'm looking to move away from our existing MSSQL TVP bulk inserts to RepoDB + PostgreSQL at some point in the future.

As I mentioned earlier, we use NodaTime for timestamps etc. internally, and it is the recommended way to interact with PostgreSQL date/time types according to the Npgsql docs. Is this supported by RepoDB/BinaryBulkInsert? I have the Npgsql.NodaTime package installed and call NpgsqlConnection.GlobalTypeMapper.UseNodaTime(); at the start of the test.
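For reference, the setup just described amounts to this one-time initialization (a minimal sketch; both calls also appear in the repro further down):

// Register NodaTime handling in Npgsql (from the Npgsql.NodaTime package)
// and initialize RepoDb's PostgreSql support before any operation runs.
NpgsqlConnection.GlobalTypeMapper.UseNodaTime();
PostgreSqlBootstrap.Initialize();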

Here is the model and schema I'm using for the tests. The model is a basic wrapper around an existing class just for testing.

using NodaTime;
using NpgsqlTypes;
using RCT_Racing.Model;
using RepoDb.Attributes;
using RepoDb.Attributes.Parameter.Npgsql;
using System.Text.Json;

namespace DatabaseBenchmarks
{
    public class PgBet
    {
        private readonly Bet _source;

        public PgBet(Bet source)
        {
            _source = source;
            // Serialize once in the constructor; stored via the get-only property below.
            LegRunners = JsonSerializer.Serialize(_source.LegRunners);
        }

        [Map("betoption")]
        public short BetOption => _source.BetOption;

        [Map("bettypeid")]
        public short BetTypeID => _source.BetCode;

        [Map("cancelrequested")]
        public bool CancelRequested => false;

        [Map("cashedamount")]
        public int CashedAmount => 0;

        [Map("commissionpercent")]
        public int CommissionPercent => _source.CommissionPercent;

        [Map("errorcode")]
        public short? ErrorCode => _source.ErrorCode;

        [Map("errortext")]
        public string? ErrorText => null;

        [Map("cancelled")]
        public bool Cancelled => false;

        [Map("cashed")]
        public bool Cashed => false;

        [NpgsqlDbType(NpgsqlDbType.Json)]
        [Map("legrunners")]
        public string LegRunners { get; }

        [Map("meetname")]
        public string? MeetName => _source.Race.Meeting.FullName;

        [Map("meetnumber")]
        public short MeetNumber => Convert.ToInt16(_source.Race.Meeting.Number);

        [Map("numlegs")]
        public short NumLegs => _source.NumLegs;

        [Map("racenumber")]
        public short RaceNumber => Convert.ToInt16(_source.Race.Number);

        [NpgsqlDbType(NpgsqlDbType.TimestampTz)]
        [Map("racetime")]
        public Instant RaceTime => _source.Race.PostTime;

        [NpgsqlDbType(NpgsqlDbType.TimestampTz)]
        [Map("sold")]
        public Instant Sold => _source.Sold;

        [Map("stake")]
        public int Stake => _source.Stake;

        [Map("ticketcost")]
        public int TicketCost => _source.TicketCost;

        [Map("toteid")]
        public int ToteID => _source.BetId;

        [Map("tsn")]
        public string? TSN => _source.TSN;

        [Primary, Map("uniqueid")]
        public long UniqueId => _source.Sold.ToDateTimeUtc().Date.Ticks + _source.BetId;

        [Map("voidamount")]
        public int VoidAmount => 0;
    }
}
CREATE TABLE IF NOT EXISTS public.placedbets
(
    uniqueid bigint NOT NULL,
    toteid integer NOT NULL,
    betoption smallint NOT NULL,
    bettypeid smallint NOT NULL,
    cancelrequested boolean NOT NULL DEFAULT false,
    cashedamount integer NOT NULL DEFAULT 0,
    commissionpercent integer NOT NULL,
    sold timestamp with time zone NOT NULL,
    errorcode smallint,
    errortext character varying(60) COLLATE pg_catalog."default",
    cancelled boolean NOT NULL DEFAULT false,
    legrunners json NOT NULL,
    meetname character varying(20) COLLATE pg_catalog."default" NOT NULL,
    meetnumber smallint NOT NULL,
    numlegs smallint NOT NULL,
    racenumber smallint NOT NULL,
    racetime timestamp with time zone NOT NULL,
    stake integer NOT NULL,
    tsn character varying(18) COLLATE pg_catalog."default",
    ticketcost integer NOT NULL,
    voidamount integer NOT NULL DEFAULT 0,
    cashed boolean NOT NULL DEFAULT false,
    CONSTRAINT placedbets_pkey PRIMARY KEY (uniqueid)
)

csutcliff avatar Oct 23 '21 14:10 csutcliff

@csutcliff - TBH, I (personally) never thought about PostgreSQL's support for the NodaTime library via this package. Glad to hear about it! (It was also written by Shay Rojansky, the author of the Npgsql library.)

Pertaining to your issue, even though we have not yet started the investigation, we suspect the issue lies in the ClientTypeToNpgsqlDbTypeResolver (see here), which has not really considered the NodaTime Instant .NET CLR type. Fortunately, adding PropertyValueAttribute support to the BinaryBulk<Operations> is in our pipeline, which we expect will automatically solve this issue once introduced. I hope you can wait for that in our beta release (probably in the next 2-3 weeks).

The reason InsertAll and the other normal operations work with this is the complete support for the recent PropertyValueAttribute feature; NpgsqlDbType is part of that suite by default. As of this writing, we have tagged the portion of code where we should support it. (Thanks for leading us there!)
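For illustration, this is the attribute-level hint in question, mirroring the model above; the normal operations honor it via the PropertyValueAttribute suite, which the binary bulk path did not yet do at this point:

[NpgsqlDbType(NpgsqlDbType.TimestampTz)] // a PropertyValueAttribute-based hint
[Map("sold")]
public Instant Sold { get; set; }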

Though, one question for @roji: does the NpgsqlBinaryImporter.Write() operation fully support NodaTime without any tweaks?

mikependon avatar Oct 23 '21 20:10 mikependon

Though, one question for @roji: does the NpgsqlBinaryImporter.Write() operation fully support NodaTime without any tweaks?

Yep, as far as I know there shouldn't be anything special about using NodaTime (or any other plugin) with COPY. Let me know if you see any issues!
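For anyone following along, a minimal sketch of NodaTime over raw binary COPY (assumes an open NpgsqlConnection named connection, UseNodaTime() registered, and a table t(ts timestamp with time zone)):

using (var importer = connection.BeginBinaryImport(
	"COPY t (ts) FROM STDIN (FORMAT BINARY)"))
{
	importer.StartRow();
	// NodaTime's Instant maps to "timestamp with time zone" via the plugin.
	importer.Write(SystemClock.Instance.GetCurrentInstant(), NpgsqlDbType.TimestampTz);
	importer.Complete();
}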

roji avatar Oct 24 '21 11:10 roji

The development of this feature is now complete; the feature set covers the BinaryBulk<Operations> discussed above.

The documentation will be coming soon. Please do report any issues you find in the implementation.

Note: The package is still in alpha, not yet at RC or even beta.

mikependon avatar Oct 31 '21 21:10 mikependon

FYI: @stefandevo

mikependon avatar Oct 31 '21 21:10 mikependon

Hi @mikependon,

NodaTime now works! A couple of issues I've encountered whilst testing this:

Firstly, BinaryBulk operations don't seem to support storing CLR enums as Int32; this works perfectly with the regular operations such as InsertAll. When using a schema/model with Int32 enums, the bulk methods throw the following exception: "Can't write CLR type <EnumType> with handler type Int32Handler".

Secondly, when I am testing with a compatible schema/model, the async methods throw various exceptions, though not consistently. I've done a small amount of debugging, and they mostly seem to stem from attempting to write the wrong values into columns (<Int32> A => <text> B, for example). The exact same code works perfectly with the synchronous version; it seems to me something is getting out of sync in the async version. Some example exceptions (many similar ones are thrown with various types):

Can't change NpgsqlDbType from Json to Char
Can't change NpgsqlDbType from Bigint to Integer
Can't change NpgsqlDbType from Integer to TimestampTz

csutcliff avatar Nov 05 '21 09:11 csutcliff

Firstly, BinaryBulk operations don't seem to support storing CLR enums as Int32.

Exactly. I think this is an edge case that we explicitly introduced in our normal operations compilation. (See here.)

Just for the fun of it, does this not work even if you specify NpgsqlDbType.Integer on the enum property of the model?
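Concretely, the question refers to something like this on the model (BetStatus and the column name are hypothetical):

// Explicitly hinting the enum property as an integer column:
[NpgsqlDbType(NpgsqlDbType.Integer)]
[Map("statuscolumn")]
public BetStatus Status { get; set; }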

What I am personally afraid of is that we will have to explicitly add this logic to the bulk operations as well, and even capture the handlers (Property/Class Handlers) in this regard. 😄 But we are happy to address this edge case, so please do let us know.

Secondly, when I am testing with a compatible schema/model, the async methods throw various exceptions, though not consistently. I've done a small amount of debugging, and they mostly seem to stem from attempting to write the wrong values into columns (<Int32> A => <text> B, for example).

This should not happen, as the code for the async and sync paths is 100% equal; the only difference is the call to the underlying async methods of the Npgsql library.

Can you give us more context on how we can fix this? Or, a very small project that replicates it would help simplify our investigation. The reason is that I personally cannot replicate this 😄 -- but I will play around more later/soon.

mikependon avatar Nov 05 '21 13:11 mikependon

Just for the fun of it, does this not work even you specify the NpgsqlTypes.Integer on the property of the model that is an Enum?

That was the first thing I tried, no change.

csutcliff avatar Nov 05 '21 13:11 csutcliff

Can you give us more context on how we can fix this? Or, a very small project that replicates it would help simplify our investigation. The reason is that I personally cannot replicate this 😄 -- but I will play around more later/soon.

I can reproduce it with the following:

using NodaTime;
using Npgsql;
using NpgsqlTypes;
using RepoDb;
using RepoDb.Attributes;
using RepoDb.Attributes.Parameter.Npgsql;

var rows = 20000;
var rand = new Random();
var now = SystemClock.Instance.GetCurrentInstant().ToUnixTimeSeconds();

// One-time setup: register NodaTime handling in Npgsql and initialize
// RepoDb's PostgreSql support before opening any connection.
NpgsqlConnection.GlobalTypeMapper.UseNodaTime();
PostgreSqlBootstrap.Initialize();

for (int i = 0; i < 100; i++)
{
    try
    {
        var testRows = new BinaryBulkInsertTest[rows];
        for (int j = 0; j < rows; j++)
        {
            var randomTime = Instant.FromUnixTimeSeconds(rand.NextInt64(0, now));
            testRows[j] = new BinaryBulkInsertTest
            {
                Text = j.ToString(),
                Integer = rand.Next((int)short.MaxValue + 1, int.MaxValue),
                BigInt = rand.NextInt64((long)int.MaxValue + 1, long.MaxValue),
                SmallInt = (short)rand.Next(0, short.MaxValue),
                TimestampWithTz = randomTime,
                Date = randomTime.InUtc().Date,
                Bool = rand.Next(0, 2) == 0, // upper bound is exclusive; yields 0 or 1
            };
        }

        using var connection = new NpgsqlConnection("<Connection String>");
        //await connection.InsertAllAsync(testRows); //works consistently (slowly!)
        //connection.BinaryBulkInsert(testRows); //works consistently
        await connection.BinaryBulkInsertAsync(testRows); //random NpgsqlDbType exceptions, more frequent with bigger row arrays or looping more.
    }
    catch (Exception ex)
    {
        Console.WriteLine(ex.Message);
    }
}

[Map("binarybulkinserttest")]
public class BinaryBulkInsertTest
{
    [Map("bigintcolumn")]
    public long BigInt { get; set; }

    [Map("boolcolumn")]
    public bool Bool { get; set; } = false;

    [NpgsqlDbType(NpgsqlDbType.Date)]
    [Map("datecolumn")]
    public LocalDate Date { get; set; }

    [Primary]
    [Identity]
    [Map("idcolumn")]
    public long Id { get; set; } = 0;

    [Map("integercolumn")]
    public int Integer { get; set; }

    [Map("smallintcolumn")]
    public short SmallInt { get; set; }

    [Map("textcolumn")]
    public string Text { get; set; } = string.Empty;

    [NpgsqlDbType(NpgsqlDbType.TimestampTz)]
    [Map("timestampwithtzcolumn")]
    public Instant TimestampWithTz { get; set; }
}
CREATE TABLE IF NOT EXISTS public.binarybulkinserttest
(
    idcolumn bigint NOT NULL GENERATED ALWAYS AS IDENTITY ( INCREMENT 1 START 1 MINVALUE 1 MAXVALUE 9223372036854775807 CACHE 1 ),
    textcolumn text COLLATE pg_catalog."default" NOT NULL,
    integercolumn integer NOT NULL,
    bigintcolumn bigint NOT NULL,
    smallintcolumn smallint NOT NULL,
    timestampwithtzcolumn timestamp with time zone NOT NULL,
    datecolumn date NOT NULL,
    boolcolumn boolean NOT NULL,
    CONSTRAINT binarybulkinserttest_pkey PRIMARY KEY (idcolumn)
)

csutcliff avatar Nov 05 '21 15:11 csutcliff

Support for enums has been released. Please use version RepoDb.PostgreSql.BulkOperations v0.0.8. Note: Please be reminded that the newly released package is still in its alpha state (not yet in beta).
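A minimal sketch of what the fix enables (hypothetical names; the enum's underlying integer value is written):

public enum BetStatus { Open = 0, Settled = 1 }

[Map("enumtest")]
public class EnumTest
{
	[Map("idcolumn")]
	public long Id { get; set; }

	[Map("statuscolumn")]
	public BetStatus Status { get; set; }
}

// With v0.0.8, the enum property should bulk-insert without an explicit hint:
connection.BinaryBulkInsert(new[] { new EnumTest { Status = BetStatus.Settled } });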

mikependon avatar Dec 12 '21 22:12 mikependon