
Add support for Redshift DB - Pandas Path

Open · kaxil opened this issue 1 year ago · 10 comments

Similar to how we currently support BigQuery and Snowflake, let's add support for Redshift too.

This issue only needs the "pandas" path, so it should be straightforward; look at the BigQuery database implementation for an example. Skip the "native"/"optimised" path for the scope of this ticket (it will be covered in #613).

This will be part of 1.1.0b1

Acceptance Criteria:

  • Example DAG (that also runs in the integration tests) that loads a file from S3 to Redshift
  • Tests
  • Documentation

Follow-up task: https://github.com/astronomer/astro-sdk/issues/613

Tasks breakdown:

  • [x] Research how to write to Redshift with the Pandas library - @pankajkoti
  • [x] Override the base class load_file_to_table method in a new module aws/redshift.py under databases (see the sketch after this list) - @pankajkoti
  • [ ] Example DAG - @pankajastro
  • [ ] Tests -> Unit tests + an integration test calling the above example DAG - @pankajastro
  • [ ] Check the possibility of mocking the Redshift cluster, or pausing/resuming it, for CI runs
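
As a minimal sketch only (not the final astro-sdk API), assuming the base class exposes a load_pandas_dataframe_to_table hook and a sqlalchemy_engine property the way the BigQuery implementation does, the new module could look roughly like this:

import pandas as pd

from astro.databases.base import BaseDatabase  # assumed import path


class RedshiftDatabase(BaseDatabase):
    """Pandas-path Redshift support: loads go through DataFrame.to_sql."""

    def load_pandas_dataframe_to_table(
        self,
        source_dataframe: pd.DataFrame,
        target_table,
        if_exists: str = "replace",
        chunk_size: int = 1000,
    ) -> None:
        # Plain SQLAlchemy inserts; the native COPY-based path is deliberately
        # out of scope here (#613).
        source_dataframe.to_sql(
            target_table.name,
            con=self.sqlalchemy_engine,  # assumed base-class property
            schema=target_table.metadata.schema,
            if_exists=if_exists,
            index=False,
            chunksize=chunk_size,
        )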

Support the following operations (a usage sketch follows the list):

  • [x] load_file: load a given file into a SQL table
  • [ ] transform: applies a SQL select statement to a source table and saves the result to a destination table
  • [x] drop_table: drop SQL table
  • [ ] run_raw_sql: run any SQL statement without handling its output
  • [x] append: insert rows from the source SQL table into the destination SQL table, if there are no conflicts
  • [ ] merge: insert rows from the source SQL table into the destination SQL table, depending on conflicts:
      • [ ] ignore: do not add rows that already exist
      • [ ] update: replace existing rows with new ones
  • [x] export_file: export SQL table rows into a destination file
  • [x] dataframe: export given SQL table into in-memory Pandas data-frame
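
For reference, loading from S3 to Redshift through this API would look roughly like the snippet below; the bucket, table, and connection id are placeholders, and import paths may differ between astro-sdk versions:

from astro import sql as aql
from astro.files import File
from astro.table import Metadata, Table  # astro.sql.table in older releases

load_s3_to_redshift = aql.load_file(
    input_file=File("s3://my-bucket/data.csv"),  # placeholder bucket/key
    output_table=Table(
        name="my_table",  # placeholder table name
        conn_id="redshift_conn",  # placeholder Airflow connection id
        metadata=Metadata(schema="public"),
    ),
)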

kaxil avatar Aug 08 '22 00:08 kaxil

@phanikumv will help in managing this and #613 so please keep him in the loop @pankajastro and @pankajkoti

Sync up with @sunank200 or @utkarsharma2 if you are not familiar with the fallback "pandas" path (it loads the file into an in-memory pandas DataFrame and writes it to the warehouse over SQLAlchemy, as opposed to a warehouse-native bulk load such as Redshift's COPY).

kaxil avatar Aug 08 '22 00:08 kaxil

@kaxil did you mean to tag me as well in the comment? You seem to have tagged Pankaj Singh twice, but I see myself also as an assignee :)

pankajkoti avatar Aug 08 '22 05:08 pankajkoti

Yes @pankajkoti and @pankajastro will work on it together

phanikumv avatar Aug 08 '22 06:08 phanikumv

Yup :) @pankajkoti

kaxil avatar Aug 08 '22 09:08 kaxil

I will work on the implementation today. @pankajastro I will update you on the progress by EOD today and seek your help tomorrow. If you think you can construct the base example DAG assuming the underlying code will be ready, please feel free to do so.

pankajkoti avatar Aug 09 '22 06:08 pankajkoti

I have started on the implementation, but progress is a bit slow as I am new to the project structure and the overall functional implementation. I am afraid the implementation itself might take until Friday, and the test cases will need to be implemented after that. I will try my best to complete it sooner, but it looks dicey at the moment since this is the first time I am working on astro-sdk and there can be a few unknowns along the way.

pankajkoti avatar Aug 10 '22 06:08 pankajkoti

Here is an example DAG to create and destroy a Redshift cluster:

import os
from datetime import datetime, timedelta

from airflow.models.dag import DAG
from airflow.providers.amazon.aws.operators.redshift_cluster import (
    RedshiftCreateClusterOperator,
    RedshiftDeleteClusterOperator
)
from airflow.providers.amazon.aws.sensors.redshift_cluster import RedshiftClusterSensor

REDSHIFT_CONN_ID = os.getenv("ASTRO_REDSHIFT_CONN_ID", "aws_default")
REDSHIFT_CLUSTER_IDENTIFIER = os.getenv("REDSHIFT_CLUSTER_IDENTIFIER", "astro-providers-cluster")
REDSHIFT_CLUSTER_MASTER_USER = os.getenv("REDSHIFT_CLUSTER_MASTER_USER", "awsuser")
REDSHIFT_CLUSTER_MASTER_PASSWORD = os.getenv("REDSHIFT_CLUSTER_MASTER_PASSWORD", "********")
REDSHIFT_CLUSTER_TYPE = os.getenv("REDSHIFT_CLUSTER_TYPE", "single-node")
REDSHIFT_CLUSTER_NODE_TYPE = os.getenv("REDSHIFT_CLUSTER_NODE_TYPE", "dc2.large")
REDSHIFT_CLUSTER_DB_NAME = os.getenv("REDSHIFT_CLUSTER_DB_NAME", "astro_dev")
EXECUTION_TIMEOUT = int(os.getenv("EXECUTION_TIMEOUT", 6))

default_args = {
    "execution_timeout": timedelta(hours=EXECUTION_TIMEOUT),
    "retries": int(os.getenv("DEFAULT_TASK_RETRIES", 2)),
    "retry_delay": timedelta(seconds=int(os.getenv("DEFAULT_RETRY_DELAY_SECONDS", 60))),
}

with DAG(
    dag_id="example_async_redshift_sql",
    start_date=datetime(2022, 1, 1),
    schedule_interval=None,
    catchup=False,
    default_args=default_args,
    tags=["example", "redshift"],
) as dag:
    create_redshift_cluster = RedshiftCreateClusterOperator(
        task_id="create_redshift_cluster",
        cluster_identifier=REDSHIFT_CLUSTER_IDENTIFIER,
        cluster_type=REDSHIFT_CLUSTER_TYPE,
        node_type=REDSHIFT_CLUSTER_NODE_TYPE,
        master_username=REDSHIFT_CLUSTER_MASTER_USER,
        master_user_password=REDSHIFT_CLUSTER_MASTER_PASSWORD,
        aws_conn_id=REDSHIFT_CONN_ID,
    )

    # Wait until the cluster reaches the "available" state (the sensor's
    # default target_status).
    wait_for_cluster = RedshiftClusterSensor(
        task_id="wait_for_cluster",
        cluster_identifier=REDSHIFT_CLUSTER_IDENTIFIER,
        aws_conn_id=REDSHIFT_CONN_ID,
    )

    # Add Redshift DB here
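    # The astro-sdk aql.load_file task sketched under "Support the following
    # operations" above would slot in here, between wait_for_cluster and the
    # delete step.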

    # trigger_rule="all_done" tears the cluster down even if upstream tasks
    # fail, so CI runs do not leak a running (billed) cluster.
    delete_redshift_cluster = RedshiftDeleteClusterOperator(
        task_id="delete_redshift_cluster",
        cluster_identifier=REDSHIFT_CLUSTER_IDENTIFIER,
        aws_conn_id=REDSHIFT_CONN_ID,
        trigger_rule="all_done",
    )

    create_redshift_cluster >> wait_for_cluster >> delete_redshift_cluster

pankajastro avatar Aug 10 '22 07:08 pankajastro

I am stuck connecting to the Redshift cluster from the astro-sdk repo, using a connection object in test-connections.yaml that points to a Redshift cluster created in the DAG Authoring AWS account. Yesterday I spent about 3-4 hours trying out various configurations in test-connections.yaml, without success.

Will seek help from @phanikumv, @sunank200 and/or @utkarsharma2 today, based on their availability, to debug this together.

Unfortunately, story completion might be delayed a bit beyond the ETA.
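
For anyone hitting the same wall: one way to sanity-check the connection entry is to build the URI with Airflow's Connection model and compare it against what test-connections.yaml produces (host, credentials, and database below are placeholders):

from airflow.models.connection import Connection

conn = Connection(
    conn_id="redshift_conn",  # placeholder id
    conn_type="redshift",
    host="my-cluster.abc123.us-east-1.redshift.amazonaws.com",  # placeholder
    login="awsuser",
    password="********",
    schema="astro_dev",  # the Redshift database name
    port=5439,
)
print(conn.get_uri())  # compare against the test-connections.yaml entry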

pankajkoti avatar Aug 11 '22 06:08 pankajkoti

cc: @phanikumv @kaxil

pankajkoti avatar Aug 11 '22 07:08 pankajkoti

Yesterday:

  • Wrote a barebones Python script to check that we are able to write a Pandas dataframe to Redshift; the pandas to_sql function works well.
  • Tatiana pointed me in the right direction to resolve the blocker with Redshift connectivity.
  • The HTTP load_file test works fine after the implementation effort; Utkarsh paired up and helped a lot here.
  • Tested loading from S3 and GCS to Redshift after implementation; works fine. Ankit helped configure the GCP credentials.

Will continue implementing the other operators today.
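
The barebones script was essentially the following; the DSN is a placeholder, and the redshift+psycopg2 dialect requires the sqlalchemy-redshift package:

import pandas as pd
from sqlalchemy import create_engine

# Placeholder DSN; needs sqlalchemy-redshift and psycopg2 installed.
engine = create_engine(
    "redshift+psycopg2://awsuser:********@my-cluster.abc123.us-east-1.redshift.amazonaws.com:5439/astro_dev"
)

df = pd.DataFrame({"id": [1, 2], "name": ["alice", "bob"]})
df.to_sql(
    "test_table",
    con=engine,
    schema="public",
    if_exists="replace",  # drop and recreate on re-runs
    index=False,
    method="multi",  # batch the INSERTs; to_sql has no native COPY support
    chunksize=1000,
)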

pankajkoti avatar Aug 12 '22 06:08 pankajkoti

The initial approach of running the query over a psycopg2 connection, similar to Postgres, did not work well for Redshift.

Today, I will aim to proceed by following the approach listed here: https://docs.aws.amazon.com/redshift/latest/dg/t_updating-inserting-using-staging-tables-.html
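
For reference, the staging-table pattern from that doc boils down to SQL along these lines (the target/staging table names and the join key are placeholders):

# Upsert ("update" strategy): delete the matched target rows, then insert
# everything from staging. For the "ignore" strategy, the matched rows would
# instead be deleted from staging, leaving existing target rows untouched.
MERGE_SQL = """
BEGIN TRANSACTION;

DELETE FROM target
USING staging
WHERE target.id = staging.id;

INSERT INTO target
SELECT * FROM staging;

END TRANSACTION;
"""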

pankajkoti avatar Aug 18 '22 06:08 pankajkoti

Support for the Redshift database is in for all operators except the merge_table operator, which will come in a separate PR as it needs some discussion on the merge strategy due to Redshift internals.

pankajkoti avatar Aug 26 '22 14:08 pankajkoti