astro-sdk
Document code from example DAGs
Instead of hard-coding the docs in GETTING_STARTED.md, we should use the following directive to extract code from the example DAGs so it doesn't go out of date.
```{literalinclude} ../../example_dags/example_amazon_s3_postgres.py
:language: python
:start-after: "# [START howto_s3_loadfile]"
:end-before: "# [END howto_s3_loadfile]"
```
where `example_amazon_s3_postgres.py` can be as follows; notice the `# [START howto_s3_loadfile]` and `# [END howto_s3_loadfile]` markers:
```python
import os
from datetime import datetime, timedelta

from airflow.models import DAG
from pandas import DataFrame

from astro import sql as aql
from astro.files import File
from astro.sql.table import Table


@aql.transform
def sample_create_table(input_table: Table):
    return "SELECT * FROM {{input_table}} LIMIT 10"


@aql.dataframe(identifiers_as_lower=False)
def my_df_func(input_df: DataFrame):
    print(input_df)


# [START howto_s3_loadfile]
s3_bucket = os.getenv("S3_BUCKET", "s3://tmp9")

dag = DAG(
    dag_id="example_amazon_s3_postgres",
    start_date=datetime(2019, 1, 1),
    max_active_runs=3,
    schedule_interval=timedelta(minutes=30),
    default_args={"retries": 1},
)

with dag:
    my_homes_table = aql.load_file(
        input_file=File(path=f"{s3_bucket}/homes.csv"),
        output_table=Table(conn_id="postgres_conn"),
    )
    # [END howto_s3_loadfile]
    sample_table = sample_create_table(my_homes_table)
    my_df_func(sample_table)
    aql.cleanup()
```
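For reference, the `:start-after:` / `:end-before:` options behave roughly like the sketch below. This is a simplified re-implementation for illustration, not Sphinx's actual code; the `extract_snippet` helper and the inline `example` string are made up for this demo:

```python
def extract_snippet(source: str, start_marker: str, end_marker: str) -> str:
    """Return the lines strictly between the first occurrence of
    start_marker and the first occurrence of end_marker."""
    lines = source.splitlines()
    start = next(i for i, line in enumerate(lines) if start_marker in line)
    end = next(i for i, line in enumerate(lines) if end_marker in line)
    return "\n".join(lines[start + 1 : end])


# A toy stand-in for example_amazon_s3_postgres.py
example = """\
import os

# [START howto_s3_loadfile]
s3_bucket = os.getenv("S3_BUCKET", "s3://tmp9")
# [END howto_s3_loadfile]
aql.cleanup()
"""

# Only the marked region is extracted; the imports and cleanup call are not.
print(extract_snippet(example, "[START howto_s3_loadfile]", "[END howto_s3_loadfile]"))
```

Because the docs pull the snippet from the DAG file at build time, editing the DAG automatically updates GETTING_STARTED's rendered example.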
This task involves:
- [ ] Making changes to the example DAGs (adding DAGs / tasks) in the [example_dags directory](https://github.com/astronomer/astro-sdk/tree/main/example_dags) to cover all the scenarios in GETTING_STARTED.md
https://github.com/astronomer/astro-sdk/issues/530 should be completed first.
@utkarsharma2 @sunank200 Thoughts on whether this is still needed, or is it already taken care of by https://github.com/astronomer/astro-sdk/issues/584?
@kaxil I think readthedocs has this as part of the Simple ETL workflow.