astro-sdk
Add support for Redshift DB - Pandas Path
Similar to how we currently support BigQuery and Snowflake, let's add support for Redshift too.
This issue only needs the "pandas" path, so it should be straightforward. Look at the BigQuery DB for an example. Skip the "native"/"optimised" path for the scope of this ticket (it will be covered in #613).
This will be part of 1.1.0b1
Acceptance Criteria:
- Example DAG (that also runs in the integration tests) that loads a file from S3 to Redshift
- Tests
- Documentation
Follow-up task: https://github.com/astronomer/astro-sdk/issues/613
Tasks breakdown:
- [x] Research the Pandas library path for writing to Redshift - @pankajkoti
- [x] Override the base class `load_file_to_table` method in a new module `aws/redshift.py` under `databases` - @pankajkoti
- [ ] Example DAG - @pankajastro
- [ ] Tests -> Unit tests + integration test calling the above example DAG - @pankajastro
- [ ] Check the possibility of mocking a Redshift cluster, or pausing/resuming Redshift, for CI runs
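The override in the task list above might look roughly like the sketch below. The base class and method signature here are simplified stand-ins (the real base class lives in astro-sdk's `databases` package and takes richer file/table arguments), so the example is self-contained; SQLite substitutes for a Redshift connection.

```python
import sqlite3

import pandas as pd


class BaseDatabaseStub:
    """Simplified stand-in for astro-sdk's database base class."""

    def load_file_to_table(self, df: pd.DataFrame, table_name: str) -> None:
        raise NotImplementedError


class RedshiftDatabase(BaseDatabaseStub):
    """Pandas-path loader: hand the dataframe to pandas' to_sql."""

    def __init__(self, conn):
        # In practice this would be a Redshift SQLAlchemy engine/connection.
        self.conn = conn

    def load_file_to_table(self, df: pd.DataFrame, table_name: str) -> None:
        # pandas handles table creation and the INSERTs for us.
        df.to_sql(table_name, con=self.conn, if_exists="replace", index=False)


# Demo against in-memory SQLite as a stand-in for Redshift.
conn = sqlite3.connect(":memory:")
db = RedshiftDatabase(conn)
db.load_file_to_table(pd.DataFrame({"id": [1, 2], "name": ["a", "b"]}), "t")
rows = conn.execute("SELECT id, name FROM t ORDER BY id").fetchall()
print(rows)  # [(1, 'a'), (2, 'b')]
```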
Support following operations:
- [x] load_file: load a given file into a SQL table
- [ ] transform: applies a SQL select statement to a source table and saves the result to a destination table
- [x] drop_table: drop SQL table
- [ ] run_raw_sql: run any SQL statement without handling its output
- [x] append: insert rows from the source SQL table into the destination SQL table, if there are no conflicts
- [ ] merge: insert rows from the source SQL table into the destination SQL table, depending on conflicts:
  - [ ] ignore: do not add rows that already exist
  - [ ] update: replace existing rows with new ones
- [x] export_file: export SQL table rows into a destination file
- [x] dataframe: export given SQL table into in-memory Pandas data-frame
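The two merge conflict strategies listed above (ignore vs. update) can be pinned down with a small pure-Python sketch, treating rows as a dict keyed by primary key. This only illustrates the semantics, not how the operator is implemented:

```python
def merge(target: dict, source: dict, strategy: str) -> dict:
    """Insert source rows into target; `strategy` decides what happens on key conflicts."""
    result = dict(target)
    for key, row in source.items():
        if key in result and strategy == "ignore":
            continue  # "ignore": do not add rows that already exist
        result[key] = row  # "update": replace existing rows with new ones
    return result


target = {1: "old", 2: "old"}
source = {2: "new", 3: "new"}

merged_ignore = merge(target, source, "ignore")
merged_update = merge(target, source, "update")
print(merged_ignore)  # {1: 'old', 2: 'old', 3: 'new'}
print(merged_update)  # {1: 'old', 2: 'new', 3: 'new'}
```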
@phanikumv will help in managing this and #613 so please keep him in the loop @pankajastro and @pankajkoti
Sync up with @sunank200 or @utkarsharma2 if you are not aware of the fallback "pandas" path
@kaxil did you mean to tag me as well in the comment? You seem to have tagged Pankaj Singh twice, but I see myself also as an assignee :)
Yes @pankajkoti and @pankajastro will work on it together
Yup :) @pankajkoti
I will work on the implementation today. @pankajastro, I will update you on the progress by EOD today and seek your help tomorrow. If you think you can construct the base example DAG assuming the underlying code will be ready, please feel free to do so.
I have started on the implementation but am progressing a bit slowly, as I am new to the project structure and the overall functional implementation. I am afraid the implementation itself might take until Friday, and the test cases will need to be implemented after that. I will try my best to complete it sooner, but it looks dicey at the moment since this is the first time I'm working on astro-sdk and there may be a few unknowns along the way.
Here is an example DAG to create and destroy a Redshift cluster:
```python
import os
from datetime import datetime, timedelta

from airflow.models.dag import DAG
from airflow.providers.amazon.aws.operators.redshift_cluster import (
    RedshiftCreateClusterOperator,
    RedshiftDeleteClusterOperator,
)
from airflow.providers.amazon.aws.sensors.redshift_cluster import RedshiftClusterSensor

REDSHIFT_CONN_ID = os.getenv("ASTRO_REDSHIFT_CONN_ID", "aws_default")
REDSHIFT_CLUSTER_IDENTIFIER = os.getenv("REDSHIFT_CLUSTER_IDENTIFIER", "astro-providers-cluster")
REDSHIFT_CLUSTER_MASTER_USER = os.getenv("REDSHIFT_CLUSTER_MASTER_USER", "awsuser")
REDSHIFT_CLUSTER_MASTER_PASSWORD = os.getenv("REDSHIFT_CLUSTER_MASTER_PASSWORD", "********")
REDSHIFT_CLUSTER_TYPE = os.getenv("REDSHIFT_CLUSTER_TYPE", "single-node")
REDSHIFT_CLUSTER_NODE_TYPE = os.getenv("REDSHIFT_CLUSTER_NODE_TYPE", "dc2.large")
REDSHIFT_CLUSTER_DB_NAME = os.getenv("REDSHIFT_CLUSTER_DB_NAME", "astro_dev")
EXECUTION_TIMEOUT = int(os.getenv("EXECUTION_TIMEOUT", 6))

default_args = {
    "execution_timeout": timedelta(hours=EXECUTION_TIMEOUT),
    "retries": int(os.getenv("DEFAULT_TASK_RETRIES", 2)),
    "retry_delay": timedelta(seconds=int(os.getenv("DEFAULT_RETRY_DELAY_SECONDS", 60))),
}

with DAG(
    dag_id="example_async_redshift_sql",
    start_date=datetime(2022, 1, 1),
    schedule_interval=None,
    catchup=False,
    default_args=default_args,
    tags=["example", "redshift"],
) as dag:
    create_redshift_cluster = RedshiftCreateClusterOperator(
        task_id="create_redshift_cluster",
        cluster_identifier=REDSHIFT_CLUSTER_IDENTIFIER,
        cluster_type=REDSHIFT_CLUSTER_TYPE,
        node_type=REDSHIFT_CLUSTER_NODE_TYPE,
        master_username=REDSHIFT_CLUSTER_MASTER_USER,
        master_user_password=REDSHIFT_CLUSTER_MASTER_PASSWORD,
        aws_conn_id=REDSHIFT_CONN_ID,
    )

    wait_for_cluster = RedshiftClusterSensor(
        task_id="wait_for_cluster",
        cluster_identifier=REDSHIFT_CLUSTER_IDENTIFIER,
        aws_conn_id=REDSHIFT_CONN_ID,
    )

    # Add Redshift DB tasks here

    delete_redshift_cluster = RedshiftDeleteClusterOperator(
        task_id="delete_redshift_cluster",
        cluster_identifier=REDSHIFT_CLUSTER_IDENTIFIER,
        aws_conn_id=REDSHIFT_CONN_ID,
        trigger_rule="all_done",
    )

    create_redshift_cluster >> wait_for_cluster >> delete_redshift_cluster
```
I am stuck connecting to the Redshift cluster from the astro-sdk repo using test-connections.yaml, which has a connection object for a Redshift cluster created in the DAG Authoring AWS account. Yesterday I spent about 3-4 hours trying out various configurations in test-connections.yaml without success.
I will seek help from @phanikumv, @sunank200 and/or @utkarsharma2 today, based on their availability, to debug this together.
Unfortunately, story completion might be delayed a bit beyond the ETA.
cc: @phanikumv @kaxil
Yesterday:
- Wrote a barebones Python script to check that we are able to write a Pandas dataframe to Redshift; the pandas `to_sql` function works well.
- Tatiana pointed me in the right direction to resolve the blocker with Redshift connectivity.
- The HTTP load-file test works fine after the implementation effort; Utkarsh paired up and helped a lot here.
- Tested loads from S3 and GCP to Redshift after the implementation; they work fine. Ankit helped configure the GCP credentials.

Today: will continue to implement the other operators.
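The "barebones script" check mentioned above boils down to a few lines of `pandas`: write a dataframe with `to_sql`, read it back with `read_sql`. A minimal reproduction, using in-memory SQLite as a stand-in for the Redshift connection (in practice you would pass a Redshift SQLAlchemy engine):

```python
import sqlite3

import pandas as pd

conn = sqlite3.connect(":memory:")  # stand-in for a Redshift connection

# Write path: what load_file ultimately does on the pandas path.
df_in = pd.DataFrame({"id": [1, 2, 3], "value": ["x", "y", "z"]})
df_in.to_sql("demo", con=conn, index=False)

# Read path: what the dataframe operator does (SQL table -> in-memory dataframe).
df_out = pd.read_sql("SELECT * FROM demo ORDER BY id", con=conn)
print(len(df_out))  # 3
```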
An initial approach of issuing the query over a psycopg2 connection, similar to Postgres, did not work well for Redshift.
Today, I will aim to proceed with the staging-table approach described here: https://docs.aws.amazon.com/redshift/latest/dg/t_updating-inserting-using-staging-tables-.html
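The staging-table approach from that AWS doc amounts to: load the new rows into a staging table, delete the conflicting rows from the target, insert everything from staging, then drop the staging table. A runnable sketch of the same pattern, executed here on SQLite purely for illustration (Redshift's `DELETE ... USING` join syntax differs slightly):

```python
import sqlite3

conn = sqlite3.connect(":memory:")
cur = conn.cursor()
cur.executescript("""
CREATE TABLE target (id INTEGER PRIMARY KEY, val TEXT);
INSERT INTO target VALUES (1, 'old'), (2, 'old');

CREATE TABLE staging (id INTEGER PRIMARY KEY, val TEXT);
INSERT INTO staging VALUES (2, 'new'), (3, 'new');

-- "update" strategy: remove conflicting target rows, then copy staging over.
DELETE FROM target WHERE id IN (SELECT id FROM staging);
INSERT INTO target SELECT * FROM staging;
DROP TABLE staging;
""")
rows = cur.execute("SELECT id, val FROM target ORDER BY id").fetchall()
print(rows)  # [(1, 'old'), (2, 'new'), (3, 'new')]
```

For the "ignore" strategy, the `DELETE` step would instead be applied to the staging table (dropping rows whose keys already exist in the target) before the `INSERT`.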
Support for the Redshift database is now in place for all operators except the merge_table operator, which will come in a separate PR, as it needs some discussion on the merge strategy due to Redshift internals.