spark-snowflake
Bump from 2.9.1 causes queries to be canceled after 5 minutes
After upgrading our EMR cluster to use Spark 3.3.0, I also had to bump the version of the Snowflake connector from 2.9.1 to 2.11.0.
Queries that used to run with no problem for 30 minutes are now being automatically killed after 5 minutes. I tried:
- altering the session config with the `preactions` option: `alter session set STATEMENT_TIMEOUT_IN_SECONDS=7200`
- setting the `queryTimeout` and `networkTimeout` options in the `spark.read.format("net.snowflake.spark.snowflake")...` command (both attempts are sketched below)
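Roughly, the two attempts looked like this (a minimal sketch; `sf_options` stands in for our usual `sfURL`/`sfUser`/... connection settings and the query is a placeholder):

```python
df = (
    spark.read.format("net.snowflake.spark.snowflake")
    .options(**sf_options)  # assumed connection settings
    # Attempt 1: raise the statement timeout for the connector's session
    .option("preactions", "alter session set STATEMENT_TIMEOUT_IN_SECONDS=7200")
    # Attempt 2: timeouts intended for the underlying JDBC driver
    .option("queryTimeout", "7200")
    .option("networkTimeout", "7200")
    .option("query", "select ...")  # placeholder query
    .load()
)
```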
I'm now running out of ideas. Has anybody experienced this behavior after bumping from 2.9.1?
I see that extra parameters passed as `sfOptions` are forwarded to the JDBC driver. However, the keys are transformed to lower case, so `queryTimeout` becomes `querytimeout`, which I assume is not a valid parameter, is it?
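A toy illustration of the concern (not the connector's actual code):

```python
# Toy illustration, not the connector's actual code: if option keys are
# lower-cased before reaching the JDBC driver, a camelCase parameter
# name no longer round-trips.
opts = {"queryTimeout": "7200"}
jdbc_props = {k.lower(): v for k, v in opts.items()}
print(jdbc_props)  # {'querytimeout': '7200'}
```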
@sfc-gh-mrui could you please provide some feedback on this?
I checked our warehouse and session parameters; both have `STATEMENT_TIMEOUT_IN_SECONDS` set well beyond 5 minutes.
@ottok92 Could you please check `ABORT_DETACHED_QUERY`?
In-progress queries are aborted 5 minutes after connectivity is lost due to abrupt termination of a session (e.g. network outage, browser termination, service interruption).
https://docs.snowflake.com/en/sql-reference/parameters.html#abort-detached-query
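If the 5-minute detached-query abort is the cause, it can also be overridden per session, e.g. via `preactions` (a hedged sketch; `sf_options` again stands in for the usual connection settings):

```python
# Hedged sketch: disable the detached-query abort for the connector's
# session via preactions (parameter name from the docs linked above).
df = (
    spark.read.format("net.snowflake.spark.snowflake")
    .options(**sf_options)
    .option("preactions", "alter session set ABORT_DETACHED_QUERY = FALSE")
    .option("query", "select ...")  # placeholder query
    .load()
)
```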
Hey there,
I've been facing the same issue since upgrading our Databricks runtime from 10.4 to 11.3.
I tried setting `ABORT_DETACHED_QUERY` to `FALSE` as you suggested, but unfortunately my queries are still being terminated after 5 minutes.
@ottok92 did you have any luck solving this?
This is the error I'm getting:
```
SQLException: Status of query associated with resultSet is FAILED_WITH_ERROR. Results not generated.
at net.snowflake.client.jdbc.SFAsyncResultSet.getRealResults(SFAsyncResultSet.java:128)
at net.snowflake.client.jdbc.SFAsyncResultSet.getMetaData(SFAsyncResultSet.java:263)
at net.snowflake.spark.snowflake.io.StageWriter$.executeCopyIntoTable(StageWriter.scala:603)
at net.snowflake.spark.snowflake.io.StageWriter$.writeToTableWithStagingTable(StageWriter.scala:471)
at net.snowflake.spark.snowflake.io.StageWriter$.writeToTable(StageWriter.scala:299)
at net.snowflake.spark.snowflake.io.StageWriter$.writeToStage(StageWriter.scala:238)
at net.snowflake.spark.snowflake.io.package$.writeRDD(package.scala:58)
at net.snowflake.spark.snowflake.SnowflakeWriter.save(SnowflakeWriter.scala:79)
at net.snowflake.spark.snowflake.DefaultSource.createRelation(DefaultSource.scala:147)
at org.apache.spark.sql.execution.datasources.SaveIntoDataSourceCommand.run(SaveIntoDataSourceCommand.scala:49)
at org.apache.spark.sql.execution.command.ExecutedCommandExec.sideEffectResult$lzycompute(commands.scala:80)
at org.apache.spark.sql.execution.command.ExecutedCommandExec.sideEffectResult(commands.scala:78)
at org.apache.spark.sql.execution.command.ExecutedCommandExec.executeCollect(commands.scala:89)
at org.apache.spark.sql.execution.QueryExecution$$anonfun$$nestedInanonfun$eagerlyExecuteCommands$1$1.$anonfun$applyOrElse$2(QueryExecution.scala:241)
at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withCustomExecutionEnv$8(SQLExecution.scala:243)
at org.apache.spark.sql.execution.SQLExecution$.withSQLConfPropagated(SQLExecution.scala:392)
at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withCustomExecutionEnv$1(SQLExecution.scala:188)
at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:985)
at org.apache.spark.sql.execution.SQLExecution$.withCustomExecutionEnv(SQLExecution.scala:142)
at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:342)
at org.apache.spark.sql.execution.QueryExecution$$anonfun$$nestedInanonfun$eagerlyExecuteCommands$1$1.$anonfun$applyOrElse$1(QueryExecution.scala:241)
at org.apache.spark.sql.execution.QueryExecution.org$apache$spark$sql$execution$QueryExecution$$withMVTagsIfNecessary(QueryExecution.scala:226)
at org.apache.spark.sql.execution.QueryExecution$$anonfun$$nestedInanonfun$eagerlyExecuteCommands$1$1.applyOrElse(QueryExecution.scala:239)
at org.apache.spark.sql.execution.QueryExecution$$anonfun$$nestedInanonfun$eagerlyExecuteCommands$1$1.applyOrElse(QueryExecution.scala:232)
at org.apache.spark.sql.catalyst.trees.TreeNode.$anonfun$transformDownWithPruning$1(TreeNode.scala:512)
at org.apache.spark.sql.catalyst.trees.CurrentOrigin$.withOrigin(TreeNode.scala:99)
at org.apache.spark.sql.catalyst.trees.TreeNode.transformDownWithPruning(TreeNode.scala:512)
at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.org$apache$spark$sql$catalyst$plans$logical$AnalysisHelper$$super$transformDownWithPruning(LogicalPlan.scala:31)
at org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper.transformDownWithPruning(AnalysisHelper.scala:268)
at org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper.transformDownWithPruning$(AnalysisHelper.scala:264)
at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.transformDownWithPruning(LogicalPlan.scala:31)
at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.transformDownWithPruning(LogicalPlan.scala:31)
at org.apache.spark.sql.catalyst.trees.TreeNode.transformDown(TreeNode.scala:488)
at org.apache.spark.sql.execution.QueryExecution.$anonfun$eagerlyExecuteCommands$1(QueryExecution.scala:232)
at org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper$.allowInvokingTransformsInAnalyzer(AnalysisHelper.scala:324)
at org.apache.spark.sql.execution.QueryExecution.eagerlyExecuteCommands(QueryExecution.scala:232)
at org.apache.spark.sql.execution.QueryExecution.commandExecuted$lzycompute(QueryExecution.scala:186)
at org.apache.spark.sql.execution.QueryExecution.commandExecuted(QueryExecution.scala:177)
at org.apache.spark.sql.execution.QueryExecution.assertCommandExecuted(QueryExecution.scala:268)
at org.apache.spark.sql.DataFrameWriter.runCommand(DataFrameWriter.scala:965)
at org.apache.spark.sql.DataFrameWriter.saveToV1Source(DataFrameWriter.scala:430)
at org.apache.spark.sql.DataFrameWriter.saveInternal(DataFrameWriter.scala:397)
at org.apache.spark.sql.DataFrameWriter.save(DataFrameWriter.scala:259)
at $line478fce79bfe8417693f35e92881be55d103.$read$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw.loadSliceToSnowflake(command-1697735344470165:46)
at $line478fce79bfe8417693f35e92881be55d121.$read$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$EventImporter.loadEventToDWH(command-3262412260734238:15)
at $line478fce79bfe8417693f35e92881be55d131.$read$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw.$anonfun$res8$1(command-1068569631969908:8)
at $line478fce79bfe8417693f35e92881be55d131.$read$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw.$anonfun$res8$1$adapted(command-1068569631969908:1)
at scala.collection.parallel.mutable.ParArray$ParArrayIterator.foreach(ParArray.scala:145)
at scala.collection.parallel.ParIterableLike$Foreach.leaf(ParIterableLike.scala:982)
at scala.collection.parallel.Task.$anonfun$tryLeaf$1(Tasks.scala:53)
at scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:23)
at scala.util.control.Breaks$$anon$1.catchBreak(Breaks.scala:67)
at scala.collection.parallel.Task.tryLeaf(Tasks.scala:56)
at scala.collection.parallel.Task.tryLeaf$(Tasks.scala:50)
at scala.collection.parallel.ParIterableLike$Foreach.tryLeaf(ParIterableLike.scala:979)
at scala.collection.parallel.AdaptiveWorkStealingTasks$WrappedTask.compute(Tasks.scala:153)
at scala.collection.parallel.AdaptiveWorkStealingTasks$WrappedTask.compute$(Tasks.scala:149)
at scala.collection.parallel.AdaptiveWorkStealingForkJoinTasks$WrappedTask.compute(Tasks.scala:440)
at java.util.concurrent.RecursiveAction.exec(RecursiveAction.java:189)
at java.util.concurrent.ForkJoinTask.doExec(ForkJoinTask.java:289)
at java.util.concurrent.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1056)
at java.util.concurrent.ForkJoinPool.runWorker(ForkJoinPool.java:1692)
at java.util.concurrent.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:175)
```
For people running into this, setting `.option("internal_execute_query_in_sync_mode", "true")` seems to work around the issue.
Facing the same issue. @bkvarda, where do you provide the option? Is it something like the following?
```python
(
    self.spark.read.format("snowflake")
    .options(**self.options)
    .option("query", query)
    .option("internal_execute_query_in_sync_mode", "true")
    .load()
)
```
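For reference, the failure in the stack trace above is on the write path, so the same option would presumably be set on the `DataFrameWriter` as well (a hedged sketch, assuming the option name carries over unchanged):

```python
# Hedged sketch: same workaround on the write path, where the stack trace
# above fails. Untested; the target table name is hypothetical.
(
    df.write.format("snowflake")
    .options(**sf_options)
    .option("dbtable", "TARGET_TABLE")  # hypothetical target table
    .option("internal_execute_query_in_sync_mode", "true")
    .mode("append")
    .save()
)
```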