
Bump from 2.9.1 causes queries to be canceled after 5 minutes

Open · ottobricks opened this issue 2 years ago · 7 comments

After upgrading our EMR cluster to Spark 3.3.0, I also had to bump the Snowflake connector from 2.9.1 to 2.11.0. Queries that used to run for 30 minutes with no problem are now automatically killed after 5 minutes. I tried:

  • altering the session config via the preactions option: alter session set STATEMENT_TIMEOUT_IN_SECONDS=7200
  • setting the queryTimeout and networkTimeout options in the spark.read.format("net.snowflake.spark.snowflake")... call (see the sketch after this list)
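
Roughly, the two attempts looked like this. This is only a sketch: spark is an active SparkSession, and sfOptions stands in for our real connection options (account, user, credentials, database, warehouse, etc.):

    # Sketch of both attempts; sfOptions is a placeholder, not real config.
    df = (
        spark.read.format("net.snowflake.spark.snowflake")
        .options(**sfOptions)
        # attempt 1: raise the server-side statement timeout before the query runs
        .option("preactions", "alter session set STATEMENT_TIMEOUT_IN_SECONDS=7200")
        # attempt 2: pass JDBC-level timeouts through the connector options
        .option("queryTimeout", "7200")       # seconds
        .option("networkTimeout", "7200000")  # milliseconds
        .option("query", "select ...")        # the long-running query
        .load()
    )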

I'm now running out of ideas. Has anybody experienced this behavior after bumping from 2.9.1?

ottobricks · Dec 01 '22

I see that extra parameters passed as sfOptions are forwarded to the JDBC driver. However, the keys are transformed to lower case, which means that queryTimeout becomes querytimeout, and I assume that is not a valid parameter, is it?
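
To illustrate the concern with a minimal, hypothetical example (this is not the connector's actual code, just the transform I am describing):

    # Hypothetical: if option keys are lower-cased before reaching the JDBC
    # driver, a camelCase parameter name would not survive the transform.
    opts = {"queryTimeout": "7200"}
    jdbc_params = {key.lower(): value for key, value in opts.items()}
    print(jdbc_params)  # {'querytimeout': '7200'}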

ottobricks · Dec 02 '22

@sfc-gh-mrui could you provide some feedback on this, please?

ottobricks · Dec 05 '22

I checked our warehouse and session parameters; both have STATEMENT_TIMEOUT_IN_SECONDS set way beyond 5 minutes.

ottobricks · Dec 06 '22

@ottok92 Could you please check ABORT_DETACHED_QUERY? In-progress queries are aborted 5 minutes after connectivity is lost due to abrupt termination of a session (e.g. network outage, browser termination, service interruption). https://docs.snowflake.com/en/sql-reference/parameters.html#abort-detached-query
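
For example, the parameter can be inspected in a worksheet with SHOW PARAMETERS LIKE 'ABORT_DETACHED_QUERY' IN SESSION, or disabled for the connector's session via preactions. A sketch, with sfOptions as a placeholder for the real connection options:

    # Sketch: disable ABORT_DETACHED_QUERY for the session the connector opens.
    df = (
        spark.read.format("net.snowflake.spark.snowflake")
        .options(**sfOptions)  # placeholder for real connection options
        .option("preactions", "alter session set ABORT_DETACHED_QUERY = FALSE")
        .option("query", "select ...")
        .load()
    )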

sfc-gh-mrui · Dec 07 '22

Hey there,

I'm facing the same issue since upgrading our Databricks runtime from 10.4 to 11.3. I tried setting ABORT_DETACHED_QUERY to FALSE as you suggested, but unfortunately my queries are still being terminated after 5 minutes.

@ottok92 did you have any luck solving this?

This is the error I'm getting:

SQLException: Status of query associated with resultSet is FAILED_WITH_ERROR. Results not generated.
	at net.snowflake.client.jdbc.SFAsyncResultSet.getRealResults(SFAsyncResultSet.java:128)
	at net.snowflake.client.jdbc.SFAsyncResultSet.getMetaData(SFAsyncResultSet.java:263)
	at net.snowflake.spark.snowflake.io.StageWriter$.executeCopyIntoTable(StageWriter.scala:603)
	at net.snowflake.spark.snowflake.io.StageWriter$.writeToTableWithStagingTable(StageWriter.scala:471)
	at net.snowflake.spark.snowflake.io.StageWriter$.writeToTable(StageWriter.scala:299)
	at net.snowflake.spark.snowflake.io.StageWriter$.writeToStage(StageWriter.scala:238)
	at net.snowflake.spark.snowflake.io.package$.writeRDD(package.scala:58)
	at net.snowflake.spark.snowflake.SnowflakeWriter.save(SnowflakeWriter.scala:79)
	at net.snowflake.spark.snowflake.DefaultSource.createRelation(DefaultSource.scala:147)
	at org.apache.spark.sql.execution.datasources.SaveIntoDataSourceCommand.run(SaveIntoDataSourceCommand.scala:49)
	at org.apache.spark.sql.execution.command.ExecutedCommandExec.sideEffectResult$lzycompute(commands.scala:80)
	at org.apache.spark.sql.execution.command.ExecutedCommandExec.sideEffectResult(commands.scala:78)
	at org.apache.spark.sql.execution.command.ExecutedCommandExec.executeCollect(commands.scala:89)
	at org.apache.spark.sql.execution.QueryExecution$$anonfun$$nestedInanonfun$eagerlyExecuteCommands$1$1.$anonfun$applyOrElse$2(QueryExecution.scala:241)
	at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withCustomExecutionEnv$8(SQLExecution.scala:243)
	at org.apache.spark.sql.execution.SQLExecution$.withSQLConfPropagated(SQLExecution.scala:392)
	at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withCustomExecutionEnv$1(SQLExecution.scala:188)
	at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:985)
	at org.apache.spark.sql.execution.SQLExecution$.withCustomExecutionEnv(SQLExecution.scala:142)
	at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:342)
	at org.apache.spark.sql.execution.QueryExecution$$anonfun$$nestedInanonfun$eagerlyExecuteCommands$1$1.$anonfun$applyOrElse$1(QueryExecution.scala:241)
	at org.apache.spark.sql.execution.QueryExecution.org$apache$spark$sql$execution$QueryExecution$$withMVTagsIfNecessary(QueryExecution.scala:226)
	at org.apache.spark.sql.execution.QueryExecution$$anonfun$$nestedInanonfun$eagerlyExecuteCommands$1$1.applyOrElse(QueryExecution.scala:239)
	at org.apache.spark.sql.execution.QueryExecution$$anonfun$$nestedInanonfun$eagerlyExecuteCommands$1$1.applyOrElse(QueryExecution.scala:232)
	at org.apache.spark.sql.catalyst.trees.TreeNode.$anonfun$transformDownWithPruning$1(TreeNode.scala:512)
	at org.apache.spark.sql.catalyst.trees.CurrentOrigin$.withOrigin(TreeNode.scala:99)
	at org.apache.spark.sql.catalyst.trees.TreeNode.transformDownWithPruning(TreeNode.scala:512)
	at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.org$apache$spark$sql$catalyst$plans$logical$AnalysisHelper$$super$transformDownWithPruning(LogicalPlan.scala:31)
	at org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper.transformDownWithPruning(AnalysisHelper.scala:268)
	at org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper.transformDownWithPruning$(AnalysisHelper.scala:264)
	at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.transformDownWithPruning(LogicalPlan.scala:31)
	at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.transformDownWithPruning(LogicalPlan.scala:31)
	at org.apache.spark.sql.catalyst.trees.TreeNode.transformDown(TreeNode.scala:488)
	at org.apache.spark.sql.execution.QueryExecution.$anonfun$eagerlyExecuteCommands$1(QueryExecution.scala:232)
	at org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper$.allowInvokingTransformsInAnalyzer(AnalysisHelper.scala:324)
	at org.apache.spark.sql.execution.QueryExecution.eagerlyExecuteCommands(QueryExecution.scala:232)
	at org.apache.spark.sql.execution.QueryExecution.commandExecuted$lzycompute(QueryExecution.scala:186)
	at org.apache.spark.sql.execution.QueryExecution.commandExecuted(QueryExecution.scala:177)
	at org.apache.spark.sql.execution.QueryExecution.assertCommandExecuted(QueryExecution.scala:268)
	at org.apache.spark.sql.DataFrameWriter.runCommand(DataFrameWriter.scala:965)
	at org.apache.spark.sql.DataFrameWriter.saveToV1Source(DataFrameWriter.scala:430)
	at org.apache.spark.sql.DataFrameWriter.saveInternal(DataFrameWriter.scala:397)
	at org.apache.spark.sql.DataFrameWriter.save(DataFrameWriter.scala:259)
	at $line478fce79bfe8417693f35e92881be55d103.$read$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw.loadSliceToSnowflake(command-1697735344470165:46)
	at $line478fce79bfe8417693f35e92881be55d121.$read$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$EventImporter.loadEventToDWH(command-3262412260734238:15)
	at $line478fce79bfe8417693f35e92881be55d131.$read$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw.$anonfun$res8$1(command-1068569631969908:8)
	at $line478fce79bfe8417693f35e92881be55d131.$read$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw.$anonfun$res8$1$adapted(command-1068569631969908:1)
	at scala.collection.parallel.mutable.ParArray$ParArrayIterator.foreach(ParArray.scala:145)
	at scala.collection.parallel.ParIterableLike$Foreach.leaf(ParIterableLike.scala:982)
	at scala.collection.parallel.Task.$anonfun$tryLeaf$1(Tasks.scala:53)
	at scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:23)
	at scala.util.control.Breaks$$anon$1.catchBreak(Breaks.scala:67)
	at scala.collection.parallel.Task.tryLeaf(Tasks.scala:56)
	at scala.collection.parallel.Task.tryLeaf$(Tasks.scala:50)
	at scala.collection.parallel.ParIterableLike$Foreach.tryLeaf(ParIterableLike.scala:979)
	at scala.collection.parallel.AdaptiveWorkStealingTasks$WrappedTask.compute(Tasks.scala:153)
	at scala.collection.parallel.AdaptiveWorkStealingTasks$WrappedTask.compute$(Tasks.scala:149)
	at scala.collection.parallel.AdaptiveWorkStealingForkJoinTasks$WrappedTask.compute(Tasks.scala:440)
	at java.util.concurrent.RecursiveAction.exec(RecursiveAction.java:189)
	at java.util.concurrent.ForkJoinTask.doExec(ForkJoinTask.java:289)
	at java.util.concurrent.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1056)
	at java.util.concurrent.ForkJoinPool.runWorker(ForkJoinPool.java:1692)
	at java.util.concurrent.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:175)

Gitznik · Jan 31 '23

For people running into this, setting .option("internal_execute_query_in_sync_mode", "true") seems to work around the issue.
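
On the write path (where the stack trace above failed, in SnowflakeWriter.save), a sketch would look like this; df, sfOptions, and the table name are placeholders:

    # Sketch: same workaround option applied on a write.
    (
        df.write.format("net.snowflake.spark.snowflake")
        .options(**sfOptions)
        .option("dbtable", "MY_TARGET_TABLE")  # hypothetical target table
        .option("internal_execute_query_in_sync_mode", "true")
        .mode("append")
        .save()
    )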

bkvarda · Apr 24 '23

Facing the same issue. @bkvarda, where do you provide the option? Is it something like the following:

(
    self.spark.read.format("snowflake")
    .options(**self.options)
    .option("query", query)
    .option("internal_execute_query_in_sync_mode", "true")
    .load()
)

aleksandraangelova · Jan 13 '24