
Move from local download and PostgreSQL COPY command to a cloud-based Extract/Load workflow

Open jrlegrand opened this issue 1 month ago • 1 comment

Problem Statement

  • [ ] Set up a Google Cloud Storage (GCS) bucket.
  • [ ] Likely need to configure BigQuery access to the GCS bucket - and document how to do this in the README.
  • [ ] Extract raw data to a GCS bucket instead of the local /data folder (see the extraction sketch after this list).
  • [ ] Upload data from the GCS bucket to BigQuery.
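For the extraction step, here is a minimal sketch of what the Python callable for an Airflow task could look like, assuming a hypothetical bucket name (sagerx-raw), object path, and the default google_cloud_default connection; the FDA NDC Directory zip URL shown here may need adjusting:

from airflow.providers.google.cloud.hooks.gcs import GCSHook
import requests


def extract_ndc_to_gcs():
    # Download the raw FDA NDC Directory zip to a temp path instead of /data
    url = "https://www.accessdata.fda.gov/cder/ndctext.zip"
    local_path = "/tmp/ndctext.zip"
    response = requests.get(url, timeout=300)
    response.raise_for_status()
    with open(local_path, "wb") as f:
        f.write(response.content)

    # Push the raw file to the GCS bucket that BigQuery will load from
    gcs_hook = GCSHook(gcp_conn_id="google_cloud_default")
    gcs_hook.upload(
        bucket_name="sagerx-raw",           # hypothetical bucket from the setup step
        object_name="fda_ndc/ndctext.zip",  # hypothetical object layout
        filename=local_path,
    )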

Criteria for Success

I would start with something simple like the FDA NDC Directory DAG and get the raw data loaded into BigQuery.
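A minimal sketch of that load step using the Google provider's GCSToBigQueryOperator - the bucket, dataset, table, and file names below are placeholders, not existing SageRx resources:

import pendulum
from airflow import DAG
from airflow.providers.google.cloud.transfers.gcs_to_bigquery import GCSToBigQueryOperator

with DAG(
    dag_id="fda_ndc_gcs_to_bigquery",
    start_date=pendulum.datetime(2024, 5, 1, tz="UTC"),
    schedule=None,
    catchup=False,
) as dag:
    load_ndc = GCSToBigQueryOperator(
        task_id="load_ndc_to_bigquery",
        bucket="sagerx-raw",                     # hypothetical GCS bucket
        source_objects=["fda_ndc/product.txt"],  # hypothetical path to the extracted file
        destination_project_dataset_table="myproject.sagerx_raw.fda_ndc_product",  # hypothetical
        source_format="CSV",
        field_delimiter="\t",   # assumes the tab-delimited product file from the zip
        skip_leading_rows=1,
        autodetect=True,        # let BigQuery infer the raw schema to start
        write_disposition="WRITE_TRUNCATE",
    )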

Additional Information

Airflow Providers: https://airflow.apache.org/docs/apache-airflow-providers-google/stable/index.html

BigQuery Operators: https://airflow.apache.org/docs/apache-airflow-providers-google/stable/operators/cloud/bigquery.html

Useful tutorial: https://medium.com/google-cloud/loading-and-transforming-data-into-bigquery-using-dbt-65307ad401cd

Example script:

from gcloud import storage
from oauth2client.service_account import ServiceAccountCredentials
import os


# Build service account credentials from environment variables
credentials_dict = {
    'type': 'service_account',
    'client_id': os.environ['BACKUP_CLIENT_ID'],
    'client_email': os.environ['BACKUP_CLIENT_EMAIL'],
    'private_key_id': os.environ['BACKUP_PRIVATE_KEY_ID'],
    'private_key': os.environ['BACKUP_PRIVATE_KEY'],
}
credentials = ServiceAccountCredentials.from_json_keyfile_dict(
    credentials_dict
)
# Upload a local file to the target GCS bucket
client = storage.Client(credentials=credentials, project='myproject')
bucket = client.get_bucket('mybucket')
blob = bucket.blob('myfile')
blob.upload_from_filename('myfile')
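Note: gcloud and oauth2client are older packages. A roughly equivalent sketch using the maintained google-cloud-storage and google-auth libraries (same hypothetical env vars, project, bucket, and file names) would be:

import os
from google.cloud import storage
from google.oauth2 import service_account

# Same hypothetical env vars as above; google-auth also needs token_uri
credentials_dict = {
    "type": "service_account",
    "client_id": os.environ["BACKUP_CLIENT_ID"],
    "client_email": os.environ["BACKUP_CLIENT_EMAIL"],
    "private_key_id": os.environ["BACKUP_PRIVATE_KEY_ID"],
    "private_key": os.environ["BACKUP_PRIVATE_KEY"],
    "token_uri": "https://oauth2.googleapis.com/token",
}
credentials = service_account.Credentials.from_service_account_info(credentials_dict)

client = storage.Client(credentials=credentials, project="myproject")
bucket = client.bucket("mybucket")
blob = bucket.blob("myfile")
blob.upload_from_filename("myfile")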

jrlegrand · May 07 '24 17:05