[BUG] Unable to run PySpark code with Delta on a k8s cluster on GCP
### Which Delta project/connector is this regarding?
- [x] Spark
- [ ] Standalone
- [ ] Flink
- [ ] Kernel
- [ ] Other (fill in here)
### Describe the problem
I am attempting to run PySpark code that reads from and writes to Delta tables on a GCS bucket, submitted to a k8s cluster with `spark-submit`.
The same setup works fine for plain Parquet files, but for Delta I get the error below. I have been trying multiple version combinations for the last week.
File "/tmp/spark-f1f15d72-8494-4597-8d63-ccc0b33a52e4/pyspark_gcs_parquet_read.py", line 38, in <module>
extract()
File "/tmp/spark-f1f15d72-8494-4597-8d63-ccc0b33a52e4/pyspark_gcs_parquet_read.py", line 27, in extract
input_df.write.mode("overwrite").format("delta").save("/tmp/delta-table2")
File "/opt/bitnami/spark/python/lib/pyspark.zip/pyspark/sql/readwriter.py", line 1463, in save
File "/opt/bitnami/spark/python/lib/py4j-0.10.9.7-src.zip/py4j/java_gateway.py", line 1322, in __call__
File "/opt/bitnami/spark/python/lib/pyspark.zip/pyspark/errors/exceptions/captured.py", line 179, in deco
File "/opt/bitnami/spark/python/lib/py4j-0.10.9.7-src.zip/py4j/protocol.py", line 326, in get_return_value
py4j.protocol.Py4JJavaError: An error occurred while calling o226.save.
: java.lang.VerifyError: Bad type on operand stack
Exception Details:
Location:
org/apache/spark/sql/delta/stats/StatisticsCollection$SqlParser$$anon$1.visitMultipartIdentifierList(Lorg/apache/spark/sql/catalyst/parser/SqlBaseParser$MultipartIdentifierListContext;)Lscala/collection/Seq; @17: invokevirtual
24/10/03 06:41:32 INFO SparkContext: SparkContext is stopping with exitCode 0.
Reason:
Type 'org/apache/spark/sql/catalyst/parser/SqlBaseParser$MultipartIdentifierListContext' (current frame, stack[1]) is not assignable to 'org/antlr/v4/runtime/ParserRuleContext'
Current Frame:
bci: @17
flags: { }
locals: { 'org/apache/spark/sql/delta/stats/StatisticsCollection$SqlParser$$anon$1', 'org/apache/spark/sql/catalyst/parser/SqlBaseParser$MultipartIdentifierListContext' }
stack: { 'org/apache/spark/sql/catalyst/parser/ParserUtils$', 'org/apache/spark/sql/catalyst/parser/SqlBaseParser$MultipartIdentifierListContext', 'scala/Option', 'scala/Function0' }
Bytecode:
0000000: b200 232b b200 23b6 0027 2a2b ba00 3f00
0000010: 00b6 0043 c000 45b0
```
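The `VerifyError` says that `SqlBaseParser$MultipartIdentifierListContext` is not assignable to ANTLR's `ParserRuleContext`, which usually indicates conflicting Spark/ANTLR classes on the classpath (for example pulled in via `--packages` together with `userClassPathFirst=true`). A rough, unverified sketch of how I would check which jars those classes resolve from inside the session (assumes an active `SparkSession` named `spark` in the same job):

```python
# Rough diagnostic sketch (not verified): print which jar each class involved in the
# VerifyError is loaded from, using the driver's context classloader via py4j.
# Assumes an active SparkSession named `spark` inside the same job.
def locate(class_name):
    resource = class_name.replace(".", "/") + ".class"
    loader = spark.sparkContext._jvm.java.lang.Thread.currentThread().getContextClassLoader()
    url = loader.getResource(resource)
    print(class_name, "->", None if url is None else url.toString())

locate("org.apache.spark.sql.catalyst.parser.SqlBaseParser")
locate("org.antlr.v4.runtime.ParserRuleContext")
```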
I am using the following spark-submit command:
```
spark-submit \
  --master k8s://<IP> \
  --deploy-mode cluster \
  --name spark-ankur-4 \
  --packages com.google.cloud.bigdataoss:gcs-connector:hadoop3-2.2.5,io.delta:delta-spark_2.12:3.0.0 \
  --conf spark.executor.instances=1 \
  --conf spark.kubernetes.container.image=bitnami/spark:3.5.3 \
  --conf spark.driver.userClassPathFirst=true \
  --conf spark.executor.userClassPathFirst=true \
  --conf spark.kubernetes.tolerations.0.key= \
  --conf spark.kubernetes.tolerations.0.operator=Equal \
  --conf spark.kubernetes.tolerations.0.value=perf \
  --conf spark.kubernetes.tolerations.0.effect=NoSchedule \
  --conf spark.hadoop.hive.metastore.uris=thrift:// \
  --conf spark.hadoop.fs.AbstractFileSystem.gs.impl=com.google.cloud.hadoop.fs.gcs.GoogleHadoopFS \
  --conf spark.hadoop.fs.gs.impl=com.google.cloud.hadoop.fs.gcs.GoogleHadoopFileSystem \
  --conf spark.sql.extensions=io.delta.sql.DeltaSparkSessionExtension \
  --conf spark.sql.catalog.spark_catalog=org.apache.spark.sql.delta.catalog.DeltaCatalog \
  --conf spark.kubernetes.driver.podTemplateFile=pod_template.yaml \
  --conf spark.kubernetes.executor.podTemplateFile=pod_template.yaml \
  --conf spark.kubernetes.authenticate.driver.serviceAccountName=spark \
  --conf spark.kubernetes.authenticate.executor.serviceAccountName=spark \
  --conf spark.kubernetes.namespace=default \
  --conf spark.hadoop.hadoop.security.authentication=simple \
  --conf spark.hadoop.hadoop.security.authorization=false \
  --conf spark.executorEnv.LD_PRELOAD=/opt/bitnami/common/lib/libnss_wrapper.so \
  --conf 'spark.driver.extraJavaOptions=-Divy.cache.dir=/tmp -Divy.home=/tmp' \
  gs://public-bucket-ankur/pyspark_gcs_parquet_read.py
```
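For isolation, here is a minimal, standalone sketch of the same Delta write path for running outside k8s/GCS (assumes pip-installed `pyspark` and `delta-spark` with matching versions; the output path is a placeholder):

```python
# Minimal, standalone sketch of the Delta write path (assumptions: pyspark and
# delta-spark installed via pip with matching versions; local filesystem output).
from delta import configure_spark_with_delta_pip
from pyspark.sql import SparkSession

builder = (
    SparkSession.builder
    .appName("delta-minimal-repro")
    .config("spark.sql.extensions", "io.delta.sql.DeltaSparkSessionExtension")
    .config("spark.sql.catalog.spark_catalog", "org.apache.spark.sql.delta.catalog.DeltaCatalog")
)
# configure_spark_with_delta_pip adds the delta-spark Maven coordinates that match
# the installed Python package, avoiding a manual --packages version pin.
spark = configure_spark_with_delta_pip(builder).getOrCreate()

spark.range(5).write.mode("overwrite").format("delta").save("/tmp/delta-minimal-repro")
spark.read.format("delta").load("/tmp/delta-minimal-repro").show()
```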
PySpark code snippet:
```python
from pyspark.sql import SparkSession


def extract():
    # Initialize Spark session with the required configurations
    spark = (
        SparkSession.builder
        .appName("spark-pi")
        .config("spark.sql.extensions", "io.delta.sql.DeltaSparkSessionExtension")
        .config("spark.sql.catalog.spark_catalog", "org.apache.spark.sql.delta.catalog.DeltaCatalog")
        .config("spark.jars", "https://storage.googleapis.com/hadoop-lib/gcs/gcs-connector-hadoop3-2.2.11.jar")
        .config("spark.hadoop.fs.AbstractFileSystem.gs.impl", "com.google.cloud.hadoop.fs.gcs.GoogleHadoopFS")
        .config("spark.hadoop.fs.gs.impl", "com.google.cloud.hadoop.fs.gcs.GoogleHadoopFileSystem")
        .getOrCreate()
    )

    print(f"Spark Version {spark.sparkContext.version}")
    print(f"{spark.sparkContext.getConf().getAll()}")

    gcs_file_path = "gs://public-bucket-ankur/sample.snappy.parquet"
    input_df = spark.read.parquet(f"{gcs_file_path}")
    input_df.printSchema()
    input_df.show(truncate=False)
    print("Normal Parquet File completed")

    # The failing Delta write
    input_df.write.mode("overwrite").format("delta").save("/tmp/delta-table2")
    df = spark.read.format("delta").load("/tmp/delta-table2")
    df.show()


extract()
```
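For context, the failing save targets `/tmp/delta-table2`, which as far as I can tell resolves to the container-local filesystem in cluster deploy mode rather than the bucket; the eventual target is a GCS path along these lines inside `extract()` (the `delta-table2` prefix below is only a placeholder):

```python
# Hypothetical variant inside extract(): write the Delta table to the GCS bucket
# instead of the container-local /tmp path (the "delta-table2" prefix is a placeholder).
delta_path = "gs://public-bucket-ankur/delta-table2"
input_df.write.mode("overwrite").format("delta").save(delta_path)
spark.read.format("delta").load(delta_path).show()
```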
### Environment information
* Delta Lake version: 3.2.1/3.0.0
* Spark version: 3.5.3/3.5.0
* Scala version: 2.12
### Willingness to contribute
The Delta Lake Community encourages bug fix contributions. Would you or another member of your organization be willing to contribute a fix for this bug to the Delta Lake code base?
- [ ] Yes. I can contribute a fix for this bug independently.
- [ ] Yes. I would be willing to contribute a fix for this bug with guidance from the Delta Lake community.
- [ ] No. I cannot contribute a bug fix at this time.