dags/marketplace_ingestion_dag.py
การติดตั้ง Airflow (DevOps/CI/CD): Freelancer ต้องมั่นใจว่า Airflow Environment บน GCP (Cloud Composer หรือ VM) ถูกตั้งค่าและสามารถติดตั้ง Dependencies ที่จำเป็นได้ การเชื่อมต่อ Airflow และ Code: ต้องกำหนดวิธีการที่ Airflow จะเข้าถึงโค้ดในโฟลเดอร์ connectors/ และ etl_transforms/ (ส่วนใหญ่มักจะใช้ Docker Image หรือ Volume Sharing) สร้างโค้ด Transform จริง: สร้างไฟล์ใน etl_transforms/data_cleaner.py เพื่อใช้ Pandas ในการทำความสะอาดและจัดรูปแบบข้อมูลให้เป็นมาตรฐานเดียวกัน ก่อนโหลดเข้า BigQueryfrom airflow import DAG from airflow.operators.python import PythonOperator from datetime import datetime, timedelta
--- 1. CONFIGURATION ---
กำหนด Argument พื้นฐานของ DAG
default_args = { 'owner': 'airflow', 'depends_on_past': False, 'email_on_failure': False, 'email_on_retry': False, 'retries': 1, 'retry_delay': timedelta(minutes=5), # สำหรับการดึงข้อมูลแบบ Incremental: 'start_date': datetime(2025, 12, 1), }
--- 2. IMPORT CONNECTOR & TRANSFORM SCRIPTS ---
ต้อง Import ฟังก์ชันหลักจากไฟล์ที่ Freelancer สร้างไว้ใน connectors/ และ etl_transforms/
(ต้องมั่นใจว่า path ใน Airflow สามารถเข้าถึงโค้ดเหล่านี้ได้)
สมมติว่าไฟล์ connector และ transform ถูก Import เข้ามาใน Airflow Environment แล้ว
from connectors.shopee_connector import get_shopee_orders from connectors.lazada_connector import get_lazada_orders from etl_transforms.data_cleaner import clean_and_transform from etl_transforms.bigquery_loader import load_to_bigquery # ฟังก์ชันสำหรับ Task 1.3
--- 3. DEFINE DAG ---
with DAG( dag_id='marketplace_data_ingestion_v1', default_args=default_args, description='ETL pipeline for Shopee, Lazada, and TikTok data ingestion', schedule_interval=timedelta(hours=24), # รันทุกวัน catchup=False, tags=['etl', 'data_ingestion', 'bigquery'], ) as dag:
# === TASK GROUP 1: DATA EXTRACTION (Task 1.1) ===
t1_shopee = PythonOperator(
task_id='extract_shopee_orders',
python_callable=get_shopee_orders,
# ต้องกำหนด op_kwargs เพื่อส่ง parameter (เช่น วันที่เริ่มต้น/สิ้นสุด) ไปให้ฟังก์ชัน
op_kwargs={'start_date': '{{ ds }}', 'end_date': '{{ next_ds }}'},
)
t1_lazada = PythonOperator(
task_id='extract_lazada_orders',
python_callable=get_lazada_orders,
op_kwargs={'start_date': '{{ ds }}', 'end_date': '{{ next_ds }}'},
)
# ... เพิ่ม t1_tiktok และ t1_jsterp ...
# === TASK GROUP 2: DATA TRANSFORMATION (Task 1.2) ===
t2_clean_data = PythonOperator(
task_id='clean_and_transform_raw_data',
# ฟังก์ชันนี้จะดึง Raw Data จาก GCS (ที่ Task 1.1 บันทึกไว้) มาประมวลผลด้วย Pandas
python_callable=clean_and_transform,
op_kwargs={'execution_date': '{{ ds }}'},
)
# === TASK GROUP 3: DATA LOADING (Task 1.3) ===
t3_load_to_dwh = PythonOperator(
task_id='load_transformed_data_to_bigquery',
# ฟังก์ชันนี้จะใช้ Airflow BigQuery Operator หรือโค้ด Python เพื่อโหลดเข้า BigQuery
python_callable=load_to_bigquery,
op_kwargs={'target_table': 'marketplace_orders_cleaned'},
)
# --- 4. DEFINE WORKFLOW ORDER ---
# กำหนดลำดับการทำงาน
# 1. ดึงข้อมูลทั้งหมดพร้อมกัน
[t1_shopee, t1_lazada] >> t2_clean_data
# 2. ทำความสะอาดข้อมูลเสร็จแล้วจึงโหลดเข้า DWH
t2_clean_data >> t3_load_to_dwh