spark-expectations
spark-expectations copied to clipboard
[BUG] Some values are copied over from previous runs
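Describe the bug: Some values recorded for a run (for example, the rule description) are copied over from the rule used in a previous run instead of coming from the rule that applies to the current run.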
To Reproduce
Run
from pyspark import SparkFiles
from pyspark.sql import *
from spark_expectations.core.expectations import SparkExpectations, WrappedDataFrameWriter
# Local Spark session running with four threads (getOrCreate reuses an
# already-active session when there is one).
spark = (
    SparkSession.builder
    .master("local[4]")
    .getOrCreate()
)
# Keep a handle on the SparkContext (used later for addFile) and only
# surface ERROR-level log output.
sc = spark.sparkContext
sc.setLogLevel("ERROR")
#tag::global_setup[]
# Notification settings passed as `user_conf` to each `with_expectations` call.
se_conf = {
    # Email delivery is switched off; the SMTP and sender settings are still
    # supplied alongside it.
    "se_notifications_enable_email": False,
    "se_notifications_email_smtp_host": "mailhost.example.com",
    "se_notifications_email_smtp_port": 25,
    # NOTE(review): this sender value looks like a redacted placeholder rather
    # than a real address -- confirm before enabling email notifications.
    "se_notifications_email_from": "[email protected]",
    "se_notifications_email_subject": "spark expectations - data quality - notifications",
    "se_notifications_on_fail": True,
    "se_notifications_on_error_drop_exceeds_threshold_breach": True,
    # Presumably a percentage of dropped rows -- TODO confirm the unit.
    "se_notifications_on_error_drop_threshold": 15,
}
#end::global_setup[]
#tag::setup_and_load[]
from spark_expectations.config.user_config import Constants as user_config
# Drop any tables left over from an earlier run of this script.
# local.dq_stats is not recreated here -- presumably Spark Expectations
# creates it on first write; TODO confirm.
spark.sql("DROP TABLE IF EXISTS local.magic_validation")
spark.sql("DROP TABLE IF EXISTS local.dq_stats")
# Create the rules table that the JSON rule file is appended to below.
spark.sql(
"""
create table local.magic_validation (
product_id STRING,
table_name STRING,
rule_type STRING,
rule STRING,
column_name STRING,
expectation STRING,
action_if_failed STRING,
tag STRING,
description STRING,
enable_for_source_dq_validation BOOLEAN,
enable_for_target_dq_validation BOOLEAN,
is_active BOOLEAN,
enable_error_drop_alert BOOLEAN,
error_drop_threshold INT
)"""
)
# Reminder: addFile does not handle directories well.
rule_file = "spark_expectations_sample_rules.json"
sc.addFile(rule_file)
# Read the rules back through the path SparkFiles resolves for the added file.
df = spark.read.json(SparkFiles.get(rule_file))
# Prints the DataFrame object itself, not its rows; the rows are shown after
# the write below.
print(df)
# Append the parsed rules to the rules table, then display the table contents.
df.write.option("byname", "true").mode("append").saveAsTable("local.magic_validation")
spark.read.table("local.magic_validation").show()
# Can be used to point to your desired metastore.
# The same append-mode iceberg writer is used for the stats table and for the
# target/error tables.
se_writer = WrappedDataFrameWriter().mode("append").format("iceberg")
rule_df = spark.sql("select * from local.magic_validation")
# A single SparkExpectations instance is created here and reused by every
# `@se.with_expectations` decorator further down.
se: SparkExpectations = SparkExpectations(
rules_df=rule_df, # See if we can replace this with the DF we wrote out.
product_id="pay", # We will only apply rules matching this product id
stats_table="local.dq_stats",
stats_table_writer=se_writer,
target_and_error_table_writer=se_writer,
stats_streaming_options={user_config.se_enable_streaming: False},
)
#end::setup_and_load[]
# Show the loaded rules, truncating each cell at 200 characters.
rule_df.show(truncate=200)
#tag::run_validation_row[]
@se.with_expectations(
    user_conf=se_conf,
    write_to_table=False,  # If set to true SE will write to the target table.
    target_and_error_table_writer=se_writer,
    # target_table is used to create the error table (e.g. here local.fake_table_name_error)
    # and filter the rules on top of the global product filter.
    target_table="local.fake_table_name",
)
def load_data():
    """Load the 2021 CSV data and return the three columns under test.

    Reads ``data/fetched/2021`` as CSV (header row, inferred schema) and
    keeps only the company number and the male/female bonus-percent columns.
    The ``with_expectations`` decorator above is configured with
    ``target_table="local.fake_table_name"`` and ``write_to_table=False``.

    Returns:
        The selected three-column DataFrame.
    """
    raw_df = spark.read.csv("data/fetched/2021", header=True, inferSchema=True)
    # NOTE(review): "FemaleBonuspercent" differs in case from the
    # "FemaleBonusPercent" used in the rule expectation -- this presumably
    # relies on case-insensitive column resolution; confirm.
    uk_df = raw_df.select("CompanyNumber", "MaleBonusPercent", "FemaleBonuspercent")
    return uk_df


# Calling the decorated function triggers the load under the decorator.
data = load_data()
#end::run_validation_row[]
#tag::run_validation_complex[]
@se.with_expectations(
    user_conf=se_conf,
    write_to_table=True,  # If set to true SE will write to the target table.
    target_and_error_table_writer=se_writer,
    # target_table is used to create the error table (e.g. here local.3rd_fake_error)
    # and filter the rules on top of the global product filter.
    target_table="local.3rd_fake",
)
def load_data2():
    """Load the 2021 CSV data again, this time targeting ``local.3rd_fake``.

    Reads ``data/fetched/2021`` as CSV (header row, inferred schema) and
    keeps only the company number and the male/female bonus-percent columns.
    The ``with_expectations`` decorator above is configured with
    ``target_table="local.3rd_fake"`` and ``write_to_table=True``, and it
    reuses the same ``se`` instance as the earlier ``load_data`` run.

    Returns:
        The selected three-column DataFrame.
    """
    raw_df = spark.read.csv("data/fetched/2021", header=True, inferSchema=True)
    # NOTE(review): "FemaleBonuspercent" differs in case from the
    # "FemaleBonusPercent" spelling used in the sample rules -- this presumably
    # relies on case-insensitive column resolution; confirm.
    uk_df = raw_df.select("CompanyNumber", "MaleBonusPercent", "FemaleBonuspercent")
    return uk_df


# Rebinds `data`, replacing the result of the earlier load_data() call.
data = load_data2()
#end::run_validation_complex[]
# Dump the stats table: table_name and error_percentage are selected first,
# followed by every column again via `*`; cells are truncated at 300 characters.
(
    spark.sql("SELECT table_name, error_percentage, * FROM local.dq_stats")
    .show(truncate=300)
)
With the following rules in spark_expectations_sample_rules.json:
{"product_id": "pay", "table_name": "local.fake_table_name", "rule_type": "row_dq", "rule": "bonus_checker", "column_name": "MaleBonusPercent", "expectation": "MaleBonusPercent > FemaleBonusPercent", "action_if_failed": "drop", "tag": "", "description": "Sample rule that the male bonuses should be higher. Thankfully this fails (but could be lower base pay etc.)", "enable_for_source_dq_validation": true, "enable_for_target_dq_validation": false, "is_active": true, "enable_error_drop_alert": true, "error_drop_threshold": 1}
{"product_id": "pay", "table_name": "local.3rd_fake", "rule_type": "query_dq", "rule": "history", "column_name": "Bloop", "expectation": "(select count(*) from 3rd_fake_view) < (select input_count from local.dq_stats WHERE table_name='local.3rd_fake')", "action_if_failed": "fail", "tag": "", "description": "We should always have more records than before", "enable_for_source_dq_validation": false, "enable_for_target_dq_validation": true, "is_active": true, "enable_error_drop_alert": true, "error_drop_threshold": 1}
Observed: the description recorded for the 2nd rule is copied from the 1st rule
(e.g.
|local.fake_table_name| 67.51| pay|local.fake_table_name| 15267| 10306| 4961| 32.49| 32.49| 67.51| null| null| null| null|[{rule_type -> row_dq, description -> Sample rule that the male bonuses should be higher. Thankfully this fails (but could be lower base pay etc.), rule -> bonus_checker, failed_row_count -> 10306, tag -> , action_if_failed -> drop}]|[{rule_type -> row_dq, rule_name -> bonus_checker, error_drop_threshold -> 1, description -> Sample rule that the male bonuses should be higher. Thankfully this fails (but could be lower base pay etc.), error_drop_percentage -> 67.51, action_if_failed -> drop}]| {final_query_dq -> Skipped, source_agg_dq -> Skipped, final_agg_dq -> Skipped, source_query_dq -> Skipped, run_status -> Passed, row_dq -> Passed}|{row_dq_run_time -> 1.7, final_agg_dq_run_time -> 0.0, run_time -> 2.2, source_query_dq_run_time -> 0.0, source_agg_dq_run_time -> 0.0, final_query_dq_run_time -> 0.0}|{rules -> {num_dq_rules -> 1, num_row_dq_rules -> 1}, query_dq_rules -> {num_final_query_dq_rules -> 0, num_source_query_dq_rules -> 0, num_query_dq_rules -> 0}, agg_dq_rules -> {num_source_agg_dq_rules -> 0, num_agg_dq_rules -> 0, num_final_agg_dq_rules -> 0}}|pay_4d5010a0-6192-11ee-8533-afb51606381c| 2023-10-03| 2023-10-03 09:12:28|
)
Expected behavior
Each rule's own description should be populated, not the description of the previous rule.
Screenshots
See 1:46 of https://www.youtube.com/watch?v=bNvvPKv-dmQ
Desktop (please complete the following information):
- OS: Linux
- Browser: N/A
- Version: 1.0
Additional context Add any other context about the problem here.