
Add support for dlt.apply_changes_from_snapshot

ravi-databricks opened this issue 1 year ago • 2 comments

Provide support for dlt.apply_changes_from_snapshot

ravi-databricks (Aug 07 '24 17:08)

Implementation Details:

Onboarding:

  1. Introduce snapshot format inside onboarding file
  2. Introduce a bronze_apply_changes_from_snapshot config block; keys and scd_type are mandatory fields:
"bronze_apply_changes_from_snapshot": {
      "keys": ["id"],
      "scd_type": "1",
      "track_history_column_list": [],
      "track_history_except_column_list": []
}
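
The config block above maps naturally onto the keyword arguments of dlt.apply_changes_from_snapshot. A minimal sketch of that mapping (the helper name snapshot_kwargs is hypothetical, not dlt-meta's actual internals):

```python
def snapshot_kwargs(config: dict) -> dict:
    """Translate a bronze_apply_changes_from_snapshot onboarding block into
    keyword arguments for dlt.apply_changes_from_snapshot (illustrative only)."""
    # keys and scd_type are mandatory per the onboarding contract above.
    if "keys" not in config or "scd_type" not in config:
        raise ValueError("keys and scd_type are mandatory fields")
    kwargs = {
        "keys": config["keys"],
        "stored_as_scd_type": config["scd_type"],
    }
    # Optional history-tracking columns are only forwarded when non-empty.
    for opt in ("track_history_column_list", "track_history_except_column_list"):
        if config.get(opt):
            kwargs[opt] = config[opt]
    return kwargs

print(snapshot_kwargs({"keys": ["id"], "scd_type": "1",
                       "track_history_column_list": [],
                       "track_history_except_column_list": []}))
# → {'keys': ['id'], 'stored_as_scd_type': '1'}
```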

DataflowPipeline:

  1. Add an argument to DataflowPipeline to accept a snapshot_reader_func
  2. snapshot_reader_func will be passed to dlt.apply_changes_from_snapshot during the bronze write
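
The contract the reader function must satisfy: it is called with the latest processed snapshot version (None on the first call) and returns either a (snapshot, version) pair or None when no newer snapshot exists. The driver loop below is a plain-Python illustration of that polling contract, not DLT's real implementation:

```python
def drain_snapshots(reader):
    """Call reader with the latest processed version until it returns None,
    mimicking how dlt.apply_changes_from_snapshot consumes a reader function."""
    versions = []
    latest = None
    while True:
        result = reader(latest)
        if result is None:  # no newer snapshot available
            break
        snapshot, version = result
        versions.append(version)
        latest = version
    return versions

# Fake reader over three in-memory "snapshots" keyed by version (test data).
snapshots = {1: ["row-a"], 2: ["row-a", "row-b"], 3: ["row-b"]}

def fake_reader(latest_version):
    next_version = (latest_version or 0) + 1
    if next_version in snapshots:
        return snapshots[next_version], next_version
    return None

print(drain_snapshots(fake_reader))  # → [1, 2, 3]
```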

Usage:

  1. Provide a snapshot reader function in a notebook while invoking DataflowPipeline
  2. Introduce the new next_snapshot_and_version method:
%pip install dlt-meta

import dlt
from src.dataflow_spec import BronzeDataflowSpec


def exist(path):
    # dbutils.fs.ls raises an exception when the path does not exist
    try:
        return dbutils.fs.ls(path) is not None
    except Exception:
        return False


def next_snapshot_and_version(latest_snapshot_version, dataflow_spec):
    latest_snapshot_version = latest_snapshot_version or 0
    next_version = latest_snapshot_version + 1
    bronze_dataflow_spec: BronzeDataflowSpec = dataflow_spec
    options = bronze_dataflow_spec.readerConfigOptions
    snapshot_format = bronze_dataflow_spec.sourceDetails["snapshot_format"]
    snapshot_root_path = bronze_dataflow_spec.sourceDetails["path"]
    snapshot_path = f"{snapshot_root_path}{next_version}.csv"
    if exist(snapshot_path):
        snapshot = spark.read.format(snapshot_format).options(**options).load(snapshot_path)
        return (snapshot, next_version)
    else:
        # No snapshot available
        return None


from src.dataflow_pipeline import DataflowPipeline

layer = spark.conf.get("layer", None)
DataflowPipeline.invoke_dlt_pipeline(spark, layer, next_snapshot_and_version=next_snapshot_and_version)
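
The reader above assumes snapshots are stored as sequentially numbered CSV files under a root path. A small sketch of that versioned-path convention (the root path shown is hypothetical):

```python
def next_snapshot_path(root, latest_version):
    """Build the path of the next snapshot file, given the latest processed
    version (None on the first call), following the root + version + .csv
    convention used by next_snapshot_and_version above."""
    next_version = (latest_version or 0) + 1
    return f"{root}{next_version}.csv", next_version

print(next_snapshot_path("/mnt/snapshots/orders/", None))  # → ('/mnt/snapshots/orders/1.csv', 1)
print(next_snapshot_path("/mnt/snapshots/orders/", 4))     # → ('/mnt/snapshots/orders/5.csv', 5)
```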

ravi-databricks (Sep 27 '24 18:09)