spline-spark-agent
SplineQueryExecutionListener: Unexpected error occurred during lineage processing for application
Version:
- JDK: 11.0.10
- Scala: 2.12.12
- Hadoop: 3.2.2
- Spark: 3.2.0-bin-hadoop3.2
- spline: release/0.7.7
- spline-ui: release/0.7.4
- spline-spark-agent: release/0.7.10
- Delta Lake: 1.2.0
- Elasticsearch: 7.13.4
Description: The app reads from multiple Delta Lake tables and writes the output of a join as a new Delta Lake table. Spline initialized successfully, but then an unexpected error occurred.
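Roughly, the job does something like the following (a minimal sketch with hypothetical table names; the real pipeline is driven by a Waterdrop config, so the exact write API may differ):
import org.apache.spark.sql.SparkSession

object Delta2DeltaSketch {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .appName("dim_t_device_mapping_test")
      .getOrCreate()

    // Read several Delta tables (hypothetical names) and join them
    val devices  = spark.table("db.t_device")
    val mappings = spark.table("db.t_device_mapping")
    val joined   = devices.join(mappings, Seq("device_id"))

    // Write the join result as a new Delta table. With DeltaCatalog registered as
    // spark_catalog, an overwrite saveAsTable is typically planned as a
    // ReplaceTableAsSelect, the command the Spline log below complains about.
    joined.write.format("delta")
      .mode("overwrite")
      .saveAsTable("db.dim_t_device_mapping_test")

    // Note: the application never calls spark.stop() explicitly.
  }
}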
Error logs: delta2delta-1.log, delta2delta-2.log, delta2delta-3.log
Error information:
- Delta2Delta
22/07/20 15:16:38 ERROR LineageHarvester: Write extraction failed for: class org.apache.spark.sql.catalyst.plans.logical.ReplaceTableAsSelect
22/07/20 15:16:39 ERROR LineageHarvester:
****************************** OBJECT DUMP BEGIN ******************************
class org.apache.spark.sql.catalyst.plans.logical.ReplaceTableAsSelect
resolved: boolean = true
catalog: org.apache.spark.sql.connector.catalog.TableCatalog
BestEffortStagedTable$module: org.apache.spark.sql.delta.catalog.DeltaCatalog$BestEffortStagedTable$ = null
spark: org.apache.spark.sql.SparkSession
implicits$module: org.apache.spark.sql.SparkSession$implicits$
$outer: org.apache.spark.sql.SparkSession ! Object was already logged
org$apache$spark$sql$SparkSession$$creationSite: org.apache.spark.util.CallSite
shortForm: java.lang.String = getOrCreate at SparkEnvironment.scala:30
longForm: java.lang.String = org.apache.spark.sql.SparkSession$Builder.getOrCreate(SparkSession.scala:958)
io.github.interestinglab.waterdrop.spark.SparkEnvironment.prepare(SparkEnvironment.scala:30)
io.github.interestinglab.waterdrop.spark.SparkEnvironment.prepare(SparkEnvironment.scala:14)
io.github.interestinglab.waterdrop.config.ConfigBuilder.createEnv(ConfigBuilder.java:193)
io.github.interestinglab.waterdrop.config.ConfigBuilder.<init>(ConfigBuilder.java:39)
io.github.interestinglab.waterdrop.Waterdrop.entryPoint(Waterdrop.java:92)
io.github.interestinglab.waterdrop.Waterdrop.run(Waterdrop.java:58)
io.github.interestinglab.waterdrop.Waterdrop.main(Waterdrop.java:42)
java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
java.base/java.lang.reflect.Method.invoke(Method.java:566)
***** rest of the object dump omitted due to length limit *****
****************************** OBJECT DUMP END ******************************
22/07/20 15:16:39 ERROR SplineQueryExecutionListener: Unexpected error occurred during lineage processing for application: dim_t_device_mapping_test #application_1658293063172_0008
java.lang.reflect.InvocationTargetException
at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.base/java.lang.reflect.Method.invoke(Method.java:566)
at scala.reflect.runtime.JavaMirrors$JavaMirror$JavaVanillaMethodMirror0.jinvokeraw(JavaMirrors.scala:404)
at scala.reflect.runtime.JavaMirrors$JavaMirror$JavaMethodMirror.jinvoke(JavaMirrors.scala:380)
at scala.reflect.runtime.JavaMirrors$JavaMirror$JavaVanillaMethodMirror.apply(JavaMirrors.scala:396)
at za.co.absa.commons.reflect.ReflectionUtils$.$anonfun$extractFieldValue$5(ReflectionUtils.scala:143)
at scala.Option.map(Option.scala:230)
at za.co.absa.commons.reflect.ReflectionUtils$.reflectClass$1(ReflectionUtils.scala:141)
at za.co.absa.commons.reflect.ReflectionUtils$.reflectClassHierarchy$1(ReflectionUtils.scala:112)
at za.co.absa.commons.reflect.ReflectionUtils$.extractFieldValue(ReflectionUtils.scala:169)
at za.co.absa.commons.reflect.ReflectionUtils$.extractFieldValue(ReflectionUtils.scala:184)
at za.co.absa.spline.harvester.plugin.embedded.DataSourceV2Plugin.extractSourceIdFromDeltaTableV2(DataSourceV2Plugin.scala:154)
at za.co.absa.spline.harvester.plugin.embedded.DataSourceV2Plugin.za$co$absa$spline$harvester$plugin$embedded$DataSourceV2Plugin$$extractSourceIdFromTable(DataSourceV2Plugin.scala:150)
at za.co.absa.spline.harvester.plugin.embedded.DataSourceV2Plugin.za$co$absa$spline$harvester$plugin$embedded$DataSourceV2Plugin$$processV2CreateTableCommand(DataSourceV2Plugin.scala:108)
at za.co.absa.spline.harvester.plugin.embedded.DataSourceV2Plugin$$anonfun$2.applyOrElse(DataSourceV2Plugin.scala:77)
at za.co.absa.spline.harvester.plugin.embedded.DataSourceV2Plugin$$anonfun$2.applyOrElse(DataSourceV2Plugin.scala:52)
at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:175)
at scala.PartialFunction$Lifted.apply(PartialFunction.scala:228)
at scala.PartialFunction$Lifted.apply(PartialFunction.scala:224)
at za.co.absa.spline.harvester.builder.write.PluggableWriteCommandExtractor.asWriteCommand(PluggableWriteCommandExtractor.scala:44)
at za.co.absa.spline.harvester.LineageHarvester.$anonfun$tryExtractWriteCommand$1(LineageHarvester.scala:145)
at scala.util.Try$.apply(Try.scala:213)
at za.co.absa.spline.harvester.LineageHarvester.tryExtractWriteCommand(LineageHarvester.scala:145)
at za.co.absa.spline.harvester.LineageHarvester.harvest(LineageHarvester.scala:64)
at za.co.absa.spline.harvester.QueryExecutionEventHandler.onSuccess(QueryExecutionEventHandler.scala:42)
at za.co.absa.spline.harvester.listener.SplineQueryExecutionListener.$anonfun$onSuccess$2(SplineQueryExecutionListener.scala:40)
at za.co.absa.spline.harvester.listener.SplineQueryExecutionListener.$anonfun$onSuccess$2$adapted(SplineQueryExecutionListener.scala:40)
at scala.Option.foreach(Option.scala:407)
at za.co.absa.spline.harvester.listener.SplineQueryExecutionListener.$anonfun$onSuccess$1(SplineQueryExecutionListener.scala:40)
at za.co.absa.spline.harvester.listener.SplineQueryExecutionListener.withErrorHandling(SplineQueryExecutionListener.scala:49)
at za.co.absa.spline.harvester.listener.SplineQueryExecutionListener.onSuccess(SplineQueryExecutionListener.scala:40)
at org.apache.spark.sql.util.ExecutionListenerBus.doPostEvent(QueryExecutionListener.scala:158)
at org.apache.spark.sql.util.ExecutionListenerBus.doPostEvent(QueryExecutionListener.scala:128)
at org.apache.spark.util.ListenerBus.postToAll(ListenerBus.scala:117)
at org.apache.spark.util.ListenerBus.postToAll$(ListenerBus.scala:101)
at org.apache.spark.sql.util.ExecutionListenerBus.postToAll(QueryExecutionListener.scala:128)
at org.apache.spark.sql.util.ExecutionListenerBus.onOtherEvent(QueryExecutionListener.scala:140)
at org.apache.spark.scheduler.SparkListenerBus.doPostEvent(SparkListenerBus.scala:100)
at org.apache.spark.scheduler.SparkListenerBus.doPostEvent$(SparkListenerBus.scala:28)
at org.apache.spark.scheduler.AsyncEventQueue.doPostEvent(AsyncEventQueue.scala:37)
at org.apache.spark.scheduler.AsyncEventQueue.doPostEvent(AsyncEventQueue.scala:37)
at org.apache.spark.util.ListenerBus.postToAll(ListenerBus.scala:117)
at org.apache.spark.util.ListenerBus.postToAll$(ListenerBus.scala:101)
at org.apache.spark.scheduler.AsyncEventQueue.super$postToAll(AsyncEventQueue.scala:105)
at org.apache.spark.scheduler.AsyncEventQueue.$anonfun$dispatch$1(AsyncEventQueue.scala:105)
at scala.runtime.java8.JFunction0$mcJ$sp.apply(JFunction0$mcJ$sp.java:23)
at scala.util.DynamicVariable.withValue(DynamicVariable.scala:62)
at org.apache.spark.scheduler.AsyncEventQueue.org$apache$spark$scheduler$AsyncEventQueue$$dispatch(AsyncEventQueue.scala:100)
at org.apache.spark.scheduler.AsyncEventQueue$$anon$2.$anonfun$run$1(AsyncEventQueue.scala:96)
at org.apache.spark.util.Utils$.tryOrStopSparkContext(Utils.scala:1404)
at org.apache.spark.scheduler.AsyncEventQueue$$anon$2.run(AsyncEventQueue.scala:96)
Caused by: java.lang.IllegalStateException: Cannot call methods on a stopped SparkContext.
This stopped SparkContext was created at:
org.apache.spark.sql.SparkSession$Builder.getOrCreate(SparkSession.scala:943)
io.github.interestinglab.waterdrop.spark.SparkEnvironment.prepare(SparkEnvironment.scala:30)
io.github.interestinglab.waterdrop.spark.SparkEnvironment.prepare(SparkEnvironment.scala:14)
io.github.interestinglab.waterdrop.config.ConfigBuilder.createEnv(ConfigBuilder.java:193)
io.github.interestinglab.waterdrop.config.ConfigBuilder.<init>(ConfigBuilder.java:39)
io.github.interestinglab.waterdrop.Waterdrop.entryPoint(Waterdrop.java:92)
io.github.interestinglab.waterdrop.Waterdrop.run(Waterdrop.java:58)
io.github.interestinglab.waterdrop.Waterdrop.main(Waterdrop.java:42)
java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
java.base/java.lang.reflect.Method.invoke(Method.java:566)
org.apache.spark.deploy.yarn.ApplicationMaster$$anon$2.run(ApplicationMaster.scala:737)
The currently active SparkContext was created at:
org.apache.spark.sql.SparkSession$Builder.getOrCreate(SparkSession.scala:943)
io.github.interestinglab.waterdrop.spark.SparkEnvironment.prepare(SparkEnvironment.scala:30)
io.github.interestinglab.waterdrop.spark.SparkEnvironment.prepare(SparkEnvironment.scala:14)
io.github.interestinglab.waterdrop.config.ConfigBuilder.createEnv(ConfigBuilder.java:193)
io.github.interestinglab.waterdrop.config.ConfigBuilder.<init>(ConfigBuilder.java:39)
io.github.interestinglab.waterdrop.Waterdrop.entryPoint(Waterdrop.java:92)
io.github.interestinglab.waterdrop.Waterdrop.run(Waterdrop.java:58)
io.github.interestinglab.waterdrop.Waterdrop.main(Waterdrop.java:42)
java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
java.base/java.lang.reflect.Method.invoke(Method.java:566)
org.apache.spark.deploy.yarn.ApplicationMaster$$anon$2.run(ApplicationMaster.scala:737)
at org.apache.spark.SparkContext.assertNotStopped(SparkContext.scala:118)
at org.apache.spark.SparkContext.broadcast(SparkContext.scala:1512)
at org.apache.spark.sql.delta.Snapshot.$anonfun$stateReconstruction$2(Snapshot.scala:107)
at org.apache.spark.sql.delta.metering.DeltaLogging.recordFrameProfile(DeltaLogging.scala:120)
at org.apache.spark.sql.delta.metering.DeltaLogging.recordFrameProfile$(DeltaLogging.scala:118)
at org.apache.spark.sql.delta.Snapshot.recordFrameProfile(Snapshot.scala:58)
at org.apache.spark.sql.delta.Snapshot.$anonfun$stateReconstruction$1(Snapshot.scala:98)
at org.apache.spark.sql.delta.metering.DeltaLogging.withDmqTag(DeltaLogging.scala:124)
at org.apache.spark.sql.delta.metering.DeltaLogging.withDmqTag$(DeltaLogging.scala:123)
at org.apache.spark.sql.delta.Snapshot.withDmqTag(Snapshot.scala:58)
at org.apache.spark.sql.delta.Snapshot.stateReconstruction(Snapshot.scala:98)
at org.apache.spark.sql.delta.Snapshot.$anonfun$cachedState$1(Snapshot.scala:154)
at org.apache.spark.sql.delta.metering.DeltaLogging.withDmqTag(DeltaLogging.scala:124)
at org.apache.spark.sql.delta.metering.DeltaLogging.withDmqTag$(DeltaLogging.scala:123)
at org.apache.spark.sql.delta.Snapshot.withDmqTag(Snapshot.scala:58)
at org.apache.spark.sql.delta.Snapshot.cachedState$lzycompute(Snapshot.scala:154)
at org.apache.spark.sql.delta.Snapshot.cachedState(Snapshot.scala:153)
at org.apache.spark.sql.delta.Snapshot.$anonfun$stateDF$1(Snapshot.scala:164)
at org.apache.spark.sql.delta.metering.DeltaLogging.withDmqTag(DeltaLogging.scala:124)
at org.apache.spark.sql.delta.metering.DeltaLogging.withDmqTag$(DeltaLogging.scala:123)
at org.apache.spark.sql.delta.Snapshot.withDmqTag(Snapshot.scala:58)
at org.apache.spark.sql.delta.Snapshot.stateDF(Snapshot.scala:164)
at org.apache.spark.sql.delta.Snapshot.$anonfun$computedState$3(Snapshot.scala:209)
at org.apache.spark.sql.delta.metering.DeltaLogging.recordFrameProfile(DeltaLogging.scala:120)
at org.apache.spark.sql.delta.metering.DeltaLogging.recordFrameProfile$(DeltaLogging.scala:118)
at org.apache.spark.sql.delta.Snapshot.recordFrameProfile(Snapshot.scala:58)
at org.apache.spark.sql.delta.Snapshot.$anonfun$computedState$2(Snapshot.scala:205)
at org.apache.spark.sql.delta.metering.DeltaLogging.withDmqTag(DeltaLogging.scala:124)
at org.apache.spark.sql.delta.metering.DeltaLogging.withDmqTag$(DeltaLogging.scala:123)
at org.apache.spark.sql.delta.Snapshot.withDmqTag(Snapshot.scala:58)
at org.apache.spark.sql.delta.Snapshot.$anonfun$computedState$1(Snapshot.scala:205)
at org.apache.spark.sql.delta.util.DeltaProgressReporter.withJobDescription(DeltaProgressReporter.scala:53)
at org.apache.spark.sql.delta.util.DeltaProgressReporter.withStatusCode(DeltaProgressReporter.scala:32)
at org.apache.spark.sql.delta.util.DeltaProgressReporter.withStatusCode$(DeltaProgressReporter.scala:27)
at org.apache.spark.sql.delta.Snapshot.withStatusCode(Snapshot.scala:58)
at org.apache.spark.sql.delta.Snapshot.computedState$lzycompute(Snapshot.scala:204)
at org.apache.spark.sql.delta.Snapshot.computedState(Snapshot.scala:202)
at org.apache.spark.sql.delta.Snapshot.metadata(Snapshot.scala:242)
at org.apache.spark.sql.delta.stats.DataSkippingReaderBase.$init$(DataSkippingReader.scala:174)
at org.apache.spark.sql.delta.Snapshot.<init>(Snapshot.scala:67)
at org.apache.spark.sql.delta.SnapshotManagement.$anonfun$createSnapshot$1(SnapshotManagement.scala:271)
at org.apache.spark.sql.delta.SnapshotManagement.createSnapshotFromGivenOrEquivalentLogSegment(SnapshotManagement.scala:390)
at org.apache.spark.sql.delta.SnapshotManagement.createSnapshotFromGivenOrEquivalentLogSegment$(SnapshotManagement.scala:378)
at org.apache.spark.sql.delta.DeltaLog.createSnapshotFromGivenOrEquivalentLogSegment(DeltaLog.scala:64)
at org.apache.spark.sql.delta.SnapshotManagement.createSnapshot(SnapshotManagement.scala:270)
at org.apache.spark.sql.delta.SnapshotManagement.createSnapshot$(SnapshotManagement.scala:266)
at org.apache.spark.sql.delta.DeltaLog.createSnapshot(DeltaLog.scala:64)
at org.apache.spark.sql.delta.SnapshotManagement.$anonfun$getSnapshotAtInit$2(SnapshotManagement.scala:252)
at scala.Option.map(Option.scala:230)
at org.apache.spark.sql.delta.SnapshotManagement.$anonfun$getSnapshotAtInit$1(SnapshotManagement.scala:248)
at org.apache.spark.sql.delta.metering.DeltaLogging.recordFrameProfile(DeltaLogging.scala:120)
at org.apache.spark.sql.delta.metering.DeltaLogging.recordFrameProfile$(DeltaLogging.scala:118)
at org.apache.spark.sql.delta.DeltaLog.recordFrameProfile(DeltaLog.scala:64)
at org.apache.spark.sql.delta.SnapshotManagement.getSnapshotAtInit(SnapshotManagement.scala:246)
at org.apache.spark.sql.delta.SnapshotManagement.getSnapshotAtInit$(SnapshotManagement.scala:245)
at org.apache.spark.sql.delta.DeltaLog.getSnapshotAtInit(DeltaLog.scala:64)
at org.apache.spark.sql.delta.SnapshotManagement.$init$(SnapshotManagement.scala:53)
at org.apache.spark.sql.delta.DeltaLog.<init>(DeltaLog.scala:69)
at org.apache.spark.sql.delta.DeltaLog$.$anonfun$apply$3(DeltaLog.scala:567)
at org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper$.allowInvokingTransformsInAnalyzer(AnalysisHelper.scala:323)
at org.apache.spark.sql.delta.DeltaLog$.$anonfun$apply$2(DeltaLog.scala:567)
at org.apache.spark.sql.delta.metering.DeltaLogging.recordFrameProfile(DeltaLogging.scala:120)
at org.apache.spark.sql.delta.metering.DeltaLogging.recordFrameProfile$(DeltaLogging.scala:118)
at org.apache.spark.sql.delta.DeltaLog$.recordFrameProfile(DeltaLog.scala:437)
at org.apache.spark.sql.delta.metering.DeltaLogging.$anonfun$recordDeltaOperation$5(DeltaLogging.scala:114)
at com.databricks.spark.util.DatabricksLogging.recordOperation(DatabricksLogging.scala:77)
at com.databricks.spark.util.DatabricksLogging.recordOperation$(DatabricksLogging.scala:67)
at org.apache.spark.sql.delta.DeltaLog$.recordOperation(DeltaLog.scala:437)
at org.apache.spark.sql.delta.metering.DeltaLogging.recordDeltaOperation(DeltaLogging.scala:113)
at org.apache.spark.sql.delta.metering.DeltaLogging.recordDeltaOperation$(DeltaLogging.scala:98)
at org.apache.spark.sql.delta.DeltaLog$.recordDeltaOperation(DeltaLog.scala:437)
at org.apache.spark.sql.delta.DeltaLog$.createDeltaLog$1(DeltaLog.scala:566)
at org.apache.spark.sql.delta.DeltaLog$.$anonfun$apply$4(DeltaLog.scala:577)
at com.google.common.cache.LocalCache$LocalManualCache$1.load(LocalCache.java:4792)
at com.google.common.cache.LocalCache$LoadingValueReference.loadFuture(LocalCache.java:3599)
at com.google.common.cache.LocalCache$Segment.loadSync(LocalCache.java:2379)
at com.google.common.cache.LocalCache$Segment.lockedGetOrLoad(LocalCache.java:2342)
at com.google.common.cache.LocalCache$Segment.get(LocalCache.java:2257)
at com.google.common.cache.LocalCache.get(LocalCache.java:4000)
at com.google.common.cache.LocalCache$LocalManualCache.get(LocalCache.java:4789)
at org.apache.spark.sql.delta.DeltaLog$.getDeltaLogFromCache$1(DeltaLog.scala:577)
at org.apache.spark.sql.delta.DeltaLog$.apply(DeltaLog.scala:589)
at org.apache.spark.sql.delta.DeltaLog$.forTable(DeltaLog.scala:487)
at org.apache.spark.sql.delta.catalog.DeltaTableV2.deltaLog$lzycompute(DeltaTableV2.scala:78)
at org.apache.spark.sql.delta.catalog.DeltaTableV2.deltaLog(DeltaTableV2.scala:78)
at org.apache.spark.sql.delta.catalog.DeltaTableV2.$anonfun$snapshot$3(DeltaTableV2.scala:107)
at scala.Option.getOrElse(Option.scala:189)
at org.apache.spark.sql.delta.catalog.DeltaTableV2.snapshot$lzycompute(DeltaTableV2.scala:107)
at org.apache.spark.sql.delta.catalog.DeltaTableV2.snapshot(DeltaTableV2.scala:95)
at org.apache.spark.sql.delta.catalog.DeltaTableV2.properties(DeltaTableV2.scala:124)
... 53 more
Additional information:
- I've modified the pom.xml under the root of spline-spark-agent and under spline-spark-agent/core:
<!--
<dependency>
<groupId>org.elasticsearch</groupId>
<artifactId>elasticsearch-hadoop</artifactId>
<version>7.6.0</version>
</dependency>
-->
<dependency>
<groupId>org.elasticsearch</groupId>
<artifactId>elasticsearch-spark-30_2.12</artifactId>
<version>7.13.4</version>
<scope>provided</scope>
</dependency>
- I've disabled Spark dynamic allocation on the suspicion that the SparkContext might have been shut down due to a resource problem. Below is the content of my spark-defaults.conf.
spark.master yarn
spark.eventLog.enabled true
spark.eventLog.dir hdfs://masters/spark/eventLog
spark.serializer org.apache.spark.serializer.KryoSerializer
spark.delta.logStore.class org.apache.spark.sql.delta.storage.HDFSLogStore
spark.executor.extraClassPath /home/hadoop/SW/extra-libs/*
spark.driver.extraClassPath /home/hadoop/SW/extra-libs/*
spark.hive.metastore.uris thrift://192.168.21.8:9083
spark.sql.warehouse.dir hdfs://masters/
spark.scheduler.listenerbus.eventqueue.capacity 100000
spark.sql.extensions io.delta.sql.DeltaSparkSessionExtension
spark.sql.catalog.spark_catalog org.apache.spark.sql.delta.catalog.DeltaCatalog
# yarn
spark.yarn.jars hdfs://masters/spark-yarn-jars/*
## dynamic allocation
#spark.shuffle.service.enabled true
#spark.dynamicAllocation.enabled true
#spark.dynamicAllocation.minExecutors 1
##spark.dynamicAllocation.maxExecutors
#spark.dynamicAllocation.schedulerBacklogTimeout 5
# Lineage
spark.sql.queryExecutionListeners za.co.absa.spline.harvester.listener.SplineQueryExecutionListener
spark.spline.mode REQUIRED
spark.spline.producer.url http://h8:9095/spline-rest/producer
#spark.spline.postProcessingFilter.composite.filters dsPasswordReplace
#spark.spline.lineageDispatcher console
#spark.spline.lineageDispatcher.console.className za.co.absa.spline.harvester.dispatcher.ConsoleLineageDispatcher
#spark.spline.lineageDispatcher.console.stream ERR
#spark.spline.lineageDispatcher.http.producer.url http://h8:9095/spline-rest/producer
## Kafka dispatcher
#spline.lineageDispatcher=kafka
#spline.lineageDispatcher.kafka.className=za.co.absa.spline.harvester.dispatcher.KafkaLineageDispatcher
## producer configs as defined by kafka (bootstrap.servers, key.serializer, etc) all kafka configs are supported
#spline.lineageDispatcher.kafka.producer.bootstrap.servers=h5:9092,h6:9092,h7:9092
#spline.lineageDispatcher.kafka.producer.key.serializer=org.apache.kafka.common.serialization.StringSerializer
#spline.lineageDispatcher.kafka.producer.value.serializer=org.apache.kafka.common.serialization.StringSerializer
#spline.lineageDispatcher.kafka.producer.max.in.flight.requests.per.connection=1
## topic name for plans and events
#spline.lineageDispatcher.kafka.topic=spline-lineage
For the delta2delta application, there seems to be something wrong with ReplaceTableAsSelect: as the logs show, write extraction failed. Could you kindly help? If I missed anything important, please let me know.
There should be one ticket per issue, so I will create a new one for the java.util.NoSuchElementException: None.get problem.
@Taurus-Le I split the issue in two: this one is for Delta and #479 is for ES. Feel free to add any relevant info or correct me if I split something wrongly.
Hi @cerveada, sorry for the trouble. I meant to save you some work, not cause more; lumping the two problems together was not intentional. Thanks for helping.
Version of apache maven and JDK used to build spline-spark-agent:
[hadoop@h8 spline-spark-agent]$ mvn -version
Apache Maven 3.8.5 (3599d3414f046de2324203b78ddcf9b5e4388aa0)
Maven home: /home/hadoop/SW/apache-maven-3.8.5
Java version: 1.8.0_331, vendor: Oracle Corporation, runtime: /home/hadoop/SW/jdk1.8.0_331/jre
Default locale: en_US, platform encoding: UTF-8
OS name: "linux", version: "3.10.0-1062.el7.x86_64", arch: "amd64", family: "unix"
And here is how I built spline-spark-agent:
git clone https://github.com/AbsaOSS/spline-spark-agent.git
git checkout release/0.7.10
mvn scala-cross-build:change-version -Pscala-2.12
mvn clean package -Pscala-2.12,spark-3.2 -Dmaven.test.skip=true
It seems that changes in Delta Lake 1.2.0 are causing the issue; the code was tested only on 1.1.0, so as a workaround you can try switching to version 1.1.0.
Got it, thanks. If there's anything I can help with, please let me know. I also just noticed that Delta Lake 2.0.0 has been released.
Hi @cerveada, I've switched from Delta Lake 1.2.0 to Delta Lake 1.1.0, and this time I got the same error as #479.
22/07/22 09:42:46 ERROR SplineQueryExecutionListener: Unexpected error occurred during lineage processing for application: dim_t_device_mapping_test #application_1658293063172_0023
java.util.NoSuchElementException: None.get
at scala.None$.get(Option.scala:529)
at scala.None$.get(Option.scala:527)
at za.co.absa.spline.harvester.postprocessing.ViewAttributeAddingFilter.toAttributeReferencesMap(ViewAttributeAddingFilter.scala:59)
at za.co.absa.spline.harvester.postprocessing.ViewAttributeAddingFilter.$anonfun$addMissingAttributeLinks$1(ViewAttributeAddingFilter.scala:39)
at scala.collection.TraversableLike.$anonfun$map$1(TraversableLike.scala:286)
at scala.collection.Iterator.foreach(Iterator.scala:943)
at scala.collection.Iterator.foreach$(Iterator.scala:943)
at scala.collection.AbstractIterator.foreach(Iterator.scala:1431)
at scala.collection.IterableLike.foreach(IterableLike.scala:74)
at scala.collection.IterableLike.foreach$(IterableLike.scala:73)
at scala.collection.AbstractIterable.foreach(Iterable.scala:56)
at scala.collection.TraversableLike.map(TraversableLike.scala:286)
at scala.collection.TraversableLike.map$(TraversableLike.scala:279)
at scala.collection.AbstractTraversable.map(Traversable.scala:108)
at za.co.absa.spline.harvester.postprocessing.ViewAttributeAddingFilter.addMissingAttributeLinks(ViewAttributeAddingFilter.scala:39)
at za.co.absa.spline.harvester.postprocessing.ViewAttributeAddingFilter.processExecutionPlan(ViewAttributeAddingFilter.scala:34)
at za.co.absa.spline.harvester.postprocessing.PostProcessor.$anonfun$process$4(PostProcessor.scala:38)
at za.co.absa.spline.harvester.postprocessing.PostProcessor.$anonfun$provideCtx$1(PostProcessor.scala:25)
at scala.Function1.$anonfun$andThen$1(Function1.scala:57)
at scala.Function1.$anonfun$andThen$1(Function1.scala:57)
at za.co.absa.spline.harvester.postprocessing.PostProcessor.process(PostProcessor.scala:38)
at za.co.absa.spline.harvester.LineageHarvester.$anonfun$harvest$4(LineageHarvester.scala:110)
at scala.Option.flatMap(Option.scala:271)
at za.co.absa.spline.harvester.LineageHarvester.harvest(LineageHarvester.scala:64)
at za.co.absa.spline.harvester.QueryExecutionEventHandler.onSuccess(QueryExecutionEventHandler.scala:42)
at za.co.absa.spline.harvester.listener.SplineQueryExecutionListener.$anonfun$onSuccess$2(SplineQueryExecutionListener.scala:40)
at za.co.absa.spline.harvester.listener.SplineQueryExecutionListener.$anonfun$onSuccess$2$adapted(SplineQueryExecutionListener.scala:40)
at scala.Option.foreach(Option.scala:407)
at za.co.absa.spline.harvester.listener.SplineQueryExecutionListener.$anonfun$onSuccess$1(SplineQueryExecutionListener.scala:40)
at za.co.absa.spline.harvester.listener.SplineQueryExecutionListener.withErrorHandling(SplineQueryExecutionListener.scala:49)
at za.co.absa.spline.harvester.listener.SplineQueryExecutionListener.onSuccess(SplineQueryExecutionListener.scala:40)
at org.apache.spark.sql.util.ExecutionListenerBus.doPostEvent(QueryExecutionListener.scala:158)
at org.apache.spark.sql.util.ExecutionListenerBus.doPostEvent(QueryExecutionListener.scala:128)
at org.apache.spark.util.ListenerBus.postToAll(ListenerBus.scala:117)
at org.apache.spark.util.ListenerBus.postToAll$(ListenerBus.scala:101)
at org.apache.spark.sql.util.ExecutionListenerBus.postToAll(QueryExecutionListener.scala:128)
at org.apache.spark.sql.util.ExecutionListenerBus.onOtherEvent(QueryExecutionListener.scala:140)
at org.apache.spark.scheduler.SparkListenerBus.doPostEvent(SparkListenerBus.scala:100)
at org.apache.spark.scheduler.SparkListenerBus.doPostEvent$(SparkListenerBus.scala:28)
at org.apache.spark.scheduler.AsyncEventQueue.doPostEvent(AsyncEventQueue.scala:37)
at org.apache.spark.scheduler.AsyncEventQueue.doPostEvent(AsyncEventQueue.scala:37)
at org.apache.spark.util.ListenerBus.postToAll(ListenerBus.scala:117)
at org.apache.spark.util.ListenerBus.postToAll$(ListenerBus.scala:101)
at org.apache.spark.scheduler.AsyncEventQueue.super$postToAll(AsyncEventQueue.scala:105)
at org.apache.spark.scheduler.AsyncEventQueue.$anonfun$dispatch$1(AsyncEventQueue.scala:105)
at scala.runtime.java8.JFunction0$mcJ$sp.apply(JFunction0$mcJ$sp.java:23)
at scala.util.DynamicVariable.withValue(DynamicVariable.scala:62)
at org.apache.spark.scheduler.AsyncEventQueue.org$apache$spark$scheduler$AsyncEventQueue$$dispatch(AsyncEventQueue.scala:100)
at org.apache.spark.scheduler.AsyncEventQueue$$anon$2.$anonfun$run$1(AsyncEventQueue.scala:96)
at org.apache.spark.util.Utils$.tryOrStopSparkContext(Utils.scala:1404)
So I think the problem might have something to do with the call to createOrReplaceTempView in my code, which calls createTempViewCommand and, internally, CreateViewCommand. According to the spline-spark-agent README, CreateViewCommand is ignored.
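For reference, a spark-shell style sketch of that pattern (hypothetical names), where the temp view feeds the final Delta write:
import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder().getOrCreate()

val joined = spark.table("db.t_device")                       // hypothetical source
  .join(spark.table("db.t_device_mapping"), Seq("device_id")) // hypothetical source

// Executed as a CreateViewCommand, which the README says the agent ignores as a write
joined.createOrReplaceTempView("v_device_mapping")

// The final write then selects from the temp view
spark.sql("SELECT * FROM v_device_mapping")
  .write.format("delta")
  .mode("overwrite")
  .saveAsTable("db.dim_t_device_mapping_test")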
Hmm, I read the logs again and there seems to be an issue with Spark stopping.
java.lang.reflect.InvocationTargetException
Caused by: java.lang.IllegalStateException: Cannot call methods on a stopped SparkContext.
This looks like Spline is trying to call the DeltaTableV2.properties method, but the Spark context is already stopped.
Can you try not stopping Spark, to see if it changes the outcome?
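One quick way to check (a sketch, assuming you can add a line to your driver code, with spark being your active SparkSession) is to log whether the context is still alive right after the final write:
// If this already prints true, something other than an explicit stop() call
// is shutting the context down before the Spline listener runs.
println(s"SparkContext stopped? ${spark.sparkContext.isStopped}")
Keep in mind that Spark also stops the context from a shutdown hook when the driver JVM exits, so a listener event dispatched asynchronously at the very end of the application may already observe a stopped context.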
Hi @cerveada, I did not stop Spark myself; I never called stop() on the SparkContext or SparkSession. Could it be because I'm running Spark on YARN? I suspected earlier that there might be a connection between Spark stopping and dynamic allocation, so I disabled dynamic allocation, but it did not help.
And I'm truly sorry, I forgot to tell you that I got the same error as https://github.com/AbsaOSS/spline-spark-agent/issues/479#issuecomment-1196398326 after applying PR https://github.com/AbsaOSS/spline-spark-agent/pull/481. I don't know what it might be.
On YARN, do you run in client mode or cluster mode?
Do you still get the following errors?
java.lang.reflect.InvocationTargetException
Caused by: java.lang.IllegalStateException: Cannot call methods on a stopped SparkContext.
You can also try setting a higher hadoop.service.shutdown.timeout.