dlt-meta
Add support for dlt.apply_changes_from_snapshot
Implementation Details

Onboarding:
- Introduce a snapshot source format inside the onboarding file
- Introduce a bronze_apply_changes_from_snapshot config block; keys and scd_type are mandatory fields (see the onboarding sketch after the config block below):
"bronze_apply_changes_from_snapshot":{
"keys": ["id"]
"scd_type": "1"
"track_history_column_list": []
"track_history_except_column_list":[]
}
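
For context, a hedged sketch of how this block might sit inside a full bronze onboarding entry. The surrounding field names (source_format, source_details, snapshot_format, path) are assumptions inferred from the sourceDetails lookups in the reader function shown under Usage, not a confirmed onboarding schema:

    "source_format": "snapshot",
    "source_details": {
        "snapshot_format": "csv",
        "path": "dbfs:/mnt/snapshots/products_"
    },
    "bronze_apply_changes_from_snapshot": {
        "keys": ["id"],
        "scd_type": "1"
    }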
DataflowPipeline:
- Add an argument to DataflowPipeline so it accepts a snapshot reader function (snapshot_reader_func)
- snapshot_reader_func is passed as the source to dlt.apply_changes_from_snapshot during the bronze write (see the sketch below)
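
Roughly, the bronze write wires the callback in as follows. This is a minimal sketch, not the actual dlt-meta implementation; the spec attribute names applyChangesFromSnapshot and targetDetails are assumptions modeled on the camelCase attributes (readerConfigOptions, sourceDetails) used in the Usage example:

    import dlt

    def bronze_write_snapshot(bronze_dataflow_spec, snapshot_reader_func):
        # Assumed attribute holding the onboarding config block shown above
        snapshot_cfg = bronze_dataflow_spec.applyChangesFromSnapshot
        target_table = bronze_dataflow_spec.targetDetails["table"]  # assumed
        dlt.create_streaming_table(name=target_table)
        dlt.apply_changes_from_snapshot(
            target=target_table,
            # DLT calls this with the latest processed snapshot version; it must
            # return (snapshot_df, version), or None when no newer snapshot exists
            source=lambda latest_version: snapshot_reader_func(
                latest_version, bronze_dataflow_spec
            ),
            keys=snapshot_cfg["keys"],
            stored_as_scd_type=snapshot_cfg["scd_type"],
        )

The optional track_history_column_list / track_history_except_column_list settings would be forwarded the same way when present.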
Usage:
- Define a snapshot reader function (next_snapshot_and_version) in a notebook and pass it when invoking DataflowPipeline:
%pip install dlt-meta

import dlt
from src.dataflow_spec import BronzeDataflowSpec


def exist(path):
    """Return True if the given path exists."""
    try:
        # dbutils.fs.ls raises an exception when the path does not exist
        return dbutils.fs.ls(path) is not None
    except Exception:
        return False


def next_snapshot_and_version(latest_snapshot_version, dataflow_spec):
    """Return the next (snapshot DataFrame, version) pair,
    or None when no newer snapshot file is available."""
    latest_snapshot_version = latest_snapshot_version or 0
    next_version = latest_snapshot_version + 1
    bronze_dataflow_spec: BronzeDataflowSpec = dataflow_spec
    options = bronze_dataflow_spec.readerConfigOptions
    snapshot_format = bronze_dataflow_spec.sourceDetails["snapshot_format"]
    snapshot_root_path = bronze_dataflow_spec.sourceDetails["path"]
    snapshot_path = f"{snapshot_root_path}{next_version}.csv"
    if exist(snapshot_path):
        snapshot = spark.read.format(snapshot_format).options(**options).load(snapshot_path)
        return (snapshot, next_version)
    else:
        # No snapshot available
        return None


layer = spark.conf.get("layer", None)

from src.dataflow_pipeline import DataflowPipeline
DataflowPipeline.invoke_dlt_pipeline(spark, layer, next_snapshot_and_version=next_snapshot_and_version)
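
Note that the reader above assumes snapshots arrive as sequentially numbered files (snapshot_root_path1.csv, snapshot_root_path2.csv, and so on): each pipeline update loads the next numbered file, and returning None once exist() finds no newer file tells DLT the source is exhausted for that update.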