
PySpark 3.2.0 support

Hoeze opened this issue 2 years ago • 12 comments

I just tried Glow with PySpark 3.2.0:

spark.read.parquet(OUTPUT_PATH)
Py4JJavaError: An error occurred while calling o62.parquet.
: java.lang.NoSuchMethodError: org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.transformAllExpressions(Lscala/PartialFunction;)Lorg/apache/spark/sql/catalyst/plans/logical/LogicalPlan;
	at io.projectglow.sql.optimizer.ReplaceExpressionsRule$.apply(hlsOptimizerRules.scala:33)
	at io.projectglow.sql.optimizer.ReplaceExpressionsRule$.apply(hlsOptimizerRules.scala:31)
	at org.apache.spark.sql.catalyst.rules.RuleExecutor.$anonfun$execute$2(RuleExecutor.scala:211)
	at scala.collection.LinearSeqOptimized.foldLeft(LinearSeqOptimized.scala:126)
	at scala.collection.LinearSeqOptimized.foldLeft$(LinearSeqOptimized.scala:122)
	at scala.collection.immutable.List.foldLeft(List.scala:91)
	at org.apache.spark.sql.catalyst.rules.RuleExecutor.$anonfun$execute$1(RuleExecutor.scala:208)
	at org.apache.spark.sql.catalyst.rules.RuleExecutor.$anonfun$execute$1$adapted(RuleExecutor.scala:200)
	at scala.collection.immutable.List.foreach(List.scala:431)
	at org.apache.spark.sql.catalyst.rules.RuleExecutor.execute(RuleExecutor.scala:200)
	at org.apache.spark.sql.catalyst.analysis.Analyzer.org$apache$spark$sql$catalyst$analysis$Analyzer$$executeSameContext(Analyzer.scala:215)
	at org.apache.spark.sql.catalyst.analysis.Analyzer.execute(Analyzer.scala:209)
	at org.apache.spark.sql.catalyst.analysis.Analyzer.execute(Analyzer.scala:172)
	at org.apache.spark.sql.catalyst.rules.RuleExecutor.$anonfun$executeAndTrack$1(RuleExecutor.scala:179)
	at org.apache.spark.sql.catalyst.QueryPlanningTracker$.withTracker(QueryPlanningTracker.scala:88)
	at org.apache.spark.sql.catalyst.rules.RuleExecutor.executeAndTrack(RuleExecutor.scala:179)
	at org.apache.spark.sql.catalyst.analysis.Analyzer.$anonfun$executeAndCheck$1(Analyzer.scala:193)
	at org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper$.markInAnalyzer(AnalysisHelper.scala:330)
	at org.apache.spark.sql.catalyst.analysis.Analyzer.executeAndCheck(Analyzer.scala:192)
	at org.apache.spark.sql.execution.QueryExecution.$anonfun$analyzed$1(QueryExecution.scala:88)
	at org.apache.spark.sql.catalyst.QueryPlanningTracker.measurePhase(QueryPlanningTracker.scala:111)
	at org.apache.spark.sql.execution.QueryExecution.$anonfun$executePhase$1(QueryExecution.scala:196)
	at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:775)
	at org.apache.spark.sql.execution.QueryExecution.executePhase(QueryExecution.scala:196)
	at org.apache.spark.sql.execution.QueryExecution.analyzed$lzycompute(QueryExecution.scala:88)
	at org.apache.spark.sql.execution.QueryExecution.analyzed(QueryExecution.scala:86)
	at org.apache.spark.sql.execution.QueryExecution.assertAnalyzed(QueryExecution.scala:78)
	at org.apache.spark.sql.Dataset$.$anonfun$ofRows$1(Dataset.scala:90)
	at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:775)
	at org.apache.spark.sql.Dataset$.ofRows(Dataset.scala:88)
	at org.apache.spark.sql.SparkSession.baseRelationToDataFrame(SparkSession.scala:440)
	at org.apache.spark.sql.DataFrameReader.loadV1Source(DataFrameReader.scala:274)
	at org.apache.spark.sql.DataFrameReader.$anonfun$load$3(DataFrameReader.scala:245)
	at scala.Option.getOrElse(Option.scala:189)
	at org.apache.spark.sql.DataFrameReader.load(DataFrameReader.scala:245)
	at org.apache.spark.sql.DataFrameReader.parquet(DataFrameReader.scala:596)
	at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
	at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
	at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
	at java.base/java.lang.reflect.Method.invoke(Method.java:566)
	at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
	at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
	at py4j.Gateway.invoke(Gateway.java:282)
	at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
	at py4j.commands.CallCommand.execute(CallCommand.java:79)
	at py4j.ClientServerConnection.waitForCommands(ClientServerConnection.java:182)
	at py4j.ClientServerConnection.run(ClientServerConnection.java:106)
	at java.base/java.lang.Thread.run(Thread.java:834)

My Spark config:

from pyspark.sql import SparkSession
import glow

spark = (
    SparkSession.builder
    .appName('playground')
    .config("spark.jars.packages", ",".join([
        "io.projectglow:glow-spark3_2.12:1.1.0",
    ]))
    .config("spark.sql.execution.arrow.pyspark.enabled", "true")
    .config("spark.hadoop.io.compression.codecs", "io.projectglow.sql.util.BGZFCodec")
    .getOrCreate()
)
glow.register(spark)
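
A quick check (a hypothetical diagnostic, not part of the original setup) confirms which Spark version the session is actually running:

import pyspark

print(spark.version)        # '3.2.0' here
print(pyspark.__version__)  # the pip-installed PySpark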

Hoeze avatar Oct 21 '21 13:10 Hoeze

Hi @Hoeze, Glow v1.1.0 is only supported on Spark 3.1; please use that version!
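
For reference, a minimal compatible setup might look like this (a sketch assuming a pip-installed pyspark==3.1.2; the Maven coordinate is the one from the config above, and the pip package names are assumed):

# e.g. pip install pyspark==3.1.2 glow.py==1.1.0
from pyspark.sql import SparkSession
import glow

spark = (
    SparkSession.builder
    .appName('playground')
    .config("spark.jars.packages", "io.projectglow:glow-spark3_2.12:1.1.0")
    .config("spark.hadoop.io.compression.codecs", "io.projectglow.sql.util.BGZFCodec")
    .getOrCreate()
)
glow.register(spark)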

williambrandler avatar Oct 21 '21 14:10 williambrandler

Hi @williambrandler, thanks for the note! Are there any plans to update to Spark 3.2?

Hoeze avatar Oct 22 '21 14:10 Hoeze

Hey @Hoeze

Databricks recently released a version of the Databricks Runtime for Spark 3.1 that is 'LTS' (Long Term Support, for 18 months). We released Glow v1.1 at that time, which is compatible with Spark 3.1 (Glow v1.0 is compatible with Spark 3.0).

Spark 3.2 was publicly announced by Databricks this week. We plan to wait to upgrade Glow until there is a Long Term Support version of the Databricks Runtime for Spark 3.2.

Are there specific features in Spark 3.2 you wish to leverage with Glow?

williambrandler avatar Oct 22 '21 16:10 williambrandler

I was interested in trying out Spark 3.2, especially the Parquet column index support: https://issues.apache.org/jira/browse/SPARK-26345

Besides that, I was just confused that a minor Spark upgrade completely broke my environment.
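
This is the kind of selective query column indexes are meant to speed up (an illustrative sketch; 'variantID' and the literal value are hypothetical, OUTPUT_PATH is the same placeholder as above):

df = spark.read.parquet(OUTPUT_PATH)
# With Parquet column indexes (SPARK-26345), a pushed-down point filter
# can skip whole pages within a row group instead of scanning them.
df.filter(df.variantID == "rs12345").show()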

Hoeze avatar Oct 22 '21 16:10 Hoeze

What broke? Please send over the errors. For column indexing, would Delta Lake serve as the indexing layer over Parquet? What are you using the indexes for, querying? Please provide a little more detail.

Here are two PRs we could use to bump the version to Spark 3.2 https://github.com/databricks/spark-xml/pull/564 https://github.com/projectglow/glow/pull/396

williambrandler avatar Oct 22 '21 20:10 williambrandler

The problem is that I cannot read Parquet files any more (see my first post). My intention was to get very fast responses on joins over "variantID" with Parquet/PySpark.
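
A hypothetical sketch of that join pattern (column names and values are illustrative):

from pyspark.sql.functions import broadcast

variants = spark.read.parquet(OUTPUT_PATH)
# Broadcast a small lookup table of variant IDs so the join avoids a shuffle.
lookup = spark.createDataFrame([("rs12345",), ("rs67890",)], ["variantID"])
joined = variants.join(broadcast(lookup), on="variantID")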

Hoeze avatar Oct 22 '21 23:10 Hoeze

Ah, got it. The way indexing works here is not quite the same as for single-node tools.

The indexing is at the partition level, not the row level (unless you have a single row for each partition). You can get queries on position down to a few seconds, and on genes down to about 10-15 seconds, by leveraging indexing in Delta Lake.

So the performance will not be as good as for single-node tools (for example bedtools), but of course bedtools only takes you so far.

I'm curious how indexing works for Parquet in Spark 3.2; I'll want to test it and compare against Delta Lake.
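
A sketch of that partition-level layout (assuming the Delta Lake packages and SQL extensions are configured on the session; paths and column names are illustrative):

variants = spark.read.parquet(OUTPUT_PATH)
# Lay the variants out one directory per contig; queries that filter on
# contigName then read only the matching partition's files.
variants.write.format("delta").partitionBy("contigName").save("/tmp/variants_delta")

spark.read.format("delta").load("/tmp/variants_delta") \
    .filter("contigName = 'chr21' AND start BETWEEN 1000000 AND 1010000") \
    .show()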

williambrandler avatar Oct 29 '21 22:10 williambrandler

But we're in the process of releasing Glow v1.1.1, which will still target Spark 3.1.2, so it will take a little time before we can move on to Spark 3.2.

What is your query performance now for these joins?

williambrandler avatar Oct 29 '21 22:10 williambrandler

Hey @Hoeze, we now have everything in place to upgrade Glow to Spark 3.2.

We are just waiting on Hail to upgrade as well, since Glow depends on Hail. I created an issue with them:

https://github.com/hail-is/hail/issues/11707

williambrandler avatar Mar 29 '22 23:03 williambrandler

Thank you for the update, @williambrandler, I'm looking forward to trying it!

Hoeze avatar Apr 03 '22 12:04 Hoeze

We hit some more unexpected issues on the release, @Hoeze, but we're getting close. We are also going to press on without waiting for Hail, EMR, and Dataproc to upgrade to Spark 3.2. This means the continuous integration tests will fail at the Hail-on-Spark-3 step, but I have manually tested that the export from Hail to Glow still works.

williambrandler avatar Apr 20 '22 18:04 williambrandler

@Hoeze Glow on Spark 3.2.1 is now available as a pre-release. We're still doing some testing, but everything seems to work except exporting from Hail to Glow: https://github.com/projectglow/glow/releases/tag/v1.2.1
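
To try it, one would presumably just bump the package coordinate, assuming the release follows the existing naming scheme (a sketch, not confirmed by the release notes):

from pyspark.sql import SparkSession
import glow

spark = (
    SparkSession.builder
    .config("spark.jars.packages", "io.projectglow:glow-spark3_2.12:1.2.1")
    .config("spark.hadoop.io.compression.codecs", "io.projectglow.sql.util.BGZFCodec")
    .getOrCreate()
)
glow.register(spark)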

williambrandler avatar Apr 21 '22 16:04 williambrandler