
[Bug]: cannot insert records due to different data types via spark adapter - string to timestamp

Open orentiman opened this issue 7 months ago • 2 comments

Overview

We are using Spark 3.5.5 with dbt on AWS EMR (latest, 7.8.0) via the Thrift server. String values are inserted into timestamp columns without casting.

According to ANSI compliance, strings are not automatically cast to timestamp/boolean: https://spark.apache.org/docs/latest/sql-ref-ansi-compliance.html
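
For illustration, a minimal Spark SQL sketch of the failing pattern (the schema and the run_started_at column are taken from the error below; the demo table name, the boolean column, and the literal values are made up for this example):

-- Minimal sketch: under Spark's ANSI rules, a STRING is not implicitly
-- cast to TIMESTAMP or BOOLEAN when writing into a table.
CREATE TABLE p39dbt.model_executions_demo (
  run_started_at   TIMESTAMP,
  was_full_refresh BOOLEAN   -- illustrative column for the boolean case
);

-- Fails with INCOMPATIBLE_DATA_FOR_TABLE.CANNOT_SAFELY_CAST:
INSERT INTO p39dbt.model_executions_demo
SELECT '2025-05-04 08:00:00', 'false';

-- Succeeds, because the casts are explicit:
INSERT INTO p39dbt.model_executions_demo
SELECT CAST('2025-05-04 08:00:00' AS TIMESTAMP), CAST('false' AS BOOLEAN);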

How to reproduce

At the start, we executed the dbt_artifacts models to create the empty tables:

dbt run --select dbt_artifacts

Then we added the following configuration to the dbt_project.yml file:

on-run-end:
  - "{{ dbt_artifacts.upload_results(results) }}"

After executing any model, the on-run-end hook runs and the following error is raised:

Encountered an error:
Runtime Error
  Database Error
    org.apache.hive.service.cli.HiveSQLException: Error running query: [INCOMPATIBLE_DATA_FOR_TABLE.CANNOT_SAFELY_CAST] org.apache.spark.sql.AnalysisException: [INCOMPATIBLE_DATA_FOR_TABLE.CANNOT_SAFELY_CAST] Cannot write incompatible data for the table `spark_catalog`.`p39dbt`.`model_executions`: Cannot safely cast `run_started_at` "STRING" to "TIMESTAMP".
        at org.apache.spark.sql.hive.thriftserver.HiveThriftServerErrors$.runningQueryError(HiveThriftServerErrors.scala:43)
        at org.apache.spark.sql.hive.thriftserver.SparkExecuteStatementOperation.org$apache$spark$sql$hive$thriftserver$SparkExecuteStatementOperation$$execute(SparkExecuteStatementOperation.scala:263)
        at org.apache.spark.sql.hive.thriftserver.SparkExecuteStatementOperation$$anon$2$$anon$3.$anonfun$run$2(SparkExecuteStatementOperation.scala:167)
        at scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:23)
        at org.apache.spark.sql.hive.thriftserver.SparkOperation.withLocalProperties(SparkOperation.scala:79)
        at org.apache.spark.sql.hive.thriftserver.SparkOperation.withLocalProperties$(SparkOperation.scala:63)
        at org.apache.spark.sql.hive.thriftserver.SparkExecuteStatementOperation.withLocalProperties(SparkExecuteStatementOperation.scala:41)
        at org.apache.spark.sql.hive.thriftserver.SparkExecuteStatementOperation$$anon$2$$anon$3.run(SparkExecuteStatementOperation.scala:167)
        at org.apache.spark.sql.hive.thriftserver.SparkExecuteStatementOperation$$anon$2$$anon$3.run(SparkExecuteStatementOperation.scala:162)
        at java.base/java.security.AccessController.doPrivileged(AccessController.java:712)
        at java.base/javax.security.auth.Subject.doAs(Subject.java:439)
        at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1953)
        at org.apache.spark.sql.hive.thriftserver.SparkExecuteStatementOperation$$anon$2.run(SparkExecuteStatementOperation.scala:176)
        at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:539)
        at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)
        at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1136)
        at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:635)
        at java.base/java.lang.Thread.run(Thread.java:840)
    Caused by: org.apache.spark.sql.AnalysisException: [INCOMPATIBLE_DATA_FOR_TABLE.CANNOT_SAFELY_CAST] Cannot write incompatible data for the table `spark_catalog`.`p39dbt`.`model_executions`: Cannot safely cast `run_started_at` "STRING" to "TIMESTAMP".
        at org.apache.spark.sql.errors.QueryCompilationErrors$.incompatibleDataToTableCannotSafelyCastError(QueryCompilationErrors.scala:2220)
        at org.apache.spark.sql.catalyst.types.DataTypeUtils$.canWrite(DataTypeUtils.scala:174)
        at org.apache.spark.sql.catalyst.analysis.TableOutputResolver$.canWrite(TableOutputResolver.scala:165)
        at org.apache.spark.sql.catalyst.analysis.TableOutputResolver$.checkField(TableOutputResolver.scala:501)
        at org.apache.spark.sql.catalyst.analysis.TableOutputResolver$.$anonfun$reorderColumnsByName$1(TableOutputResolver.scala:223)
        at scala.collection.immutable.List.flatMap(List.scala:366)
        at org.apache.spark.sql.catalyst.analysis.TableOutputResolver$.reorderColumnsByName(TableOutputResolver.scala:180)
        at org.apache.spark.sql.catalyst.analysis.TableOutputResolver$.resolveOutputColumns(TableOutputResolver.scala:66)
        at org.apache.spark.sql.execution.datasources.PreprocessTableInsertion$.org$apache$spark$sql$execution$datasources$PreprocessTableInsertion$$preprocess(rules.scala:412)
        at org.apache.spark.sql.execution.datasources.PreprocessTableInsertion$$anonfun$apply$3.applyOrElse(rules.scala:449)
        at org.apache.spark.sql.execution.datasources.PreprocessTableInsertion$$anonfun$apply$3.applyOrElse(rules.scala:440)
        at org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper.$anonfun$resolveOperatorsDownWithPruning$2(AnalysisHelper.scala:205)
        at org.apache.spark.sql.catalyst.trees.CurrentOrigin$.withOrigin(origin.scala:77)
        at org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper.$anonfun$resolveOperatorsDownWithPruning$1(AnalysisHelper.scala:205)
        at org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper$.allowInvokingTransformsInAnalyzer(AnalysisHelper.scala:359)
        at org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper.resolveOperatorsDownWithPruning(AnalysisHelper.scala:203)
        at org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper.resolveOperatorsDownWithPruning$(AnalysisHelper.scala:199)
        at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.resolveOperatorsDownWithPruning(LogicalPlan.scala:34)
        at org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper.resolveOperatorsWithPruning(AnalysisHelper.scala:134)
        at org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper.resolveOperatorsWithPruning$(AnalysisHelper.scala:131)
        at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.resolveOperatorsWithPruning(LogicalPlan.scala:34)
        at org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper.resolveOperators(AnalysisHelper.scala:85)
        at org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper.resolveOperators$(AnalysisHelper.scala:84)
        at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.resolveOperators(LogicalPlan.scala:34)
        at org.apache.spark.sql.execution.datasources.PreprocessTableInsertion$.apply(rules.scala:440)
        at org.apache.spark.sql.execution.datasources.PreprocessTableInsertion$.apply(rules.scala:373)
        at org.apache.spark.sql.catalyst.rules.RuleExecutor.$anonfun$execute$1(RuleExecutor.scala:239)
        at scala.collection.LinearSeqOptimized.foldLeft(LinearSeqOptimized.scala:126)
        at scala.collection.LinearSeqOptimized.foldLeft$(LinearSeqOptimized.scala:122)
        at scala.collection.immutable.List.foldLeft(List.scala:91)
        at org.apache.spark.sql.catalyst.rules.RuleExecutor.executeBatch$1(RuleExecutor.scala:236)
        at org.apache.spark.sql.catalyst.rules.RuleExecutor.$anonfun$execute$6(RuleExecutor.scala:319)
        at scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:23)
        at org.apache.spark.sql.catalyst.rules.RuleExecutor$RuleExecutionContext$.withContext(RuleExecutor.scala:368)
        at org.apache.spark.sql.catalyst.rules.RuleExecutor.$anonfun$execute$5(RuleExecutor.scala:319)
        at org.apache.spark.sql.catalyst.rules.RuleExecutor.$anonfun$execute$5$adapted(RuleExecutor.scala:309)
        at scala.collection.immutable.List.foreach(List.scala:431)
        at org.apache.spark.sql.catalyst.rules.RuleExecutor.execute(RuleExecutor.scala:309)
        at org.apache.spark.sql.catalyst.rules.RuleExecutor.execute(RuleExecutor.scala:195)
        at org.apache.spark.sql.catalyst.rules.RuleExecutor.execute(RuleExecutor.scala:191)
        at org.apache.spark.sql.catalyst.analysis.Analyzer.executeSameContext(Analyzer.scala:318)
        at org.apache.spark.sql.catalyst.analysis.Analyzer.$anonfun$execute$2(Analyzer.scala:314)
        at org.apache.spark.sql.catalyst.analysis.AnalysisContext$.withNewAnalysisContext(Analyzer.scala:231)
        at org.apache.spark.sql.catalyst.analysis.Analyzer.execute(Analyzer.scala:314)
        at org.apache.spark.sql.catalyst.analysis.Analyzer.execute(Analyzer.scala:260)
        at org.apache.spark.sql.catalyst.rules.RuleExecutor.$anonfun$executeAndTrack$1(RuleExecutor.scala:182)
        at org.apache.spark.sql.catalyst.QueryPlanningTracker$.withTracker(QueryPlanningTracker.scala:108)
        at org.apache.spark.sql.catalyst.rules.RuleExecutor.executeAndTrack(RuleExecutor.scala:182)
        at org.apache.spark.sql.catalyst.analysis.Analyzer.$anonfun$executeAndCheck$1(Analyzer.scala:285)
        at org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper$.markInAnalyzer(AnalysisHelper.scala:366)
        at org.apache.spark.sql.catalyst.analysis.Analyzer.executeAndCheck(Analyzer.scala:284)
        at org.apache.spark.sql.execution.QueryExecution.$anonfun$analyzed$1(QueryExecution.scala:93)
        at org.apache.spark.sql.catalyst.QueryPlanningTracker.measurePhase(QueryPlanningTracker.scala:219)
        at org.apache.spark.sql.execution.QueryExecution.$anonfun$executePhase$2(QueryExecution.scala:277)
        at org.apache.spark.sql.execution.QueryExecution$.withInternalError(QueryExecution.scala:714)
        at org.apache.spark.sql.execution.QueryExecution.$anonfun$executePhase$1(QueryExecution.scala:277)
        at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:901)
        at org.apache.spark.sql.execution.QueryExecution.executePhase(QueryExecution.scala:276)
        at org.apache.spark.sql.execution.QueryExecution.analyzed$lzycompute(QueryExecution.scala:93)
        at org.apache.spark.sql.execution.QueryExecution.analyzed(QueryExecution.scala:90)
        at org.apache.spark.sql.execution.QueryExecution.assertAnalyzed(QueryExecution.scala:82)
        at org.apache.spark.sql.Dataset$.$anonfun$ofRows$2(Dataset.scala:102)
        at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:901)
        at org.apache.spark.sql.Dataset$.ofRows(Dataset.scala:99)
        at org.apache.spark.sql.SparkSession.$anonfun$sql$4(SparkSession.scala:692)
        at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:901)
        at org.apache.spark.sql.SparkSession.sql(SparkSession.scala:683)
        at org.apache.spark.sql.SparkSession.sql(SparkSession.scala:714)
        at org.apache.spark.sql.SparkSession.sql(SparkSession.scala:745)
        at org.apache.spark.sql.SQLContext.sql(SQLContext.scala:651)
        at org.apache.spark.sql.hive.thriftserver.SparkExecuteStatementOperation.org$apache$spark$sql$hive$thriftserver$SparkExecuteStatementOperation$$execute(SparkExecuteStatementOperation.scala:228)
        ... 16 more
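
For reference, the check that fails in the trace (DataTypeUtils.canWrite via TableOutputResolver) enforces Spark's store assignment policy, which defaults to ANSI in Spark 3.x. As a hedged, session-level illustration only, and not necessarily the fix the package should adopt, relaxing the policy restores the old implicit-cast behaviour:

-- Sketch of a possible session-level workaround (assumes the Thrift session
-- allows setting this conf). LEGACY permits implicit STRING -> TIMESTAMP/BOOLEAN
-- casts on insert, at the cost of losing the ANSI safety check.
SET spark.sql.storeAssignmentPolicy=LEGACY;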
    

Expected behaviour

Data should be inserted into the artifacts tables successfully.

Screenshots

Not relevant.

Environment

Results of running dbt --version:

Core:
  - installed: 1.9.4
  - latest:    1.9.4 - Up to date!

Plugins:
  - spark: 1.9.2 - Up to date!

Please paste the contents of your packages.yml file here:

# here you can add dbt packages.
# install them with 'dbt deps' command
packages:
  - package: dbt-labs/dbt_utils
    version: 1.3.0
  - package: metaplane/dbt_expectations
    version: [">=0.10.0", "<1.0.0"]  # Adjust version if needed
  - package: brooklyn-data/dbt_artifacts
    version: 2.9.2

orentiman · May 04 '25 08:05

FYI, the same happens for the boolean type.

orentiman · May 04 '25 09:05

I confirm the issue as I am facing the same problem. I started to code a fix and then I found this issue (and the related PR). Is it possible to review/merge that PR?

cccs-eric · Jun 11 '25 23:06