
SplineQueryExecutionListener: Unexpected error occurred during lineage processing for application

Open Taurus-Le opened this issue 2 years ago • 11 comments

Version:

  • JDK: 11.0.10
  • Scala: 2.12.12
  • Hadoop: 3.2.2
  • Spark: 3.2.0-bin-hadoop3.2
  • spline: release/0.7.7
  • spline-ui: release/0.7.4
  • spline-spark-agent: release/0.7.10
  • Delta Lake: 1.2.0
  • Elasticsearch: 7.13.4

Description: The app reads from multiple Delta Lake tables and writes the output of a join as a new Delta Lake table. Spline initialized successfully, but then an unexpected error occurred during lineage processing.
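For context, roughly the shape of the job (table names and paths below are placeholders, not the actual Waterdrop configuration); a DataFrameWriterV2 createOrReplace against a catalog table is one way to end up with the ReplaceTableAsSelect plan that appears in the logs below:

import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder().getOrCreate()

// read several Delta tables and join them (hypothetical paths)
val a = spark.read.format("delta").load("/delta/source_a")
val b = spark.read.format("delta").load("/delta/source_b")
val joined = a.join(b, "id")

// write the join result as a new managed Delta table through the v2 writer;
// createOrReplace produces a ReplaceTableAsSelect logical plan
joined.writeTo("spark_catalog.default.dim_t_device_mapping_test")
  .using("delta")
  .createOrReplace()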

Error logs (attached): delta2delta-1.log, delta2delta-2.log, delta2delta-3.log

Error information:

  • Delta2Delta
22/07/20 15:16:38 ERROR LineageHarvester: Write extraction failed for: class org.apache.spark.sql.catalyst.plans.logical.ReplaceTableAsSelect
22/07/20 15:16:39 ERROR LineageHarvester: 
****************************** OBJECT DUMP BEGIN ******************************
class org.apache.spark.sql.catalyst.plans.logical.ReplaceTableAsSelect
  resolved: boolean = true
  catalog: org.apache.spark.sql.connector.catalog.TableCatalog 
    BestEffortStagedTable$module: org.apache.spark.sql.delta.catalog.DeltaCatalog$BestEffortStagedTable$ = null
    spark: org.apache.spark.sql.SparkSession 
      implicits$module: org.apache.spark.sql.SparkSession$implicits$ 
        $outer: org.apache.spark.sql.SparkSession ! Object was already logged
      org$apache$spark$sql$SparkSession$$creationSite: org.apache.spark.util.CallSite 
        shortForm: java.lang.String = getOrCreate at SparkEnvironment.scala:30
        longForm: java.lang.String = org.apache.spark.sql.SparkSession$Builder.getOrCreate(SparkSession.scala:958)
io.github.interestinglab.waterdrop.spark.SparkEnvironment.prepare(SparkEnvironment.scala:30)
io.github.interestinglab.waterdrop.spark.SparkEnvironment.prepare(SparkEnvironment.scala:14)
io.github.interestinglab.waterdrop.config.ConfigBuilder.createEnv(ConfigBuilder.java:193)
io.github.interestinglab.waterdrop.config.ConfigBuilder.<init>(ConfigBuilder.java:39)
io.github.interestinglab.waterdrop.Waterdrop.entryPoint(Waterdrop.java:92)
io.github.interestinglab.waterdrop.Waterdrop.run(Waterdrop.java:58)
io.github.interestinglab.waterdrop.Waterdrop.main(Waterdrop.java:42)
java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
java.base/java.lang.reflect.Method.invoke(Method.java:566)

*****omit for length limit

****************************** OBJECT DUMP END   ******************************
22/07/20 15:16:39 ERROR SplineQueryExecutionListener: Unexpected error occurred during lineage processing for application: dim_t_device_mapping_test #application_1658293063172_0008
java.lang.reflect.InvocationTargetException
	at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
	at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
	at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
	at java.base/java.lang.reflect.Method.invoke(Method.java:566)
	at scala.reflect.runtime.JavaMirrors$JavaMirror$JavaVanillaMethodMirror0.jinvokeraw(JavaMirrors.scala:404)
	at scala.reflect.runtime.JavaMirrors$JavaMirror$JavaMethodMirror.jinvoke(JavaMirrors.scala:380)
	at scala.reflect.runtime.JavaMirrors$JavaMirror$JavaVanillaMethodMirror.apply(JavaMirrors.scala:396)
	at za.co.absa.commons.reflect.ReflectionUtils$.$anonfun$extractFieldValue$5(ReflectionUtils.scala:143)
	at scala.Option.map(Option.scala:230)
	at za.co.absa.commons.reflect.ReflectionUtils$.reflectClass$1(ReflectionUtils.scala:141)
	at za.co.absa.commons.reflect.ReflectionUtils$.reflectClassHierarchy$1(ReflectionUtils.scala:112)
	at za.co.absa.commons.reflect.ReflectionUtils$.extractFieldValue(ReflectionUtils.scala:169)
	at za.co.absa.commons.reflect.ReflectionUtils$.extractFieldValue(ReflectionUtils.scala:184)
	at za.co.absa.spline.harvester.plugin.embedded.DataSourceV2Plugin.extractSourceIdFromDeltaTableV2(DataSourceV2Plugin.scala:154)
	at za.co.absa.spline.harvester.plugin.embedded.DataSourceV2Plugin.za$co$absa$spline$harvester$plugin$embedded$DataSourceV2Plugin$$extractSourceIdFromTable(DataSourceV2Plugin.scala:150)
	at za.co.absa.spline.harvester.plugin.embedded.DataSourceV2Plugin.za$co$absa$spline$harvester$plugin$embedded$DataSourceV2Plugin$$processV2CreateTableCommand(DataSourceV2Plugin.scala:108)
	at za.co.absa.spline.harvester.plugin.embedded.DataSourceV2Plugin$$anonfun$2.applyOrElse(DataSourceV2Plugin.scala:77)
	at za.co.absa.spline.harvester.plugin.embedded.DataSourceV2Plugin$$anonfun$2.applyOrElse(DataSourceV2Plugin.scala:52)
	at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:175)
	at scala.PartialFunction$Lifted.apply(PartialFunction.scala:228)
	at scala.PartialFunction$Lifted.apply(PartialFunction.scala:224)
	at za.co.absa.spline.harvester.builder.write.PluggableWriteCommandExtractor.asWriteCommand(PluggableWriteCommandExtractor.scala:44)
	at za.co.absa.spline.harvester.LineageHarvester.$anonfun$tryExtractWriteCommand$1(LineageHarvester.scala:145)
	at scala.util.Try$.apply(Try.scala:213)
	at za.co.absa.spline.harvester.LineageHarvester.tryExtractWriteCommand(LineageHarvester.scala:145)
	at za.co.absa.spline.harvester.LineageHarvester.harvest(LineageHarvester.scala:64)
	at za.co.absa.spline.harvester.QueryExecutionEventHandler.onSuccess(QueryExecutionEventHandler.scala:42)
	at za.co.absa.spline.harvester.listener.SplineQueryExecutionListener.$anonfun$onSuccess$2(SplineQueryExecutionListener.scala:40)
	at za.co.absa.spline.harvester.listener.SplineQueryExecutionListener.$anonfun$onSuccess$2$adapted(SplineQueryExecutionListener.scala:40)
	at scala.Option.foreach(Option.scala:407)
	at za.co.absa.spline.harvester.listener.SplineQueryExecutionListener.$anonfun$onSuccess$1(SplineQueryExecutionListener.scala:40)
	at za.co.absa.spline.harvester.listener.SplineQueryExecutionListener.withErrorHandling(SplineQueryExecutionListener.scala:49)
	at za.co.absa.spline.harvester.listener.SplineQueryExecutionListener.onSuccess(SplineQueryExecutionListener.scala:40)
	at org.apache.spark.sql.util.ExecutionListenerBus.doPostEvent(QueryExecutionListener.scala:158)
	at org.apache.spark.sql.util.ExecutionListenerBus.doPostEvent(QueryExecutionListener.scala:128)
	at org.apache.spark.util.ListenerBus.postToAll(ListenerBus.scala:117)
	at org.apache.spark.util.ListenerBus.postToAll$(ListenerBus.scala:101)
	at org.apache.spark.sql.util.ExecutionListenerBus.postToAll(QueryExecutionListener.scala:128)
	at org.apache.spark.sql.util.ExecutionListenerBus.onOtherEvent(QueryExecutionListener.scala:140)
	at org.apache.spark.scheduler.SparkListenerBus.doPostEvent(SparkListenerBus.scala:100)
	at org.apache.spark.scheduler.SparkListenerBus.doPostEvent$(SparkListenerBus.scala:28)
	at org.apache.spark.scheduler.AsyncEventQueue.doPostEvent(AsyncEventQueue.scala:37)
	at org.apache.spark.scheduler.AsyncEventQueue.doPostEvent(AsyncEventQueue.scala:37)
	at org.apache.spark.util.ListenerBus.postToAll(ListenerBus.scala:117)
	at org.apache.spark.util.ListenerBus.postToAll$(ListenerBus.scala:101)
	at org.apache.spark.scheduler.AsyncEventQueue.super$postToAll(AsyncEventQueue.scala:105)
	at org.apache.spark.scheduler.AsyncEventQueue.$anonfun$dispatch$1(AsyncEventQueue.scala:105)
	at scala.runtime.java8.JFunction0$mcJ$sp.apply(JFunction0$mcJ$sp.java:23)
	at scala.util.DynamicVariable.withValue(DynamicVariable.scala:62)
	at org.apache.spark.scheduler.AsyncEventQueue.org$apache$spark$scheduler$AsyncEventQueue$$dispatch(AsyncEventQueue.scala:100)
	at org.apache.spark.scheduler.AsyncEventQueue$$anon$2.$anonfun$run$1(AsyncEventQueue.scala:96)
	at org.apache.spark.util.Utils$.tryOrStopSparkContext(Utils.scala:1404)
	at org.apache.spark.scheduler.AsyncEventQueue$$anon$2.run(AsyncEventQueue.scala:96)
Caused by: java.lang.IllegalStateException: Cannot call methods on a stopped SparkContext.
This stopped SparkContext was created at:

org.apache.spark.sql.SparkSession$Builder.getOrCreate(SparkSession.scala:943)
io.github.interestinglab.waterdrop.spark.SparkEnvironment.prepare(SparkEnvironment.scala:30)
io.github.interestinglab.waterdrop.spark.SparkEnvironment.prepare(SparkEnvironment.scala:14)
io.github.interestinglab.waterdrop.config.ConfigBuilder.createEnv(ConfigBuilder.java:193)
io.github.interestinglab.waterdrop.config.ConfigBuilder.<init>(ConfigBuilder.java:39)
io.github.interestinglab.waterdrop.Waterdrop.entryPoint(Waterdrop.java:92)
io.github.interestinglab.waterdrop.Waterdrop.run(Waterdrop.java:58)
io.github.interestinglab.waterdrop.Waterdrop.main(Waterdrop.java:42)
java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
java.base/java.lang.reflect.Method.invoke(Method.java:566)
org.apache.spark.deploy.yarn.ApplicationMaster$$anon$2.run(ApplicationMaster.scala:737)

The currently active SparkContext was created at:

org.apache.spark.sql.SparkSession$Builder.getOrCreate(SparkSession.scala:943)
io.github.interestinglab.waterdrop.spark.SparkEnvironment.prepare(SparkEnvironment.scala:30)
io.github.interestinglab.waterdrop.spark.SparkEnvironment.prepare(SparkEnvironment.scala:14)
io.github.interestinglab.waterdrop.config.ConfigBuilder.createEnv(ConfigBuilder.java:193)
io.github.interestinglab.waterdrop.config.ConfigBuilder.<init>(ConfigBuilder.java:39)
io.github.interestinglab.waterdrop.Waterdrop.entryPoint(Waterdrop.java:92)
io.github.interestinglab.waterdrop.Waterdrop.run(Waterdrop.java:58)
io.github.interestinglab.waterdrop.Waterdrop.main(Waterdrop.java:42)
java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
java.base/java.lang.reflect.Method.invoke(Method.java:566)
org.apache.spark.deploy.yarn.ApplicationMaster$$anon$2.run(ApplicationMaster.scala:737)
         
	at org.apache.spark.SparkContext.assertNotStopped(SparkContext.scala:118)
	at org.apache.spark.SparkContext.broadcast(SparkContext.scala:1512)
	at org.apache.spark.sql.delta.Snapshot.$anonfun$stateReconstruction$2(Snapshot.scala:107)
	at org.apache.spark.sql.delta.metering.DeltaLogging.recordFrameProfile(DeltaLogging.scala:120)
	at org.apache.spark.sql.delta.metering.DeltaLogging.recordFrameProfile$(DeltaLogging.scala:118)
	at org.apache.spark.sql.delta.Snapshot.recordFrameProfile(Snapshot.scala:58)
	at org.apache.spark.sql.delta.Snapshot.$anonfun$stateReconstruction$1(Snapshot.scala:98)
	at org.apache.spark.sql.delta.metering.DeltaLogging.withDmqTag(DeltaLogging.scala:124)
	at org.apache.spark.sql.delta.metering.DeltaLogging.withDmqTag$(DeltaLogging.scala:123)
	at org.apache.spark.sql.delta.Snapshot.withDmqTag(Snapshot.scala:58)
	at org.apache.spark.sql.delta.Snapshot.stateReconstruction(Snapshot.scala:98)
	at org.apache.spark.sql.delta.Snapshot.$anonfun$cachedState$1(Snapshot.scala:154)
	at org.apache.spark.sql.delta.metering.DeltaLogging.withDmqTag(DeltaLogging.scala:124)
	at org.apache.spark.sql.delta.metering.DeltaLogging.withDmqTag$(DeltaLogging.scala:123)
	at org.apache.spark.sql.delta.Snapshot.withDmqTag(Snapshot.scala:58)
	at org.apache.spark.sql.delta.Snapshot.cachedState$lzycompute(Snapshot.scala:154)
	at org.apache.spark.sql.delta.Snapshot.cachedState(Snapshot.scala:153)
	at org.apache.spark.sql.delta.Snapshot.$anonfun$stateDF$1(Snapshot.scala:164)
	at org.apache.spark.sql.delta.metering.DeltaLogging.withDmqTag(DeltaLogging.scala:124)
	at org.apache.spark.sql.delta.metering.DeltaLogging.withDmqTag$(DeltaLogging.scala:123)
	at org.apache.spark.sql.delta.Snapshot.withDmqTag(Snapshot.scala:58)
	at org.apache.spark.sql.delta.Snapshot.stateDF(Snapshot.scala:164)
	at org.apache.spark.sql.delta.Snapshot.$anonfun$computedState$3(Snapshot.scala:209)
	at org.apache.spark.sql.delta.metering.DeltaLogging.recordFrameProfile(DeltaLogging.scala:120)
	at org.apache.spark.sql.delta.metering.DeltaLogging.recordFrameProfile$(DeltaLogging.scala:118)
	at org.apache.spark.sql.delta.Snapshot.recordFrameProfile(Snapshot.scala:58)
	at org.apache.spark.sql.delta.Snapshot.$anonfun$computedState$2(Snapshot.scala:205)
	at org.apache.spark.sql.delta.metering.DeltaLogging.withDmqTag(DeltaLogging.scala:124)
	at org.apache.spark.sql.delta.metering.DeltaLogging.withDmqTag$(DeltaLogging.scala:123)
	at org.apache.spark.sql.delta.Snapshot.withDmqTag(Snapshot.scala:58)
	at org.apache.spark.sql.delta.Snapshot.$anonfun$computedState$1(Snapshot.scala:205)
	at org.apache.spark.sql.delta.util.DeltaProgressReporter.withJobDescription(DeltaProgressReporter.scala:53)
	at org.apache.spark.sql.delta.util.DeltaProgressReporter.withStatusCode(DeltaProgressReporter.scala:32)
	at org.apache.spark.sql.delta.util.DeltaProgressReporter.withStatusCode$(DeltaProgressReporter.scala:27)
	at org.apache.spark.sql.delta.Snapshot.withStatusCode(Snapshot.scala:58)
	at org.apache.spark.sql.delta.Snapshot.computedState$lzycompute(Snapshot.scala:204)
	at org.apache.spark.sql.delta.Snapshot.computedState(Snapshot.scala:202)
	at org.apache.spark.sql.delta.Snapshot.metadata(Snapshot.scala:242)
	at org.apache.spark.sql.delta.stats.DataSkippingReaderBase.$init$(DataSkippingReader.scala:174)
	at org.apache.spark.sql.delta.Snapshot.<init>(Snapshot.scala:67)
	at org.apache.spark.sql.delta.SnapshotManagement.$anonfun$createSnapshot$1(SnapshotManagement.scala:271)
	at org.apache.spark.sql.delta.SnapshotManagement.createSnapshotFromGivenOrEquivalentLogSegment(SnapshotManagement.scala:390)
	at org.apache.spark.sql.delta.SnapshotManagement.createSnapshotFromGivenOrEquivalentLogSegment$(SnapshotManagement.scala:378)
	at org.apache.spark.sql.delta.DeltaLog.createSnapshotFromGivenOrEquivalentLogSegment(DeltaLog.scala:64)
	at org.apache.spark.sql.delta.SnapshotManagement.createSnapshot(SnapshotManagement.scala:270)
	at org.apache.spark.sql.delta.SnapshotManagement.createSnapshot$(SnapshotManagement.scala:266)
	at org.apache.spark.sql.delta.DeltaLog.createSnapshot(DeltaLog.scala:64)
	at org.apache.spark.sql.delta.SnapshotManagement.$anonfun$getSnapshotAtInit$2(SnapshotManagement.scala:252)
	at scala.Option.map(Option.scala:230)
	at org.apache.spark.sql.delta.SnapshotManagement.$anonfun$getSnapshotAtInit$1(SnapshotManagement.scala:248)
	at org.apache.spark.sql.delta.metering.DeltaLogging.recordFrameProfile(DeltaLogging.scala:120)
	at org.apache.spark.sql.delta.metering.DeltaLogging.recordFrameProfile$(DeltaLogging.scala:118)
	at org.apache.spark.sql.delta.DeltaLog.recordFrameProfile(DeltaLog.scala:64)
	at org.apache.spark.sql.delta.SnapshotManagement.getSnapshotAtInit(SnapshotManagement.scala:246)
	at org.apache.spark.sql.delta.SnapshotManagement.getSnapshotAtInit$(SnapshotManagement.scala:245)
	at org.apache.spark.sql.delta.DeltaLog.getSnapshotAtInit(DeltaLog.scala:64)
	at org.apache.spark.sql.delta.SnapshotManagement.$init$(SnapshotManagement.scala:53)
	at org.apache.spark.sql.delta.DeltaLog.<init>(DeltaLog.scala:69)
	at org.apache.spark.sql.delta.DeltaLog$.$anonfun$apply$3(DeltaLog.scala:567)
	at org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper$.allowInvokingTransformsInAnalyzer(AnalysisHelper.scala:323)
	at org.apache.spark.sql.delta.DeltaLog$.$anonfun$apply$2(DeltaLog.scala:567)
	at org.apache.spark.sql.delta.metering.DeltaLogging.recordFrameProfile(DeltaLogging.scala:120)
	at org.apache.spark.sql.delta.metering.DeltaLogging.recordFrameProfile$(DeltaLogging.scala:118)
	at org.apache.spark.sql.delta.DeltaLog$.recordFrameProfile(DeltaLog.scala:437)
	at org.apache.spark.sql.delta.metering.DeltaLogging.$anonfun$recordDeltaOperation$5(DeltaLogging.scala:114)
	at com.databricks.spark.util.DatabricksLogging.recordOperation(DatabricksLogging.scala:77)
	at com.databricks.spark.util.DatabricksLogging.recordOperation$(DatabricksLogging.scala:67)
	at org.apache.spark.sql.delta.DeltaLog$.recordOperation(DeltaLog.scala:437)
	at org.apache.spark.sql.delta.metering.DeltaLogging.recordDeltaOperation(DeltaLogging.scala:113)
	at org.apache.spark.sql.delta.metering.DeltaLogging.recordDeltaOperation$(DeltaLogging.scala:98)
	at org.apache.spark.sql.delta.DeltaLog$.recordDeltaOperation(DeltaLog.scala:437)
	at org.apache.spark.sql.delta.DeltaLog$.createDeltaLog$1(DeltaLog.scala:566)
	at org.apache.spark.sql.delta.DeltaLog$.$anonfun$apply$4(DeltaLog.scala:577)
	at com.google.common.cache.LocalCache$LocalManualCache$1.load(LocalCache.java:4792)
	at com.google.common.cache.LocalCache$LoadingValueReference.loadFuture(LocalCache.java:3599)
	at com.google.common.cache.LocalCache$Segment.loadSync(LocalCache.java:2379)
	at com.google.common.cache.LocalCache$Segment.lockedGetOrLoad(LocalCache.java:2342)
	at com.google.common.cache.LocalCache$Segment.get(LocalCache.java:2257)
	at com.google.common.cache.LocalCache.get(LocalCache.java:4000)
	at com.google.common.cache.LocalCache$LocalManualCache.get(LocalCache.java:4789)
	at org.apache.spark.sql.delta.DeltaLog$.getDeltaLogFromCache$1(DeltaLog.scala:577)
	at org.apache.spark.sql.delta.DeltaLog$.apply(DeltaLog.scala:589)
	at org.apache.spark.sql.delta.DeltaLog$.forTable(DeltaLog.scala:487)
	at org.apache.spark.sql.delta.catalog.DeltaTableV2.deltaLog$lzycompute(DeltaTableV2.scala:78)
	at org.apache.spark.sql.delta.catalog.DeltaTableV2.deltaLog(DeltaTableV2.scala:78)
	at org.apache.spark.sql.delta.catalog.DeltaTableV2.$anonfun$snapshot$3(DeltaTableV2.scala:107)
	at scala.Option.getOrElse(Option.scala:189)
	at org.apache.spark.sql.delta.catalog.DeltaTableV2.snapshot$lzycompute(DeltaTableV2.scala:107)
	at org.apache.spark.sql.delta.catalog.DeltaTableV2.snapshot(DeltaTableV2.scala:95)
	at org.apache.spark.sql.delta.catalog.DeltaTableV2.properties(DeltaTableV2.scala:124)
	... 53 more

Additional information:

  1. I've modified the pom.xml files under the spline-spark-agent root and under spline-spark-agent/core:
<!--
            <dependency>
                <groupId>org.elasticsearch</groupId>
                <artifactId>elasticsearch-hadoop</artifactId>
                <version>7.6.0</version>
            </dependency>
-->
            <dependency>
                <groupId>org.elasticsearch</groupId>
                <artifactId>elasticsearch-spark-30_2.12</artifactId>
                <version>7.13.4</version>
                <scope>provided</scope>
            </dependency>
  2. I've disabled Spark dynamic allocation on the suspicion that the SparkContext might have been shut down due to a resource problem. Below is the content of my spark-defaults.conf.
spark.master                                        yarn
spark.eventLog.enabled                              true
spark.eventLog.dir                                  hdfs://masters/spark/eventLog
spark.serializer                                    org.apache.spark.serializer.KryoSerializer
spark.delta.logStore.class                          org.apache.spark.sql.delta.storage.HDFSLogStore
spark.executor.extraClassPath                       /home/hadoop/SW/extra-libs/*
spark.driver.extraClassPath                         /home/hadoop/SW/extra-libs/*
spark.hive.metastore.uris                           thrift://192.168.21.8:9083
spark.sql.warehouse.dir                             hdfs://masters/
spark.scheduler.listenerbus.eventqueue.capacity     100000
spark.sql.extensions                                io.delta.sql.DeltaSparkSessionExtension
spark.sql.catalog.spark_catalog                     org.apache.spark.sql.delta.catalog.DeltaCatalog
# yarn
spark.yarn.jars                                     hdfs://masters/spark-yarn-jars/*

## dynamic allocation
#spark.shuffle.service.enabled                       true
#spark.dynamicAllocation.enabled                     true
#spark.dynamicAllocation.minExecutors                1
##spark.dynamicAllocation.maxExecutors
#spark.dynamicAllocation.schedulerBacklogTimeout     5

# Lineage
spark.sql.queryExecutionListeners                   za.co.absa.spline.harvester.listener.SplineQueryExecutionListener
spark.spline.mode                                   REQUIRED
spark.spline.producer.url                           http://h8:9095/spline-rest/producer
#spark.spline.postProcessingFilter.composite.filters dsPasswordReplace
#spark.spline.lineageDispatcher                            console
#spark.spline.lineageDispatcher.console.className          za.co.absa.spline.harvester.dispatcher.ConsoleLineageDispatcher
#spark.spline.lineageDispatcher.console.stream             ERR
#spark.spline.lineageDispatcher.http.producer.url    http://h8:9095/spline-rest/producer
## Kafka dispatcher
#spline.lineageDispatcher=kafka
#spline.lineageDispatcher.kafka.className=za.co.absa.spline.harvester.dispatcher.KafkaLineageDispatcher
## producer configs as defined by kafka (bootstrap.servers, key.serializer, etc) all kafka configs are supported
#spline.lineageDispatcher.kafka.producer.bootstrap.servers=h5:9092,h6:9092,h7:9092
#spline.lineageDispatcher.kafka.producer.key.serializer=org.apache.kafka.common.serialization.StringSerializer
#spline.lineageDispatcher.kafka.producer.value.serializer=org.apache.kafka.common.serialization.StringSerializer
#spline.lineageDispatcher.kafka.producer.max.in.flight.requests.per.connection=1
## topic name for plans and events
#spline.lineageDispatcher.kafka.topic=spline-lineage

For the delta2delta application, there seems to be something wrong with ReplaceTableAsSelect: as the logs show, write extraction failed. Could you kindly help? If I missed anything important, please let me know.

Taurus-Le, Jul 20 '22 09:07

There should be one ticket per issue, so I will create a new one for the java.util.NoSuchElementException: None.get problem.

cerveada, Jul 20 '22 10:07

@Taurus-Le I split the issue in two: this one is for Delta and #479 is for ES. Feel free to add any relevant info or correct me if I split something incorrectly.

cerveada, Jul 20 '22 11:07

Hi @cerveada, sorry for the trouble. I meant to save you some work, not create more. Thanks for helping.

Taurus-Le, Jul 20 '22 13:07

Versions of Apache Maven and the JDK used to build spline-spark-agent:

[hadoop@h8 spline-spark-agent]$ mvn -version
Apache Maven 3.8.5 (3599d3414f046de2324203b78ddcf9b5e4388aa0)
Maven home: /home/hadoop/SW/apache-maven-3.8.5
Java version: 1.8.0_331, vendor: Oracle Corporation, runtime: /home/hadoop/SW/jdk1.8.0_331/jre
Default locale: en_US, platform encoding: UTF-8
OS name: "linux", version: "3.10.0-1062.el7.x86_64", arch: "amd64", family: "unix"

And here is how I built spline-spark-agent:

git clone https://github.com/AbsaOSS/spline-spark-agent.git
git checkout release/0.7.10
mvn scala-cross-build:change-version -Pscala-2.12
mvn clean package -Pscala-2.12,spark-3.2 -Dmaven.test.skip=true

Taurus-Le, Jul 21 '22 01:07

It seems that changes in Delta Lake 1.2.0 are causing the issue. The code was only tested against 1.1.0, so as a workaround you could try switching to version 1.1.0.
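One way to pin that version, assuming Delta is pulled from Maven Central rather than via the extra-libs classpath jars in your spark-defaults.conf, would be an entry like:

# assumption: Delta fetched from Maven Central instead of the
# spark.{driver,executor}.extraClassPath jars shown above
spark.jars.packages                                 io.delta:delta-core_2.12:1.1.0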

cerveada, Jul 21 '22 08:07

Got it, thanks. If there's anything I can do to help, please let me know. Also, I just noticed that Delta Lake 2.0.0 has been released.

Taurus-Le, Jul 21 '22 09:07

Hi @cerveada, I've switched from Delta Lake 1.2.0 to Delta Lake 1.1.0, and this time I got the same error as #479.

22/07/22 09:42:46 ERROR SplineQueryExecutionListener: Unexpected error occurred during lineage processing for application: dim_t_device_mapping_test #application_1658293063172_0023
java.util.NoSuchElementException: None.get
	at scala.None$.get(Option.scala:529)
	at scala.None$.get(Option.scala:527)
	at za.co.absa.spline.harvester.postprocessing.ViewAttributeAddingFilter.toAttributeReferencesMap(ViewAttributeAddingFilter.scala:59)
	at za.co.absa.spline.harvester.postprocessing.ViewAttributeAddingFilter.$anonfun$addMissingAttributeLinks$1(ViewAttributeAddingFilter.scala:39)
	at scala.collection.TraversableLike.$anonfun$map$1(TraversableLike.scala:286)
	at scala.collection.Iterator.foreach(Iterator.scala:943)
	at scala.collection.Iterator.foreach$(Iterator.scala:943)
	at scala.collection.AbstractIterator.foreach(Iterator.scala:1431)
	at scala.collection.IterableLike.foreach(IterableLike.scala:74)
	at scala.collection.IterableLike.foreach$(IterableLike.scala:73)
	at scala.collection.AbstractIterable.foreach(Iterable.scala:56)
	at scala.collection.TraversableLike.map(TraversableLike.scala:286)
	at scala.collection.TraversableLike.map$(TraversableLike.scala:279)
	at scala.collection.AbstractTraversable.map(Traversable.scala:108)
	at za.co.absa.spline.harvester.postprocessing.ViewAttributeAddingFilter.addMissingAttributeLinks(ViewAttributeAddingFilter.scala:39)
	at za.co.absa.spline.harvester.postprocessing.ViewAttributeAddingFilter.processExecutionPlan(ViewAttributeAddingFilter.scala:34)
	at za.co.absa.spline.harvester.postprocessing.PostProcessor.$anonfun$process$4(PostProcessor.scala:38)
	at za.co.absa.spline.harvester.postprocessing.PostProcessor.$anonfun$provideCtx$1(PostProcessor.scala:25)
	at scala.Function1.$anonfun$andThen$1(Function1.scala:57)
	at scala.Function1.$anonfun$andThen$1(Function1.scala:57)
	at za.co.absa.spline.harvester.postprocessing.PostProcessor.process(PostProcessor.scala:38)
	at za.co.absa.spline.harvester.LineageHarvester.$anonfun$harvest$4(LineageHarvester.scala:110)
	at scala.Option.flatMap(Option.scala:271)
	at za.co.absa.spline.harvester.LineageHarvester.harvest(LineageHarvester.scala:64)
	at za.co.absa.spline.harvester.QueryExecutionEventHandler.onSuccess(QueryExecutionEventHandler.scala:42)
	at za.co.absa.spline.harvester.listener.SplineQueryExecutionListener.$anonfun$onSuccess$2(SplineQueryExecutionListener.scala:40)
	at za.co.absa.spline.harvester.listener.SplineQueryExecutionListener.$anonfun$onSuccess$2$adapted(SplineQueryExecutionListener.scala:40)
	at scala.Option.foreach(Option.scala:407)
	at za.co.absa.spline.harvester.listener.SplineQueryExecutionListener.$anonfun$onSuccess$1(SplineQueryExecutionListener.scala:40)
	at za.co.absa.spline.harvester.listener.SplineQueryExecutionListener.withErrorHandling(SplineQueryExecutionListener.scala:49)
	at za.co.absa.spline.harvester.listener.SplineQueryExecutionListener.onSuccess(SplineQueryExecutionListener.scala:40)
	at org.apache.spark.sql.util.ExecutionListenerBus.doPostEvent(QueryExecutionListener.scala:158)
	at org.apache.spark.sql.util.ExecutionListenerBus.doPostEvent(QueryExecutionListener.scala:128)
	at org.apache.spark.util.ListenerBus.postToAll(ListenerBus.scala:117)
	at org.apache.spark.util.ListenerBus.postToAll$(ListenerBus.scala:101)
	at org.apache.spark.sql.util.ExecutionListenerBus.postToAll(QueryExecutionListener.scala:128)
	at org.apache.spark.sql.util.ExecutionListenerBus.onOtherEvent(QueryExecutionListener.scala:140)
	at org.apache.spark.scheduler.SparkListenerBus.doPostEvent(SparkListenerBus.scala:100)
	at org.apache.spark.scheduler.SparkListenerBus.doPostEvent$(SparkListenerBus.scala:28)
	at org.apache.spark.scheduler.AsyncEventQueue.doPostEvent(AsyncEventQueue.scala:37)
	at org.apache.spark.scheduler.AsyncEventQueue.doPostEvent(AsyncEventQueue.scala:37)
	at org.apache.spark.util.ListenerBus.postToAll(ListenerBus.scala:117)
	at org.apache.spark.util.ListenerBus.postToAll$(ListenerBus.scala:101)
	at org.apache.spark.scheduler.AsyncEventQueue.super$postToAll(AsyncEventQueue.scala:105)
	at org.apache.spark.scheduler.AsyncEventQueue.$anonfun$dispatch$1(AsyncEventQueue.scala:105)
	at scala.runtime.java8.JFunction0$mcJ$sp.apply(JFunction0$mcJ$sp.java:23)
	at scala.util.DynamicVariable.withValue(DynamicVariable.scala:62)
	at org.apache.spark.scheduler.AsyncEventQueue.org$apache$spark$scheduler$AsyncEventQueue$$dispatch(AsyncEventQueue.scala:100)
	at org.apache.spark.scheduler.AsyncEventQueue$$anon$2.$anonfun$run$1(AsyncEventQueue.scala:96)
	at org.apache.spark.util.Utils$.tryOrStopSparkContext(Utils.scala:1404)

So I think the problem might be related to the call to createOrReplaceTempView in my code, which goes through createTempViewCommand and, internally, CreateViewCommand. According to the spline-spark-agent README, CreateViewCommand is ignored.
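For illustration, roughly the pattern in my code (paths, column names, and view names below are placeholders, not the actual config):

import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder().getOrCreate()

// register the Delta sources as temp views; createOrReplaceTempView goes
// through CreateViewCommand, which the agent README lists as ignored
spark.read.format("delta").load("/delta/source_a").createOrReplaceTempView("a")
spark.read.format("delta").load("/delta/source_b").createOrReplaceTempView("b")

// join the views in SQL and write the result as a Delta table
val joined = spark.sql("SELECT a.*, b.extra_col FROM a JOIN b ON a.id = b.id")
joined.write.format("delta").mode("overwrite").saveAsTable("dim_t_device_mapping_test")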

Taurus-Le, Jul 22 '22 01:07

Hmm, I read the logs again and there seems to be an issue with Spark stopping.

java.lang.reflect.InvocationTargetException
Caused by: java.lang.IllegalStateException: Cannot call methods on a stopped SparkContext.

It looks like Spline is trying to call the DeltaTableV2.properties method, but the SparkContext is already stopped. Can you try not stopping Spark, to see if it changes the outcome?
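A rough sketch of what I mean (not Spline API, just illustrating the timing; the query-execution listener is asynchronous, so stopping the context right after the write can leave the agent working against a dead SparkContext, and the sleep value below is only a guess):

import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder().getOrCreate()
val result = spark.table("a").join(spark.table("b"), "id") // placeholder sources

result.write.format("delta").mode("overwrite").saveAsTable("dim_t_device_mapping_test")

// either let the application exit without an explicit stop(), or give the
// asynchronous listener time to deliver the event first (10 s is a guess)
Thread.sleep(10000)
spark.stop()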

cerveada, Jul 28 '22 13:07

Hi @cerveada, I did not stop Spark myself; I never called stop() on the SparkContext or SparkSession. Is it because I'm running Spark on YARN? I previously suspected there might be a connection between Spark stopping and dynamic allocation, so I disabled dynamic allocation, but it did not help. And I'm truly sorry I forgot to tell you that I got the same error as https://github.com/AbsaOSS/spline-spark-agent/issues/479#issuecomment-1196398326 after applying PR https://github.com/AbsaOSS/spline-spark-agent/pull/481

Taurus-Le, Jul 29 '22 02:07

I don't know; it might be.

On YARN do you run in local mode or cluster mode?

Do you still get the following errors?

java.lang.reflect.InvocationTargetException
Caused by: java.lang.IllegalStateException: Cannot call methods on a stopped SparkContext.

cerveada, Aug 10 '22 11:08

You can also try setting a higher hadoop.service.shutdown.timeout.
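That is a Hadoop-level setting read from core-site.xml; raising it could look something like this (the usual default is 30s, and 120s here is just an example value):

<!-- core-site.xml: allow Hadoop shutdown hooks (including Spark's) more time to complete -->
<property>
  <name>hadoop.service.shutdown.timeout</name>
  <value>120s</value>
</property>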

cerveada, Mar 16 '23 08:03