spark-operator
Delta Read/Write not working using this operator
When we read or write Delta format using this operator, we hit the issue below. I am attaching the base image for Apache Spark (with some Python libraries), mainApplication.py (which reads from and writes to ADLS in Delta format), and the operator YAML.
With the base image alone we can read and write Delta without problems, but when we run the same application through the spark-operator we get the error below. Let me know if you need additional info.
Base-Docker Image
================
FROM apache/spark:latest
USER root
RUN mkdir -p /home/spark/.local/share/jupyter/runtime
RUN touch /home/spark/.local/share/jupyter/runtime/jupyter_cookie_secret
RUN chmod -R 777 /home/spark
RUN chmod -R 777 /opt/spark/work-dir
RUN chmod -R 777 /opt/
#COPY lib/* /opt/spark/jars/
RUN apt update && apt install -y vim
RUN apt-get update && apt-get install -y procps
RUN pip install notebook && pip install sparksql-magic
RUN pip install jupyter_scheduler
RUN pip install azure-keyvault-secrets azure-identity
RUN pip install delta-spark==3.1.0
RUN pip install delta
ENV SPARK_HOME=/opt/spark
ENV PATH=$PATH:$SPARK_HOME/bin:$SPARK_HOME/sbin
ENV PYSPARK_PYTHON=/usr/bin/python3
COPY mainApplication.py /opt/spark
COPY jupyter-main.py /opt/spark
RUN chmod -R 777 /opt/spark/work-dir
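
For reference, here is a sketch of a variant where the Delta and ABFS connector jars are baked into the image instead of being resolved at runtime through spark.jars.packages. The Maven coordinates below are assumptions and have to match the Spark/Scala build of the base image; transitive dependencies of hadoop-azure may also be needed.

FROM apache/spark:latest
# Place the jars on the image classpath so the driver/executor JVMs can load the
# Delta classes without any runtime package resolution.
ADD https://repo1.maven.org/maven2/io/delta/delta-spark_2.12/3.1.0/delta-spark_2.12-3.1.0.jar /opt/spark/jars/
ADD https://repo1.maven.org/maven2/io/delta/delta-storage/3.1.0/delta-storage-3.1.0.jar /opt/spark/jars/
ADD https://repo1.maven.org/maven2/org/apache/hadoop/hadoop-azure/3.3.1/hadoop-azure-3.3.1.jar /opt/spark/jars/
RUN chmod 644 /opt/spark/jars/*.jar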
===============
mainApplication.py
===============
from pyspark.sql import SparkSession  # needed for SparkSession.builder below
from pyspark.sql.functions import *
from pyspark.sql.types import StructType, StructField, StringType, IntegerType
from delta import *
import datetime, time
import logging
consumer_logger = logging.getLogger("Pipeline_logger")
consumer_logger.setLevel(logging.INFO)
handler = logging.StreamHandler()
formatter = logging.Formatter('%(asctime)s - %(name)s - %(levelname)s - %(message)s')
handler.setFormatter(formatter)
consumer_logger.addHandler(handler)
consumer_logger.info("spark Reading Process Started")
spark = SparkSession.builder.appName('x') \
.config("spark.sql.extensions", "io.delta.sql.DeltaSparkSessionExtension") \
.config("spark.sql.catalog.spark_catalog", "org.apache.spark.sql.delta.catalog.DeltaCatalog") \
.config('spark.jars.packages','io.delta:delta-spark_2.12:3.1.0,org.apache.hadoop:hadoop-azure:3.3.1') \
.getOrCreate()
schema = StructType([
    StructField("Name", StringType(), True),
    StructField("Age", IntegerType(), True)
])
data = [("Rohit", 30), ("Ramesh", 25), ("Robin", 35)]
df = spark.createDataFrame(data, schema)
df.show()
## azure ##
# Azure Blob Storage credentials
storage_account_name = "xxxxx"
container_name = "xxx"
storage_account_key = "xxxxx+xxxxx+xxxx+xxxx=="
# Configure Spark session with ADLS configurations
spark.conf.set("fs.azure.account.auth.type.{}.dfs.core.windows.net".format(storage_account_name), "SharedKey")
spark.conf.set("fs.azure.account.key.{}.dfs.core.windows.net".format(storage_account_name), storage_account_key)
consumer_logger.info("started writing")
df.write.format("delta").mode('overwrite').save("abfss://{}@{}.dfs.core.windows.net/arun-dir".format(container_name, storage_account_name))
consumer_logger.info("done")
while True:
    df.write.format("delta").mode('overwrite').save("abfss://{}@{}.dfs.core.windows.net/arun-dir".format(container_name, storage_account_name))
    consumer_logger.info("done")
    time.sleep(10)
    continue
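
For comparison, if the jars were already on the image classpath (as in the Dockerfile sketch above), the session could be built without spark.jars.packages. A minimal sketch, assuming the jars are baked into /opt/spark/jars:

from pyspark.sql import SparkSession

# Sketch: relies on the delta-spark_2.12 and hadoop-azure jars already sitting in
# /opt/spark/jars, so no packages need to be resolved at session start.
spark = (
    SparkSession.builder.appName("x")
    .config("spark.sql.extensions", "io.delta.sql.DeltaSparkSessionExtension")
    .config("spark.sql.catalog.spark_catalog",
            "org.apache.spark.sql.delta.catalog.DeltaCatalog")
    .getOrCreate()
)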
===========
spark-operator
===========
apiVersion: "sparkoperator.k8s.io/v1beta2"
kind: SparkApplication
metadata:
  name: arun-spark-test-aks-driver-exec
  labels:
    app: spark
spec:
  type: Python
  pythonVersion: "3"
  mode: cluster
  image: "xxxx.xxxx.io/arun-test-delta-spark-aks:1.0.11"
  imagePullPolicy: Always
  mainApplicationFile: local:///opt/spark/mainApplication.py
  mainClass: org.apache.spark.examples.SparkPi
  sparkVersion: "3.4"
  restartPolicy:
    type: OnFailure
    onFailureRetries: 2
    onFailureRetryInterval: 10
    onSubmissionFailureRetries: 5
    onSubmissionFailureRetryInterval: 20
  driver:
    cores: 1
    coreLimit: "2400m"
    memory: "1024m"
    labels:
      version: 3.1.3
      app: spark
      jobid: "spark1"
    serviceAccount: tvsm-cvdp-spark
    volumeMounts:
      - name: driver-volume
        mountPath: /opt/spark/work-dir
  executor:
    cores: 1
    instances: 2
    memory: "1026m"
    labels:
      version: 3.1.3
      app: spark
      jobid: "spark1"
      testing: "label"
  volumes:
    - name: driver-volume
      persistentVolumeClaim:
        claimName: spark-data-pvc-arun2
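
A sketch of another variant, where the packages are handed to spark-submit through the CRD instead of inside the application code. The field names assume the v1beta2 SparkApplication spec, and the coordinates are copied from the session config above:

spec:
  deps:
    packages:
      - io.delta:delta-spark_2.12:3.1.0
      - org.apache.hadoop:hadoop-azure:3.3.1
  sparkConf:
    # Same extension/catalog settings as in mainApplication.py, supplied at submission time.
    "spark.sql.extensions": "io.delta.sql.DeltaSparkSessionExtension"
    "spark.sql.catalog.spark_catalog": "org.apache.spark.sql.delta.catalog.DeltaCatalog"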
========
error log:
=======
24/04/02 12:41:13 WARN SparkSession: Cannot use io.delta.sql.DeltaSparkSessionExtension to configure session extensions.
java.lang.ClassNotFoundException: io.delta.sql.DeltaSparkSessionExtension
at java.base/java.net.URLClassLoader.findClass(Unknown Source)
at java.base/java.lang.ClassLoader.loadClass(Unknown Source)
at java.base/java.lang.ClassLoader.loadClass(Unknown Source)
at java.base/java.lang.Class.forName0(Native Method)
at java.base/java.lang.Class.forName(Unknown Source)
at org.apache.spark.util.SparkClassUtils.classForName(SparkClassUtils.scala:41)
at org.apache.spark.util.SparkClassUtils.classForName$(SparkClassUtils.scala:36)
at org.apache.spark.util.Utils$.classForName(Utils.scala:94)
at org.apache.spark.sql.SparkSession$.$anonfun$applyExtensions$2(SparkSession.scala:1367)
at org.apache.spark.sql.SparkSession$.$anonfun$applyExtensions$2$adapted(SparkSession.scala:1365)
at scala.collection.mutable.ResizableArray.foreach(ResizableArray.scala:62)
at scala.collection.mutable.ResizableArray.foreach$(ResizableArray.scala:55)
at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:49)
at org.apache.spark.sql.SparkSession$.org$apache$spark$sql$SparkSession$$applyExtensions(SparkSession.scala:1365)
at org.apache.spark.sql.SparkSession.<init>(SparkSession.scala:105)
at java.base/jdk.internal.reflect.NativeConstructorAccessorImpl.newInstance0(Native Method)
at java.base/jdk.internal.reflect.NativeConstructorAccessorImpl.newInstance(Unknown Source)
at java.base/jdk.internal.reflect.DelegatingConstructorAccessorImpl.newInstance(Unknown Source)
at java.base/java.lang.reflect.Constructor.newInstance(Unknown Source)
at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:247)
at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:374)
at py4j.Gateway.invoke(Gateway.java:238)
at py4j.commands.ConstructorCommand.invokeConstructor(ConstructorCommand.java:80)
at py4j.commands.ConstructorCommand.execute(ConstructorCommand.java:69)
at py4j.ClientServerConnection.waitForCommands(ClientServerConnection.java:182)
at py4j.ClientServerConnection.run(ClientServerConnection.java:106)
at java.base/java.lang.Thread.run(Unknown Source)
24/04/02 12:41:14 INFO SharedState: Setting hive.metastore.warehouse.dir ('null') to the value of spark.sql.warehouse.dir.
24/04/02 12:41:14 INFO SharedState: Warehouse path is 'file:/opt/spark/work-dir/spark-warehouse'.
Traceback (most recent call last):
File "/opt/spark/mainApplication.py", line 30, in <module>
df.show()
File "/opt/spark/python/lib/pyspark.zip/pyspark/sql/dataframe.py", line 945, in show
File "/opt/spark/python/lib/pyspark.zip/pyspark/sql/dataframe.py", line 963, in _show_string
File "/opt/spark/python/lib/py4j-0.10.9.7-src.zip/py4j/java_gateway.py", line 1322, in __call__
File "/opt/spark/python/lib/pyspark.zip/pyspark/errors/exceptions/captured.py", line 179, in deco
File "/opt/spark/python/lib/py4j-0.10.9.7-src.zip/py4j/protocol.py", line 326, in get_return_value
py4j.protocol.Py4JJavaError: An error occurred while calling o78.showString.
: org.apache.spark.SparkException: Cannot find catalog plugin class for catalog 'spark_catalog': org.apache.spark.sql.delta.catalog.DeltaCatalog.
at org.apache.spark.sql.errors.QueryExecutionErrors$.catalogPluginClassNotFoundForCatalogError(QueryExecutionErrors.scala:1925)
at org.apache.spark.sql.connector.catalog.Catalogs$.load(Catalogs.scala:70)
at org.apache.spark.sql.connector.catalog.CatalogManager.loadV2SessionCatalog(CatalogManager.scala:67)
at org.apache.spark.sql.connector.catalog.CatalogManager.$anonfun$v2SessionCatalog$2(CatalogManager.scala:86)
at scala.collection.mutable.HashMap.getOrElseUpdate(HashMap.scala:86)
at org.apache.spark.sql.connector.catalog.CatalogManager.$anonfun$v2SessionCatalog$1(CatalogManager.scala:86)
at scala.Option.map(Option.scala:230)
at org.apache.spark.sql.connector.catalog.CatalogManager.v2SessionCatalog(CatalogManager.scala:85)
at org.apache.spark.sql.connector.catalog.CatalogManager.catalog(CatalogManager.scala:51)
at org.apache.spark.sql.connector.catalog.CatalogManager.currentCatalog(CatalogManager.scala:122)
at org.apache.spark.sql.connector.catalog.CatalogManager.currentNamespace(CatalogManager.scala:93)
at org.apache.spark.sql.catalyst.optimizer.ReplaceCurrentLike.apply(finishAnalysis.scala:143)
at org.apache.spark.sql.catalyst.optimizer.ReplaceCurrentLike.apply(finishAnalysis.scala:140)
at org.apache.spark.sql.catalyst.optimizer.Optimizer$FinishAnalysis$.$anonfun$apply$1(Optimizer.scala:295)
at scala.collection.LinearSeqOptimized.foldLeft(LinearSeqOptimized.scala:126)
at scala.collection.LinearSeqOptimized.foldLeft$(LinearSeqOptimized.scala:122)
at scala.collection.immutable.List.foldLeft(List.scala:91)
at org.apache.spark.sql.catalyst.optimizer.Optimizer$FinishAnalysis$.apply(Optimizer.scala:295)
at org.apache.spark.sql.catalyst.optimizer.Optimizer$FinishAnalysis$.apply(Optimizer.scala:275)
at org.apache.spark.sql.catalyst.rules.RuleExecutor.$anonfun$execute$2(RuleExecutor.scala:222)
at scala.collection.IndexedSeqOptimized.foldLeft(IndexedSeqOptimized.scala:60)
at scala.collection.IndexedSeqOptimized.foldLeft$(IndexedSeqOptimized.scala:68)
at scala.collection.mutable.WrappedArray.foldLeft(WrappedArray.scala:38)
at org.apache.spark.sql.catalyst.rules.RuleExecutor.$anonfun$execute$1(RuleExecutor.scala:219)
at org.apache.spark.sql.catalyst.rules.RuleExecutor.$anonfun$execute$1$adapted(RuleExecutor.scala:211)
at scala.collection.immutable.List.foreach(List.scala:431)
at org.apache.spark.sql.catalyst.rules.RuleExecutor.execute(RuleExecutor.scala:211)
at org.apache.spark.sql.catalyst.rules.RuleExecutor.$anonfun$executeAndTrack$1(RuleExecutor.scala:182)
at org.apache.spark.sql.catalyst.QueryPlanningTracker$.withTracker(QueryPlanningTracker.scala:89)
at org.apache.spark.sql.catalyst.rules.RuleExecutor.executeAndTrack(RuleExecutor.scala:182)
at org.apache.spark.sql.execution.QueryExecution.$anonfun$optimizedPlan$1(QueryExecution.scala:152)
at org.apache.spark.sql.catalyst.QueryPlanningTracker.measurePhase(QueryPlanningTracker.scala:138)
at org.apache.spark.sql.execution.QueryExecution.$anonfun$executePhase$2(QueryExecution.scala:219)
at org.apache.spark.sql.execution.QueryExecution$.withInternalError(QueryExecution.scala:546)
at org.apache.spark.sql.execution.QueryExecution.$anonfun$executePhase$1(QueryExecution.scala:219)
at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:900)
at org.apache.spark.sql.execution.QueryExecution.executePhase(QueryExecution.scala:218)
at org.apache.spark.sql.execution.QueryExecution.optimizedPlan$lzycompute(QueryExecution.scala:148)
at org.apache.spark.sql.execution.QueryExecution.optimizedPlan(QueryExecution.scala:144)
at org.apache.spark.sql.execution.QueryExecution.assertOptimized(QueryExecution.scala:162)
at org.apache.spark.sql.execution.QueryExecution.executedPlan$lzycompute(QueryExecution.scala:182)
at org.apache.spark.sql.execution.QueryExecution.executedPlan(QueryExecution.scala:179)
at org.apache.spark.sql.execution.QueryExecution.simpleString(QueryExecution.scala:238)
at org.apache.spark.sql.execution.QueryExecution.org$apache$spark$sql$execution$QueryExecution$$explainString(QueryExecution.scala:284)
at org.apache.spark.sql.execution.QueryExecution.explainString(QueryExecution.scala:252)
at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$6(SQLExecution.scala:117)
at org.apache.spark.sql.execution.SQLExecution$.withSQLConfPropagated(SQLExecution.scala:201)
at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$1(SQLExecution.scala:108)
at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:900)
at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:66)
at org.apache.spark.sql.Dataset.withAction(Dataset.scala:4320)
at org.apache.spark.sql.Dataset.head(Dataset.scala:3314)
at org.apache.spark.sql.Dataset.take(Dataset.scala:3537)
at org.apache.spark.sql.Dataset.getRows(Dataset.scala:280)
at org.apache.spark.sql.Dataset.showString(Dataset.scala:315)
at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(Unknown Source)
at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(Unknown Source)
at java.base/java.lang.reflect.Method.invoke(Unknown Source)
at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:374)
at py4j.Gateway.invoke(Gateway.java:282)
at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
at py4j.commands.CallCommand.execute(CallCommand.java:79)
at py4j.ClientServerConnection.waitForCommands(ClientServerConnection.java:182)
at py4j.ClientServerConnection.run(ClientServerConnection.java:106)
at java.base/java.lang.Thread.run(Unknown Source)
Caused by: java.lang.ClassNotFoundException: org.apache.spark.sql.delta.catalog.DeltaCatalog
at java.base/java.net.URLClassLoader.findClass(Unknown Source)
at java.base/java.lang.ClassLoader.loadClass(Unknown Source)
at java.base/java.lang.ClassLoader.loadClass(Unknown Source)
at org.apache.spark.sql.connector.catalog.Catalogs$.load(Catalogs.scala:60)
... 65 more
24/04/02 12:41:16 INFO SparkContext: SparkContext is stopping with exitCode 0.
24/04/02 12:41:16 INFO SparkUI: Stopped Spark web UI at http://arun-spark-test-aks-driver-exec-3a764c8e9ed32083-driver-svc.spark-jobs.svc:4040
24/04/02 12:41:16 INFO KubernetesClusterSchedulerBackend: Shutting down all executors
24/04/02 12:41:16 INFO KubernetesClusterSchedulerBackend$KubernetesDriverEndpoint: Asking each executor to shut down
24/04/02 12:41:16 WARN ExecutorPodsWatchSnapshotSource: Kubernetes client has been closed.
24/04/02 12:41:16 INFO MapOutputTrackerMasterEndpoint: MapOutputTrackerMasterEndpoint stopped!
24/04/02 12:41:16 INFO MemoryStore: MemoryStore cleared
24/04/02 12:41:16 INFO BlockManager: BlockManager stopped
24/04/02 12:41:16 INFO BlockManagerMaster: BlockManagerMaster stopped
24/04/02 12:41:16 INFO OutputCommitCoordinator$OutputCommitCoordinatorEndpoint: OutputCommitCoordinator stopped!
24/04/02 12:41:16 INFO SparkContext: Successfully stopped SparkContext
24/04/02 12:41:16 INFO ShutdownHookManager: Shutdown hook called
24/04/02 12:41:16 INFO ShutdownHookManager: Deleting directory /tmp/spark-95ba71e1-8475-4f57-b9a1-b25f5d4de00a
24/04/02 12:41:16 INFO ShutdownHookManager: Deleting directory /var/data/spark-738d16bb-5173-4df0-9c59-9a9741bc7aab/spark-cce6eb80-4148-4179-b333-486493cd983f/pyspark-5dee90c8-d037-404e-9d72-a27147188148
24/04/02 12:41:16 INFO ShutdownHookManager: Deleting directory /var/data/spark-738d16bb-5173-4df0-9c59-9a9741bc7aab/spark-cce6eb80-4148-4179-b333-486493cd983f