dbt_artifacts

fix: add casts for timestamp and boolean to resolve Spark type incompatibility

Open · orentiman opened this pull request 7 months ago · 2 comments

Overview

Fix INCOMPATIBLE_DATA_FOR_TABLE.CANNOT_SAFELY_CAST error when using dbt_artifacts with Spark. Added explicit cast operations for timestamp and boolean column types to ensure compatibility.
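
In practice the fix amounts to wrapping the string literals that feed timestamp and boolean columns in explicit casts. A minimal before/after sketch, using run_started_at as a representative timestamp column (illustrative, not the exact diff):

-- before: Spark refuses to insert a string literal into a timestamp column
'{{ run_started_at }}', {# run_started_at #}

-- after: an explicit cast resolves CANNOT_SAFELY_CAST
cast('{{ run_started_at }}' as timestamp), {# run_started_at #}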

Update type - breaking / non-breaking

  • [X] Minor bug fix
  • [ ] Documentation improvements
  • [ ] Quality of Life improvements
  • [ ] New features (non-breaking change)
  • [ ] New features (breaking change)
  • [ ] Other (non-breaking change)
  • [ ] Other (breaking change)
  • [ ] Release preparation

What does this solve?

Issue 496 - [Bug]: cannot insert records due to different data types via spark adapter - string to timestamp

Outstanding questions

What databases have you tested with?

  • [ ] Snowflake
  • [ ] Google BigQuery
  • [ ] Databricks
  • [x] Spark
  • [ ] N/A

orentiman · May 04 '25 09:05

Instead of creating a new Spark macro for each of the upload_individual_datasets macros, could we create boolean and timestamp helpers in database_specific_helpers and use adapter dispatch in the default macros?

Something like this:

Helper macros

{% macro boolean(value) %}
    {{ return(adapter.dispatch('boolean', 'dbt_artifacts')(value)) }}
{% endmacro %}

{% macro default__boolean(value) %}
    '{{ value }}'
{% endmacro %}

{% macro spark__boolean(value) %}
    cast('{{ value }}' as {{ type_boolean() }})
{% endmacro %}
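
A timestamp helper would mirror this exactly (a sketch; type_timestamp() is dbt's standard cross-database type macro, analogous to type_boolean() above):

{% macro timestamp(value) %}
    {{ return(adapter.dispatch('timestamp', 'dbt_artifacts')(value)) }}
{% endmacro %}

{% macro default__timestamp(value) %}
    '{{ value }}'
{% endmacro %}

{% macro spark__timestamp(value) %}
    cast('{{ value }}' as {{ type_timestamp() }})
{% endmacro %}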

Upload macros

{% macro spark__get_exposures_dml_sql(exposures) -%}

    {% if exposures != [] %}
        {% set exposure_values %}
        select
            {{ adapter.dispatch('column_identifier', 'dbt_artifacts')(1) }},
            {{ adapter.dispatch('column_identifier', 'dbt_artifacts')(2) }},
            {{ adapter.dispatch('column_identifier', 'dbt_artifacts')(3) }},
            {{ adapter.dispatch('column_identifier', 'dbt_artifacts')(4) }},
            {{ adapter.dispatch('column_identifier', 'dbt_artifacts')(5) }},
            {{ adapter.dispatch('parse_json', 'dbt_artifacts')(adapter.dispatch('column_identifier', 'dbt_artifacts')(6)) }},
            {{ adapter.dispatch('column_identifier', 'dbt_artifacts')(7) }},
            {{ adapter.dispatch('column_identifier', 'dbt_artifacts')(8) }},
            {{ adapter.dispatch('column_identifier', 'dbt_artifacts')(9) }},
            {{ adapter.dispatch('column_identifier', 'dbt_artifacts')(10) }},
            {{ adapter.dispatch('column_identifier', 'dbt_artifacts')(11) }},
            {{ adapter.dispatch('parse_json', 'dbt_artifacts')(adapter.dispatch('column_identifier', 'dbt_artifacts')(12)) }},
            {{ adapter.dispatch('parse_json', 'dbt_artifacts')(adapter.dispatch('column_identifier', 'dbt_artifacts')(13)) }},
            {{ adapter.dispatch('parse_json', 'dbt_artifacts')(adapter.dispatch('column_identifier', 'dbt_artifacts')(14)) }}
        from values
        {% for exposure in exposures -%}
            (
                '{{ invocation_id }}', {# command_invocation_id #}
                '{{ exposure.unique_id | replace("'","\\'") }}', {# node_id #}
                {{ adapter.dispatch('timestamp', 'dbt_artifacts')(run_started_at) }}, {# run_started_at #}
                '{{ exposure.name | replace("'","\\'") }}', {# name #}
                '{{ exposure.type }}', {# type #}
                '{{ tojson(exposure.owner) }}', {# owner #}
                '{{ exposure.maturity }}', {# maturity #}
                '{{ exposure.original_file_path | replace('\\', '\\\\') }}', {# path #}
                '{{ exposure.description | replace("'","\\'") }}', {# description #}
                '{{ exposure.url }}', {# url #}
                '{{ exposure.package_name }}', {# package_name #}
                '{{ tojson(exposure.depends_on.nodes) }}', {# depends_on_nodes #}
                '{{ tojson(exposure.tags) }}', {# tags #}
                {% if var('dbt_artifacts_exclude_all_results', false) %}
                    null
                {% else %}
                    '{{ tojson(exposure) | replace("\\", "\\\\") | replace("'", "\\'") | replace('"', '\\"') }}' {# all_results #}
                {% endif %}
            )
            {%- if not loop.last %},{%- endif %}
        {%- endfor %}
        {% endset %}
        {{ exposure_values }}
    {% else %} {{ return("") }}
    {% endif %}
{% endmacro -%}
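
That way the default macros keep the current quoted-string behaviour, and only the Spark implementations pay for the cast.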

cccs-eric · Jun 12 '25 11:06

@cccs-eric I appreciate your approach. However, I'd prefer not to couple Spark to the default implementation; I'd rather keep the separation consistent across all types than embed Spark-specific handling inside the default macros.
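
Concretely, each spark__ upload macro would keep its casts inline rather than dispatching to a shared helper, e.g. (a sketch of the values tuple only):

(
    '{{ invocation_id }}', {# command_invocation_id #}
    '{{ exposure.unique_id | replace("'","\\'") }}', {# node_id #}
    cast('{{ run_started_at }}' as timestamp), {# run_started_at #}
    ...
)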

orentiman · Jul 09 '25 19:07