
Warehousing - copy data from one database to another

Open szum7 opened this issue 3 years ago • 15 comments

Hi, can you read data from a source db and copy it to another, target db? I've read and tried the examples in the documentation, but read and save always happen inside the same database even though the documentation mentions 2 args parameters.

Please provide example code if it's possible.

szum7 avatar Dec 20 '22 08:12 szum7

You can do it injecting a keyed EF Core context in the dependency injection. Every EF Core operator supports an extra parameter to transmit the key of the connection/db context. I will provide an example later on.

paillave avatar Dec 20 '22 14:12 paillave

Please provide the source code that has the issue so that I can help you as best I can.

paillave avatar Dec 20 '22 14:12 paillave

Here's my test project. EF context has a connection string with the DataWarehouseSource1 database. processRunner.ExecuteAsync gets the other connection string to DataWarehouseTarget1. However, this reads and writes to the same db, not from source to target.

Program.cs

using Paillave.Etl.Core;
using Paillave.Etl.Reactive.Operators;
using DataWarehouseSyncV1.Context;
using Microsoft.EntityFrameworkCore;
using Paillave.Etl.EntityFrameworkCore;

namespace DataWarehouseSyncV1
{
    class Program
    {
        static async Task Main(string[] args)
        {
            var target = @"Data Source=(localdb)\mssqllocaldb;Initial Catalog=DataWarehouseTarget1;Integrated Security=True;MultipleActiveResultSets=True;";

            var processRunner = StreamProcessRunner.Create<string>(DefineProcess);
            using (var dbCtx = new WarehouseContext())
            {
                var executionOptions = new ExecutionOptions<string>
                {
                    Resolver = new SimpleDependencyResolver().Register<DbContext>(dbCtx),
                };
                var res = await processRunner.ExecuteAsync(target, executionOptions);
            }
        }

        private static void DefineProcess(ISingleStream<string> contextStream)
        {
            contextStream
                .EfCoreSelectSingle("get data", (o, row) => o
                    .Set<SimpleTable>())
                .EfCoreSave("save data")
                .Do("show value on console", i => Console.WriteLine(i.FirstName));
        }
    }
}

WarehouseContext.cs

using Microsoft.EntityFrameworkCore;

namespace DataWarehouseSyncV1.Context
{
    class WarehouseContext : DbContext
    {
        public DbSet<SimpleTable> SimpleTables { get; set; }

        protected override void OnConfiguring(DbContextOptionsBuilder optionsBuilder)
        {
            optionsBuilder.UseSqlServer(@"Data Source=(localdb)\mssqllocaldb;Initial Catalog=DataWarehouseSource1;Integrated Security=True;MultipleActiveResultSets=True;");
        }
    }
}

SimpleTable.cs

using System.ComponentModel.DataAnnotations;

namespace DataWarehouseSyncV1.Context
{
    public class SimpleTable
    {
        [Key]
        public int Id { get; set; }
        public string? FirstName { get; set; }
        public string? LastName { get; set; }
        public int? Age { get; set; }
    }
}

szum7 avatar Dec 20 '22 15:12 szum7

As I mentioned in my first answer, you must inject your DbContexts with a key. Here is an example of injecting keyed DbContexts:

            var processRunner = StreamProcessRunner.Create<string[]>(TestImport2.Import);
            var executionOptions = new ExecutionOptions<string[]>
            {
                Resolver = new SimpleDependencyResolver()
                    .Register(new DataAccess.TestDbContext1(), "CNX1")
                    .Register(new DataAccess.TestDbContext2(), "CNX2")
            };
            var res = await processRunner.ExecuteAsync(args, executionOptions);

Then, in your stream, you must refer to the right keyed context using WithKeyedConnection, as in the following example:

                .EfCoreSave("Save composition", o => o
                    .WithKeyedConnection("CNX1")
                    .Entity(i => i.Composition)
                    .SeekOn(i => new { i.Date, i.PortfolioId })
                    .DoNotUpdateIfExists()
                    .Output((i, e) => new
                    {
                        i.Portfolio,
                        Composition = e
                    }));

paillave avatar Dec 20 '22 16:12 paillave

If you cannot get your code sample working with these clarifications, I will find time to rework your code as soon as possible.

paillave avatar Dec 20 '22 17:12 paillave

Based on your answer I've managed to make the "read from Native SQL db1 and write to Native SQL db2" solution work. But I'm still unable to do it with Entity Framework. Tried

  • read from EF Core db1 and write to Native SQL db2
  • read from EF Core db1 and write to EF Core db2

But neither of these worked for me. Could you please help and give me a working example? The easiest to understand would be a reworked version of my very minimalistic solution from 12/20/2022.

For reference, here's the working "read from Native SQL db1 and write to Native SQL db2" solution:

using Paillave.Etl.Core;
using Paillave.Etl.SqlServer;
using System.Data.SqlClient;
using Paillave.Etl.Reactive.Operators;
using DataWarehouseSyncV1.Context;

namespace SolutionNativeSqlNamespace
{
    class SolutionNativeSql
    {
        public static async Task Run()
        {
            var source = @"Data Source=(localdb)\mssqllocaldb;Initial Catalog=DataWarehouseSource1;Integrated Security=True;MultipleActiveResultSets=True;";
            var target = @"Data Source=(localdb)\mssqllocaldb;Initial Catalog=DataWarehouseTarget1;Integrated Security=True;MultipleActiveResultSets=True;";

            var processRunner = StreamProcessRunner.Create<string>(DefineProcessSimpleTable);

            using (var sourceConnection = new SqlConnection(source))
            using (var targetConnection = new SqlConnection(target))
            {
                sourceConnection.Open();
                targetConnection.Open();
                var executionOptions = new ExecutionOptions<string> 
                { 
                    Resolver = new SimpleDependencyResolver()
                        .Register(sourceConnection, "SourceDbKey") 
                        .Register(targetConnection, "TargetDbKey") 
                };

                var res = await processRunner.ExecuteAsync(target, executionOptions);
                Console.Write(res.Failed ? "Failed" : "Succeeded");
                if (res.Failed)
                    Console.Write($"{res.ErrorTraceEvent.NodeName}({res.ErrorTraceEvent.NodeTypeName}):{res.ErrorTraceEvent.Content.Message}");
            }
        }

        private static void DefineProcessSimpleTable(ISingleStream<string> contextStream)
        {
            contextStream
                .CrossApplySqlServerQuery("Get data", o => o
                    .WithKeyedConnection("SourceDbKey")
                    .FromQuery("SELECT * FROM dbo.Persons")
                    .WithMapping<Person>())
                .SqlServerSave("SqlServerSave", o => o
                    .WithConnection("TargetDbKey")
                    .ToTable("dbo.Persons")
                    .DoNotSave(p => p.Id))
                .Do("Show values on console", i => Console.WriteLine($"{i.FirstName} ({i.LastName})"));
        }
    }
}

(Sorry for replying so late, I was unavailable due to the holidays.)

szum7 avatar Jan 02 '23 15:01 szum7

For Entity Framework read/write to work, you must inject the related DbContext(s).

paillave avatar Jan 02 '23 18:01 paillave

Yes, I did that already, I've injected the related DbContexts. Here's my code for reference:

using Microsoft.EntityFrameworkCore;
using Paillave.Etl.Core;
using Paillave.Etl.Reactive.Operators;
using Paillave.Etl.EntityFrameworkCore;

namespace SolutionEFToEFNamespace
{
    class Constants
    {
        public static readonly string SOURCE_CONNECTIONSTRING = @"Data Source=(localdb)\mssqllocaldb;Initial Catalog=DataWarehouseSource1;Integrated Security=True;MultipleActiveResultSets=True;";
        public static readonly string TARGET_CONNECTIONSTRING = @"Data Source=(localdb)\mssqllocaldb;Initial Catalog=DataWarehouseTarget1;Integrated Security=True;MultipleActiveResultSets=True;";
    }

    public class Person
    {
        public int Id { get; set; }
        public string? FirstName { get; set; }
        public string? LastName { get; set; }
        public int? Age { get; set; }
        public override string ToString() => $"{Id}, {FirstName}, {LastName}, {Age}";
    }

    class DbContextSource : DbContext
    {
        public DbSet<Person> Persons { get; set; }

        protected override void OnConfiguring(DbContextOptionsBuilder optionsBuilder)
        {
            optionsBuilder.UseSqlServer(Constants.SOURCE_CONNECTIONSTRING);
        }
    }

    class DbContextTarget : DbContext
    {
        public DbSet<Person> Persons { get; set; }

        protected override void OnConfiguring(DbContextOptionsBuilder optionsBuilder)
        {
            optionsBuilder.UseSqlServer(Constants.TARGET_CONNECTIONSTRING);
        }
    }

    class SolutionEFToEF
    {
        public static async Task Run()
        {
            var processRunner = StreamProcessRunner.Create<string>(DefineProcess);
            using (var contextSource = new DbContextSource())
            using (var contextTarget = new DbContextTarget())
            {
                var executionOptions = new ExecutionOptions<string>
                {
                    Resolver = new SimpleDependencyResolver()
                        .Register(contextSource)
                        //.Register(contextSource, "DbCtxSource") // I cannot use this, because then not even the read works
                        .Register(contextTarget, "DbCtxTarget"),
                };
                var res = await processRunner.ExecuteAsync(Constants.TARGET_CONNECTIONSTRING, executionOptions);
            }
        }

        private static void DefineProcess(ISingleStream<string> contextStream)
        {
            contextStream
                .EfCoreSelect("get data", (o, row) => o
                    // I cannot specify a connection here, no .WithKeyedConnection or such method as far as I know
                    .Set<Person>()
                    .Select(x => new Person
                    {
                        Id = x.Id,
                        FirstName = x.FirstName,
                        LastName = x.LastName,
                        Age = x.Age
                    })
                )
                .EfCoreSave("save data", o => o
                    .WithKeyedConnection("DbCtxTarget") // doesn't seem to do anything, doesn't connect to the target db
                    .Entity(i => new Person { FirstName = i.FirstName, LastName = i.LastName, Age = i.Age })
                    //.WithMode(SaveMode.EntityFrameworkCore) // tried this also, didn't work
                )
                .Do("show value on console", i => Console.WriteLine(i.ToString()));
        }
    }
}

Program.cs goes

using SolutionEFToEFNamespace;

namespace DataWarehouseSyncV1
{
    class Program
    {
        static async Task Main(string[] args)
        {
            await SolutionEFToEF.Run();
        }
    }
}

I'd like to read from a source db using EF Core and write to a target db using EF Core. I'm only able to read from the source db; I still cannot write to the target db. (I'm also able to write to the source db, which is not my goal.)

szum7 avatar Jan 03 '23 08:01 szum7

I see. I still need to refactor the EF Core extensions a little for better consistency with the other extensions. Currently, the connectionKey is still a direct parameter of the EfCoreSelect* methods. I will have to make it work the same way as everything else and mark the current way as obsolete.

https://github.com/paillave/Etl.Net/blob/master/src/Paillave.Etl.EntityFrameworkCore/EntityFrameworkCoreRead.Stream.ex.cs
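
As a rough illustration of the comment above, the following sketch reads through one keyed DbContext and writes through another. It assumes the connectionKey named argument of EfCoreSelect and the WithKeyedConnection option of EfCoreSave behave as they are used elsewhere in this thread. The Person class, the two context classes, the key names and the connection strings are taken from the thread or are placeholders, and the payload string is a hypothetical unused value. Treat it as a starting point under those assumptions, not as the library's reference usage.

```csharp
using System;
using System.Threading.Tasks;
using Microsoft.EntityFrameworkCore;
using Paillave.Etl.Core;
using Paillave.Etl.Reactive.Operators;
using Paillave.Etl.EntityFrameworkCore;

public class Person
{
    public int Id { get; set; }
    public string? FirstName { get; set; }
    public string? LastName { get; set; }
    public int? Age { get; set; }
}

// Source context: the connection string is the placeholder used in the thread.
public class DbContextSource : DbContext
{
    public DbSet<Person> Persons { get; set; }
    protected override void OnConfiguring(DbContextOptionsBuilder optionsBuilder)
        => optionsBuilder.UseSqlServer(@"Data Source=(localdb)\mssqllocaldb;Initial Catalog=DataWarehouseSource1;Integrated Security=True;MultipleActiveResultSets=True;");
}

// Target context: its schema must already exist in the target database.
public class DbContextTarget : DbContext
{
    public DbSet<Person> Persons { get; set; }
    protected override void OnConfiguring(DbContextOptionsBuilder optionsBuilder)
        => optionsBuilder.UseSqlServer(@"Data Source=(localdb)\mssqllocaldb;Initial Catalog=DataWarehouseTarget1;Integrated Security=True;MultipleActiveResultSets=True;");
}

public class Program
{
    public static async Task Main()
    {
        var processRunner = StreamProcessRunner.Create<string>(DefineProcess);
        using (var contextSource = new DbContextSource())
        using (var contextTarget = new DbContextTarget())
        {
            var executionOptions = new ExecutionOptions<string>
            {
                // Both contexts are registered under a key (key names are arbitrary).
                Resolver = new SimpleDependencyResolver()
                    .Register(contextSource, "DbCtxSource")
                    .Register(contextTarget, "DbCtxTarget"),
            };
            // The payload is not used by this process (hypothetical value).
            var res = await processRunner.ExecuteAsync("unused payload", executionOptions);
            Console.WriteLine(res.Failed ? "Failed" : "Succeeded");
        }
    }

    public static void DefineProcess(ISingleStream<string> contextStream)
    {
        contextStream
            // Read through the context registered as "DbCtxSource".
            .EfCoreSelect("get data", (o, row) => o.Set<Person>(), connectionKey: "DbCtxSource")
            // Write through the context registered as "DbCtxTarget"; Id is left out
            // so that the target database can generate it.
            .EfCoreSave("save data", o => o
                .WithKeyedConnection("DbCtxTarget")
                .Entity(i => new Person { FirstName = i.FirstName, LastName = i.LastName, Age = i.Age }));
    }
}
```

Registering both contexts under a key keeps the read and the write explicit about which database they use, instead of relying on an unkeyed default context.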

paillave avatar Jan 03 '23 12:01 paillave

I'm sorry, there was a mistake on my part, and the solution in my previous comment (EF Core to EF Core) does work.

The issue was that I forgot to properly set up the target database for EF Core. Running "Add-Migration" and "Update-Database" on the target context (target database) solved the issue.
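
For readers who prefer to apply the target schema from code, the sketch below uses EF Core's Database.Migrate(), which applies any pending migrations and creates the database if it does not exist. It assumes a migration has already been generated for the target context (for example with Add-Migration), and it reuses the DbContextTarget and Person names and the placeholder connection string from this thread. It is one possible equivalent of running Update-Database, not the fix that was actually applied in the thread.

```csharp
using Microsoft.EntityFrameworkCore;

public class Person
{
    public int Id { get; set; }
    public string? FirstName { get; set; }
    public string? LastName { get; set; }
    public int? Age { get; set; }
}

// Target context from the thread; the connection string is a placeholder.
public class DbContextTarget : DbContext
{
    public DbSet<Person> Persons { get; set; }
    protected override void OnConfiguring(DbContextOptionsBuilder optionsBuilder)
        => optionsBuilder.UseSqlServer(@"Data Source=(localdb)\mssqllocaldb;Initial Catalog=DataWarehouseTarget1;Integrated Security=True;MultipleActiveResultSets=True;");
}

public class Program
{
    public static void Main()
    {
        using (var contextTarget = new DbContextTarget())
        {
            // Applies pending migrations to the target database before any ETL run.
            // Requires the Microsoft.EntityFrameworkCore.Relational package
            // (pulled in by the SQL Server provider).
            contextTarget.Database.Migrate();
        }
    }
}
```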

I think this GitHub issue can be closed, as "injecting the DbContexts" was the correct answer.

If anything, updated documentation for the library (https://paillave.github.io/Etl.Net/docs/intro) would be much appreciated. I couldn't find anything there that explains how source and target database connections come into play. The closest I've found is the args[0] and args[1] parameters, but I have yet to figure out what args[0] does in this line from https://paillave.github.io/Etl.Net/:

var res = await processRunner.ExecuteAsync(args[0], executionOptions);

szum7 avatar Jan 03 '23 13:01 szum7

The first parameter of ExecuteAsync is the payload, that is, the single value issued by the trigger stream. See the documentation for this: https://paillave.github.io/Etl.Net/docs/recipes/useExternalData#from-trigger-stream
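
To make the role of that first argument concrete, here is a minimal sketch that needs no database. The string passed to ExecuteAsync is the one value the trigger stream (contextStream) issues, and Do prints it. The payload text and the node name are made up for illustration. An empty SimpleDependencyResolver is passed only to mirror the calls shown in this thread.

```csharp
using System;
using System.Threading.Tasks;
using Paillave.Etl.Core;
using Paillave.Etl.Reactive.Operators;

public class Program
{
    public static async Task Main()
    {
        var processRunner = StreamProcessRunner.Create<string>(DefineProcess);
        var executionOptions = new ExecutionOptions<string>
        {
            // Nothing is injected in this sketch.
            Resolver = new SimpleDependencyResolver(),
        };
        // The first argument is the payload issued once by the trigger stream.
        var res = await processRunner.ExecuteAsync("my payload", executionOptions);
        Console.WriteLine(res.Failed ? "Failed" : "Succeeded");
    }

    public static void DefineProcess(ISingleStream<string> contextStream)
    {
        // contextStream emits exactly one value: the payload given to ExecuteAsync.
        contextStream.Do("show payload on console", payload => Console.WriteLine(payload));
    }
}
```

In the examples earlier in the thread the payload happens to be a connection string, but the process never reads it. The connections come from the injected, keyed contexts.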

As with most question-type issues, I'll leave this one open so that I can track what documentation I still need to write.

paillave avatar Jan 03 '23 20:01 paillave

Hello, I tried to move data between two databases, but it didn't work. I checked SQL Server Profiler and found that it reads the data, creates a temp table and runs an insert from that temp table, but no data is inserted.

Note: the ID on the insertion table is auto-generated. Does that have an effect?

 contextStream.EfCoreSelect("Get Transaction Data"
               , (o, row) => o.Set<Transaction>()
                    .Select(t => new TransactionHeader
                    {
                       
                        AccountId = t.AccountId,
                        AuthCode = t.AuthCode,
                        CreatedBy = t.CreatedBy,
                        TransactionId = t.Id,
                        CreationDate = t.CreationDate,
                    })
               , connectionKey: "stg")
                    .EfCoreSave("Save transaction Data",
                        t => t.WithKeyedConnection("dwh")
                        .Entity(t => new TransactionHeader
                        {
                          
                            AccountId = t.AccountId,
                            AuthCode = t.AuthCode,
                            CreatedBy = t.CreatedBy,
                            TransactionId = t.Id,
                            CreationDate = t.CreationDate,
                        })
                    .SeekOn(i => new { i.AuthCode, i.CreationDate })
                    );

AhmedJoum avatar Jan 09 '23 05:01 AhmedJoum

Did you ensure that the process worked properly by checking the Failed property of the result returned by ExecuteAsync?

Hello, I tried to move data between two databases, but it didn't work. I checked SQL Server Profiler and found that it reads the data, creates a temp table and runs an insert from that temp table, but no data is inserted.

Did you check how many rows are issued by each operator? For this, look at the StreamStatisticCounters property of the result returned by ExecuteAsync. More details here: https://paillave.github.io/Etl.Net/docs/tutorials/trackAndCheck#check-the-result Normally, you can also get the number of affected rows by logging the execution plan with SQL Server Profiler. That should also help you track where your rows are "lost".
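
A minimal sketch of both checks follows. Failed, ErrorTraceEvent and its NodeName, NodeTypeName and Content.Message members are used as they appear earlier in this thread. StreamStatisticCounters is only named here, and the shape of its entries is not shown, so the sketch serializes it to JSON with System.Text.Json instead of assuming property names. The trivial process and its payload are placeholders.

```csharp
using System;
using System.Text.Json;
using System.Threading.Tasks;
using Paillave.Etl.Core;
using Paillave.Etl.Reactive.Operators;

public class Program
{
    public static async Task Main()
    {
        var processRunner = StreamProcessRunner.Create<string>(DefineProcess);
        var executionOptions = new ExecutionOptions<string>
        {
            Resolver = new SimpleDependencyResolver(),
        };
        var res = await processRunner.ExecuteAsync("my payload", executionOptions);

        if (res.Failed)
        {
            // Report which node failed and why, then signal failure to the caller.
            var error = res.ErrorTraceEvent;
            Console.Error.WriteLine($"{error.NodeName}({error.NodeTypeName}): {error.Content.Message}");
            Environment.ExitCode = 1;
        }

        // Dump the per-operator row counters without assuming their property names.
        Console.WriteLine(JsonSerializer.Serialize(res.StreamStatisticCounters));
    }

    public static void DefineProcess(ISingleStream<string> contextStream)
    {
        // Placeholder process; replace it with the real read/save pipeline.
        contextStream.Do("show payload on console", payload => Console.WriteLine(payload));
    }
}
```

Comparing the counter of the read operator with that of the save operator shows at which step rows stop flowing.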

Note: the ID on the insertion table is auto-generated. Does that have an effect?

There is no problem with an auto-generated ID; your code should work properly as is.

Let me know.

paillave avatar Jan 09 '23 08:01 paillave

The issue was due to a foreign key constraint. I think I need to log the ExecuteAsync results in the future.

AhmedJoum avatar Jan 09 '23 12:01 AhmedJoum

The issue was due to a foreign key constraint. I think I need to log the ExecuteAsync results in the future.

For a start, you must ensure that the Failed property of the result is false, as described here: https://paillave.github.io/Etl.Net/docs/tutorials/trackAndCheck#get-the-error-if-it-occurs

paillave avatar Jan 09 '23 14:01 paillave