spark-snowflake
Predicate pushdown of COUNT fails on large table
I'm using spark-snowflake connector version 2.9.1 with a SNAPSHOT version of Spark compiled from the latest spark repository commits, on Java 11.0.9, Scala 2.12.14, and Snowflake 5.32.4. It seems that when the spark-snowflake connector pushes a .count() operation down into SELECT COUNT(*) in the actual Snowflake query, it can fail with java.lang.IllegalArgumentException: More than Int.MaxValue elements. The table is 4.9 billion rows in size, which does exceed Int.MaxValue (2,147,483,647, about 2.1 billion). Yet since we are asking Snowflake to return a single row, the number of rows in the table should be immaterial to how the underlying computation works in both Snowflake and Spark.
Note that if I re-write the query manually to use COUNT(*), the correct result is returned. In addition, if I call .persist before .show on the DataFrame, a subsequent call to .count() also works correctly. In this second case, Spark fetches all the rows and persists them to local disk, so the subsequent .count uses Spark exclusively to calculate the number of rows rather than pushing anything down to Snowflake's engine. These workarounds suggest there is a bug somewhere in the spark-snowflake connector's predicate pushdown code, rather than a bug in Spark itself.
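For reference, here is a minimal sketch of the two workarounds described above, assuming the same myDF, sfOptions, and SNOWFLAKE_SOURCE_NAME setup shown in the Details section below; the query text and variable names here are illustrative only:

```scala
// Workaround 1: put the aggregation in the query text ourselves instead of
// relying on the connector to rewrite .count() into SELECT COUNT(*).
val countDF = spark.sqlContext.read
  .format(SNOWFLAKE_SOURCE_NAME)
  .options(sfOptions)
  .option("query",
    "SELECT COUNT(*) AS CNT FROM DB.SCHEMA.MY_TABLE " +
    "WHERE D = true AND B IS NOT NULL AND C IS NOT NULL")
  .load()
countDF.show()   // returns the correct ~4.9 billion count as a single row

// Workaround 2: persist the DataFrame first so the count is computed
// entirely inside Spark, with no pushdown to Snowflake.
myDF.persist()
myDF.show()      // fetches and persists the rows
myDF.count()     // now succeeds, counted by Spark rather than pushed down
```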
One caveat is that I'm compiling the most recent Spark commits from scratch, so I'm not using a version of Spark that has been certified to work with spark-snowflake. However, I've observed this error more than once over the last few months and usually work around it with a manual re-write, so I don't think any very recent commits are responsible. The stack trace also shows the error occurring inside a low-level Scala function, scala.collection.immutable.NumericRange, so I suspect this issue is either present in regular Spark releases already or will be present in the upcoming Spark 3.2 release.
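As a Snowflake-independent illustration of that NumericRange limit, the following sketch (a hypothetical reproduction, not taken from the connector's code) triggers the same exception whenever Spark slices a Long range with more than Int.MaxValue elements into partitions, which is consistent with the ParallelCollectionRDD.slice frames in the stack trace below:

```scala
// Hypothetical repro: a NumericRange[Long] with ~4.9 billion elements,
// mirroring the row count of the table. This is not the connector's code,
// just a sketch of the underlying Scala/Spark limit.
val bigRange = 1L to 4900000000L          // NumericRange.Inclusive[Long]

// ParallelCollectionRDD.slice calls .length on the range while computing
// partitions, and NumericRange.length throws once the element count
// exceeds Int.MaxValue:
// java.lang.IllegalArgumentException: More than Int.MaxValue elements.
spark.sparkContext.parallelize(bigRange).count()
```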
Details
The code I'm executing in the spark-shell REPL is:
val exampleQuery = s"""
SELECT A, B, C
FROM DB.SCHEMA.MY_TABLE
WHERE D = true AND B IS NOT NULL AND C IS NOT NULL
"""

val myDF = spark.sqlContext.read
  .format(SNOWFLAKE_SOURCE_NAME)
  .options(sfOptions)
  .option("query", exampleQuery)
  .load()
Here, myDF is the DataFrame we retrieve, sfOptions contains authentication data for our Snowflake instance, and DB.SCHEMA.MY_TABLE is a table containing 4.9 billion rows.
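For context, sfOptions itself is not shown above; a typical configuration looks roughly like the sketch below, where every value is a placeholder and SNOWFLAKE_SOURCE_NAME comes from the connector's Utils object:

```scala
import net.snowflake.spark.snowflake.Utils.SNOWFLAKE_SOURCE_NAME  // "net.snowflake.spark.snowflake"

// Placeholder values only; the real sfOptions holds our account's credentials.
val sfOptions = Map(
  "sfURL"       -> "<account>.snowflakecomputing.com",
  "sfUser"      -> "<user>",
  "sfPassword"  -> "<password>",
  "sfDatabase"  -> "DB",
  "sfSchema"    -> "SCHEMA",
  "sfWarehouse" -> "<warehouse>"
)
```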
When using .show on the DataFrame, the query successfully executes in Snowflake and a few rows are shown as desired. When I check our Snowflake UI, I can see the actual Snowflake query executed is:
SELECT "A", "B", "C" FROM (SELECT A, B, C
FROM DB.SCHEMA.MY_TABLE
WHERE D = true AND B IS NOT NULL AND C IS NOT NULL)
When I call myDF.count(), the operation gets pushed down into the Snowflake query as desired. The actual query executed on Snowflake is now:
SELECT count(*) FROM ( SELECT A, B, C
FROM DB.SCHEMA.MY_TABLE
WHERE D = true AND B IS NOT NULL AND C IS NOT NULL )
However, instead of returning 4.9 billion as the count, the command runs for a little while before erroring out with the following stacktrace:
java.lang.IllegalArgumentException: More than Int.MaxValue elements.
at scala.collection.immutable.NumericRange$.check$1(NumericRange.scala:319)
at scala.collection.immutable.NumericRange$.count(NumericRange.scala:329)
at scala.collection.immutable.NumericRange.numRangeElements$lzycompute(NumericRange.scala:54)
at scala.collection.immutable.NumericRange.numRangeElements(NumericRange.scala:53)
at scala.collection.immutable.NumericRange.length(NumericRange.scala:56)
at org.apache.spark.rdd.ParallelCollectionRDD$.slice(ParallelCollectionRDD.scala:143)
at org.apache.spark.rdd.ParallelCollectionRDD.getPartitions(ParallelCollectionRDD.scala:96)
at org.apache.spark.rdd.RDD.$anonfun$partitions$2(RDD.scala:292)
at scala.Option.getOrElse(Option.scala:189)
at org.apache.spark.rdd.RDD.partitions(RDD.scala:288)
at org.apache.spark.rdd.MapPartitionsRDD.getPartitions(MapPartitionsRDD.scala:49)
at org.apache.spark.rdd.RDD.$anonfun$partitions$2(RDD.scala:292)
at scala.Option.getOrElse(Option.scala:189)
at org.apache.spark.rdd.RDD.partitions(RDD.scala:288)
at org.apache.spark.rdd.MapPartitionsRDD.getPartitions(MapPartitionsRDD.scala:49)
at org.apache.spark.rdd.RDD.$anonfun$partitions$2(RDD.scala:292)
at scala.Option.getOrElse(Option.scala:189)
at org.apache.spark.rdd.RDD.partitions(RDD.scala:288)
at org.apache.spark.rdd.MapPartitionsRDD.getPartitions(MapPartitionsRDD.scala:49)
at org.apache.spark.rdd.RDD.$anonfun$partitions$2(RDD.scala:292)
at scala.Option.getOrElse(Option.scala:189)
at org.apache.spark.rdd.RDD.partitions(RDD.scala:288)
at org.apache.spark.rdd.RDD.getNumPartitions(RDD.scala:308)
at org.apache.spark.sql.execution.exchange.ShuffleExchangeExec.mapOutputStatisticsFuture$lzycompute(ShuffleExchangeExec.scala:140)
at org.apache.spark.sql.execution.exchange.ShuffleExchangeExec.mapOutputStatisticsFuture(ShuffleExchangeExec.scala:139)
at org.apache.spark.sql.execution.exchange.ShuffleExchangeLike.$anonfun$submitShuffleJob$1(ShuffleExchangeExec.scala:68)
at org.apache.spark.sql.execution.SparkPlan.$anonfun$executeQuery$1(SparkPlan.scala:222)
at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
at org.apache.spark.sql.execution.SparkPlan.executeQuery(SparkPlan.scala:219)
at org.apache.spark.sql.execution.exchange.ShuffleExchangeLike.submitShuffleJob(ShuffleExchangeExec.scala:68)
at org.apache.spark.sql.execution.exchange.ShuffleExchangeLike.submitShuffleJob$(ShuffleExchangeExec.scala:67)
at org.apache.spark.sql.execution.exchange.ShuffleExchangeExec.submitShuffleJob(ShuffleExchangeExec.scala:115)
at org.apache.spark.sql.execution.adaptive.ShuffleQueryStageExec.shuffleFuture$lzycompute(QueryStageExec.scala:170)
at org.apache.spark.sql.execution.adaptive.ShuffleQueryStageExec.shuffleFuture(QueryStageExec.scala:170)
at org.apache.spark.sql.execution.adaptive.ShuffleQueryStageExec.doMaterialize(QueryStageExec.scala:172)
at org.apache.spark.sql.execution.adaptive.QueryStageExec.materialize(QueryStageExec.scala:82)
at org.apache.spark.sql.execution.adaptive.AdaptiveSparkPlanExec.$anonfun$getFinalPhysicalPlan$5(AdaptiveSparkPlanExec.scala:256)
at org.apache.spark.sql.execution.adaptive.AdaptiveSparkPlanExec.$anonfun$getFinalPhysicalPlan$5$adapted(AdaptiveSparkPlanExec.scala:254)
at scala.collection.Iterator.foreach(Iterator.scala:943)
at scala.collection.Iterator.foreach$(Iterator.scala:943)
at scala.collection.AbstractIterator.foreach(Iterator.scala:1431)
at scala.collection.IterableLike.foreach(IterableLike.scala:74)
at scala.collection.IterableLike.foreach$(IterableLike.scala:73)
at scala.collection.AbstractIterable.foreach(Iterable.scala:56)
at org.apache.spark.sql.execution.adaptive.AdaptiveSparkPlanExec.$anonfun$getFinalPhysicalPlan$1(AdaptiveSparkPlanExec.scala:254)
at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:775)
at org.apache.spark.sql.execution.adaptive.AdaptiveSparkPlanExec.getFinalPhysicalPlan(AdaptiveSparkPlanExec.scala:226)
at org.apache.spark.sql.execution.adaptive.AdaptiveSparkPlanExec.withFinalPlanUpdate(AdaptiveSparkPlanExec.scala:365)
at org.apache.spark.sql.execution.adaptive.AdaptiveSparkPlanExec.executeCollect(AdaptiveSparkPlanExec.scala:338)
at org.apache.spark.sql.Dataset.$anonfun$count$1(Dataset.scala:3039)
at org.apache.spark.sql.Dataset.$anonfun$count$1$adapted(Dataset.scala:3038)
at org.apache.spark.sql.Dataset.$anonfun$withAction$1(Dataset.scala:3733)
at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$5(SQLExecution.scala:103)
at org.apache.spark.sql.execution.SQLExecution$.withSQLConfPropagated(SQLExecution.scala:163)
at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$1(SQLExecution.scala:90)
at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:775)
at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:64)
at org.apache.spark.sql.Dataset.withAction(Dataset.scala:3731)
at org.apache.spark.sql.Dataset.count(Dataset.scala:3038)
... 47 elided
@dchristle I cannot reproduce this issue. The count() is pushed down to Snowflake, and the final result is a single Long value. This stack trace does not look related to the Snowflake Spark Connector.
+1 on this issue... I have a table that Snowflake counts as 2.3 billion rows and I get this when doing df.count():
IllegalArgumentException: More than Int.MaxValue elements.