dbt_artifacts
fix: add casts for timestamp and boolean to resolve Spark type incompatibility
Overview
Fixes the INCOMPATIBLE_DATA_FOR_TABLE.CANNOT_SAFELY_CAST error raised when using dbt_artifacts with Spark.
Adds explicit casts for timestamp and boolean columns so the inserted values match the target table types.
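In practice the change amounts to wrapping the inserted literals in explicit casts inside the Spark upload macros. A minimal sketch of the idea, not the exact diff (the column names and the cross-database type_timestamp() / type_boolean() macros are used here for illustration only):
{# illustrative only: cast the string literals so Spark accepts the insert #}
cast('{{ run_started_at }}' as {{ type_timestamp() }}), {# run_started_at #}
cast('true' as {{ type_boolean() }}) {# was_full_refresh #}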
Update type - breaking / non-breaking
- [X] Minor bug fix
- [ ] Documentation improvements
- [ ] Quality of Life improvements
- [ ] New features (non-breaking change)
- [ ] New features (breaking change)
- [ ] Other (non-breaking change)
- [ ] Other (breaking change)
- [ ] Release preparation
What does this solve?
Outstanding questions
What databases have you tested with?
- [ ] Snowflake
- [ ] Google BigQuery
- [ ] Databricks
- [x] Spark
- [ ] N/A
Instead of creating a new Spark macro for each of the upload_individual_datasets macros, could we create boolean and timestamp helpers in database_specific_helpers and use an adapter dispatch in the default macros?
Something like this:
Helper macros
{% macro boolean(value) %}
    {{ return(adapter.dispatch('boolean', 'dbt_artifacts')(value)) }}
{% endmacro %}

{% macro default__boolean(value) %}
    '{{ value }}'
{% endmacro %}

{% macro spark__boolean(value) %}
    cast('{{ value }}' as {{ type_boolean() }})
{% endmacro %}
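The timestamp counterpart mentioned above could follow the same pattern (sketched here for completeness; it is not part of the snippet above):

{% macro timestamp(value) %}
    {{ return(adapter.dispatch('timestamp', 'dbt_artifacts')(value)) }}
{% endmacro %}

{% macro default__timestamp(value) %}
    '{{ value }}'
{% endmacro %}

{% macro spark__timestamp(value) %}
    cast('{{ value }}' as {{ type_timestamp() }})
{% endmacro %}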
Upload macros
{% macro spark__get_exposures_dml_sql(exposures) -%}
    {% if exposures != [] %}
        {% set exposure_values %}
            select
                {{ adapter.dispatch('column_identifier', 'dbt_artifacts')(1) }},
                {{ adapter.dispatch('column_identifier', 'dbt_artifacts')(2) }},
                {{ adapter.dispatch('column_identifier', 'dbt_artifacts')(3) }},
                {{ adapter.dispatch('column_identifier', 'dbt_artifacts')(4) }},
                {{ adapter.dispatch('column_identifier', 'dbt_artifacts')(5) }},
                {{ adapter.dispatch('parse_json', 'dbt_artifacts')(adapter.dispatch('column_identifier', 'dbt_artifacts')(6)) }},
                {{ adapter.dispatch('column_identifier', 'dbt_artifacts')(7) }},
                {{ adapter.dispatch('column_identifier', 'dbt_artifacts')(8) }},
                {{ adapter.dispatch('column_identifier', 'dbt_artifacts')(9) }},
                {{ adapter.dispatch('column_identifier', 'dbt_artifacts')(10) }},
                {{ adapter.dispatch('column_identifier', 'dbt_artifacts')(11) }},
                {{ adapter.dispatch('parse_json', 'dbt_artifacts')(adapter.dispatch('column_identifier', 'dbt_artifacts')(12)) }},
                {{ adapter.dispatch('parse_json', 'dbt_artifacts')(adapter.dispatch('column_identifier', 'dbt_artifacts')(13)) }},
                {{ adapter.dispatch('parse_json', 'dbt_artifacts')(adapter.dispatch('column_identifier', 'dbt_artifacts')(14)) }}
            from values
                {% for exposure in exposures -%}
                    (
                        '{{ invocation_id }}', {# command_invocation_id #}
                        '{{ exposure.unique_id | replace("'","\\'") }}', {# node_id #}
                        {{ adapter.dispatch('timestamp', 'dbt_artifacts')(run_started_at) }}, {# run_started_at #}
                        '{{ exposure.name | replace("'","\\'") }}', {# name #}
                        '{{ exposure.type }}', {# type #}
                        '{{ tojson(exposure.owner) }}', {# owner #}
                        '{{ exposure.maturity }}', {# maturity #}
                        '{{ exposure.original_file_path | replace('\\', '\\\\') }}', {# path #}
                        '{{ exposure.description | replace("'","\\'") }}', {# description #}
                        '{{ exposure.url }}', {# url #}
                        '{{ exposure.package_name }}', {# package_name #}
                        '{{ tojson(exposure.depends_on.nodes) }}', {# depends_on_nodes #}
                        '{{ tojson(exposure.tags) }}', {# tags #}
                        {% if var('dbt_artifacts_exclude_all_results', false) %}
                            null
                        {% else %}
                            '{{ tojson(exposure) | replace("\\", "\\\\") | replace("'", "\\'") | replace('"', '\\"') }}' {# all_results #}
                        {% endif %}
                    )
                    {%- if not loop.last %},{%- endif %}
                {%- endfor %}
        {% endset %}
        {{ exposure_values }}
    {% else %}
        {{ return("") }}
    {% endif %}
{% endmacro -%}
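With the helpers behind adapter.dispatch, the default upload macros could call them directly, so only the spark__boolean / spark__timestamp helpers would carry Spark-specific code rather than a full spark__ override per dataset. A rough sketch of a single value row in a default macro (illustrative, not code from this PR):

(
    '{{ invocation_id }}', {# command_invocation_id #}
    '{{ exposure.unique_id | replace("'","\\'") }}', {# node_id #}
    {{ dbt_artifacts.timestamp(run_started_at) }}, {# run_started_at #}
    '{{ exposure.name | replace("'","\\'") }}' {# name #}
)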
@cccs-eric I appreciate your approach. However, I'd prefer not to couple Spark with the default macros; I'd rather keep the Spark-specific handling separated consistently across all types than embed it inside the default implementation.