
Bring Your Own Custom Transformation Function not triggering

Open junhonglau opened this issue 4 months ago • 2 comments

The custom transformation function is not being invoked.

```python
from pyspark.sql import DataFrame
from pyspark.sql.functions import lit, col, from_json, explode_outer, expr, current_timestamp
from pyspark.sql.types import StructType, ArrayType, MapType, VariantType
from src.Config import Config
from src.dataflow_pipeline import DataflowPipeline

def event_hub_parsing(df: DataFrame, _dataFlowSpec) -> DataFrame:
    source_format = _dataFlowSpec.sourceFormat
    if source_format == 'eventhub':
        df = (
            df.withColumn("records", col("value").cast("string"))
            .withColumn("parsed_records", expr("parse_json(records)"))
            .withColumn("eventdate", expr("to_timestamp(parsed_records:eventdate)"))
            .withColumn("eventtype", expr("CAST(parsed_records:eventtype AS STRING)"))
            .withColumn("eventstatus", expr("CAST(parsed_records:eventstatus AS STRING)"))
            .withColumn("eventcreationtype", expr("CAST(parsed_records:eventcreationtype AS STRING)"))
            .withColumn("eh_partition", expr("CAST(partition AS INT)"))
            .withColumn("eh_enqueued_timestamp", expr("timestamp"))
            .withColumn("eh_enqueued_date", expr("to_timestamp(timestamp)"))
            .withColumn("etl_processed_timestamp", current_timestamp())
            .withColumn("etl_rec_uuid", expr("uuid()"))
        )
    return df
```

```python
layer = spark.conf.get("layer", None)
DataflowPipeline.invoke_dlt_pipeline(spark, layer, bronze_custom_transform_func=event_hub_parsing)
```
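As a minimal, Spark-free sketch of the dispatch pattern the transform relies on (the `FakeDataFrame` and `FakeSpec` stubs below are hypothetical stand-ins, not part of dlt-meta; only the `sourceFormat` attribute name mirrors the spec above), the function should add columns for `eventhub` sources and pass everything else through unchanged:

```python
from dataclasses import dataclass, field

# Hypothetical stand-ins for a Spark DataFrame and a dataflow spec,
# used only to exercise the source-format dispatch outside Spark.
@dataclass
class FakeDataFrame:
    columns: list = field(default_factory=list)

    def withColumn(self, name, _expr=None):
        # Mimic DataFrame.withColumn: return a new frame with the column appended.
        return FakeDataFrame(self.columns + [name])

@dataclass
class FakeSpec:
    sourceFormat: str

def event_hub_parsing(df, spec):
    # Only transform eventhub sources; all other formats pass through untouched.
    if spec.sourceFormat == "eventhub":
        df = df.withColumn("records").withColumn("parsed_records")
    return df

out = event_hub_parsing(FakeDataFrame(["value"]), FakeSpec("eventhub"))
print(out.columns)      # ['value', 'records', 'parsed_records']
skipped = event_hub_parsing(FakeDataFrame(["value"]), FakeSpec("delta"))
print(skipped.columns)  # ['value']
```

If the final table shows only the original columns (the `skipped` case), the transform was either never called or the spec's `sourceFormat` did not match `'eventhub'`, which is one thing worth checking before assuming the hook itself is broken.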

The final table is still missing the columns added by the function, so I assume the function is never executed.

junhonglau avatar Sep 11 '25 13:09 junhonglau

@junhonglau Support for custom functions for eventhub/kafka and delta tables was added in issue_227. Can you please verify that it works?

ravi-databricks avatar Sep 11 '25 15:09 ravi-databricks

It works. Can we merge it into the main branch?

junhonglau avatar Nov 20 '25 06:11 junhonglau