azure-event-hubs-spark
EventHub AAD authentication with Service Principal and Secret: class loader could not find AuthBySecretCallBackWithParams class
Feature Requests:
- What issue are you trying to solve? Authenticating to EventHub with a Service Principal and Secret (AAD) from Scala.
- How do you want to solve it?
- What is your use case for this feature? Based on the Event Hub AAD documentation, the Scala code should authenticate to EventHub, obtain a token through a callback class, and build an EventHubsConf. I am using Maven with Scala. Regardless of which package I put the AuthBySecretCallBackWithParams class in, it cannot be found at runtime; I confirmed that the class is in my jar, and I even put AuthBySecretCallBackWithParams in the same package as EventHubsConf.
import org.apache.spark.eventhubs.{AuthBySecretCallBackWithParams, ConnectionStringBuilder, EventHubsConf, EventPosition}
import java.net.URI

// eventHubConfigurations holds all parameters
val params: Map[String, Object] = Map(
  "authority" -> eventHubConfigurations.tenantId,
  "clientId" -> eventHubConfigurations.clientId,
  "clientSecret" -> eventHubConfigurations.clientSecret)

val connectionString = ConnectionStringBuilder()
  .setAadAuthConnectionString(
    new URI(s"sb://${eventHubConfigurations.nameSpace}.servicebus.windows.net/"),
    eventHubConfigurations.eventHubName)
  .build

// callclass is an instance of the custom callback class (a sketch of it follows below)
val callclass = AuthBySecretCallBackWithParams(params)
val eventHubsConf = EventHubsConf(connectionString)
  .setConsumerGroup(eventHubConfigurations.consumerGroup)
  .setAadAuthCallback(callclass)
  .setAadAuthCallbackParams(params)

sparkSession.readStream
  .format("org.apache.spark.sql.eventhubs.EventHubsSourceProvider")
  .options(eventHubsConf.toMap).load()
  .writeStream.format("delta")
  .foreachBatch { (batchDF: DataFrame, batchId: Long) =>
    batchDF.withColumn("IngestionTimeStamp", lit(TimeStampFunction()))
      .write.format("delta").mode("append")
      .option("checkpointLocation", eventHubcheckpointPath)
      .save(eventHubSinkPath)
  }
  .start().awaitTermination()
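For completeness, the callback class itself is not shown in the snippet above. Below is a minimal sketch of what AuthBySecretCallBackWithParams could look like, modelled on the AAD authentication sample in this repository; the assumption that the constructor receives the same parameter map passed to setAadAuthCallbackParams, and the exact msal4j calls, are illustrative rather than the reporter's actual code:

import java.util.Collections
import java.util.concurrent.CompletableFuture
import java.util.function.{Function => JFunction}

import com.microsoft.aad.msal4j.{ClientCredentialFactory, ClientCredentialParameters, ConfidentialClientApplication, IAuthenticationResult}
import org.apache.spark.eventhubs.utils.AadAuthenticationCallback

// Sketch only: assumes the parameter map passed to setAadAuthCallbackParams is handed to the constructor.
case class AuthBySecretCallBackWithParams(params: Map[String, Object]) extends AadAuthenticationCallback {

  override def authority: String = params("authority").asInstanceOf[String]
  private def clientId: String = params("clientId").asInstanceOf[String]
  private def clientSecret: String = params("clientSecret").asInstanceOf[String]

  // Acquire an AAD token for the Event Hubs audience using the service principal secret.
  override def acquireToken(audience: String, authority: String, state: Any): CompletableFuture[String] =
    try {
      val app = ConfidentialClientApplication
        .builder(clientId, ClientCredentialFactory.createFromSecret(clientSecret))
        .authority("https://login.microsoftonline.com/" + authority)
        .build()
      val requestParameters = ClientCredentialParameters
        .builder(Collections.singleton(audience + "/.default"))
        .build()
      // Explicit java.util.function.Function so this compiles on Scala 2.11 (no SAM conversion).
      app.acquireToken(requestParameters).thenApply(new JFunction[IAuthenticationResult, String] {
        override def apply(result: IAuthenticationResult): String = result.accessToken()
      })
    } catch {
      case e: Exception =>
        val failed = new CompletableFuture[String]()
        failed.completeExceptionally(e)
        failed
    }
}

Whatever the implementation looks like, the stack trace below shows that EventHubsConf reloads the callback purely by class name via Class.forName, so the jar containing this class (and msal4j) has to be on the Spark pool classpath; that is what the suggestions further down address.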
Bug Report: class loader could not find the AuthBySecretCallBackWithParams class
- Actual behavior: regardless of which package AuthBySecretCallBackWithParams is placed in, the job fails with a ClassNotFoundException:
java.lang.ClassNotFoundException: org.apache.spark.eventhubs.AuthBySecretCallBackWithParams
at java.net.URLClassLoader.findClass(URLClassLoader.java:387)
at java.lang.ClassLoader.loadClass(ClassLoader.java:418)
at sun.misc.Launcher$AppClassLoader.loadClass(Launcher.java:352)
at java.lang.ClassLoader.loadClass(ClassLoader.java:351)
at java.lang.Class.forName0(Native Method)
at java.lang.Class.forName(Class.java:264)
at org.apache.spark.eventhubs.EventHubsConf$$anonfun$aadAuthCallback$2.apply(EventHubsConf.scala:640)
at org.apache.spark.eventhubs.EventHubsConf$$anonfun$aadAuthCallback$2.apply(EventHubsConf.scala:638)
at scala.Option.map(Option.scala:146)
at org.apache.spark.eventhubs.EventHubsConf.aadAuthCallback(EventHubsConf.scala:638)
at org.apache.spark.eventhubs.client.ClientConnectionPool$$anonfun$org$apache$spark$eventhubs$client$ClientConnectionPool$$borrowClient$3.apply(ClientConnectionPool.scala:73)
at org.apache.spark.eventhubs.client.ClientConnectionPool$$anonfun$org$apache$spark$eventhubs$client$ClientConnectionPool$$borrowClient$3.apply(ClientConnectionPool.scala:71)
at org.apache.spark.eventhubs.utils.RetryUtils$$anonfun$retryJava$1.apply(RetryUtils.scala:91)
at org.apache.spark.eventhubs.utils.RetryUtils$$anonfun$retryJava$1.apply(RetryUtils.scala:91)
at org.apache.spark.eventhubs.utils.RetryUtils$.org$apache$spark$eventhubs$utils$RetryUtils$$retryHelper$1(RetryUtils.scala:116)
at org.apache.spark.eventhubs.utils.RetryUtils$.retryScala(RetryUtils.scala:149)
at org.apache.spark.eventhubs.utils.RetryUtils$.retryJava(RetryUtils.scala:91)
at org.apache.spark.eventhubs.client.ClientConnectionPool.org$apache$spark$eventhubs$client$ClientConnectionPool$$borrowClient(ClientConnectionPool.scala:69)
at org.apache.spark.eventhubs.client.ClientConnectionPool$.borrowClient(ClientConnectionPool.scala:170)
at org.apache.spark.eventhubs.client.EventHubsClient.org$apache$spark$eventhubs$client$EventHubsClient$$client(EventHubsClient.scala:62)
at org.apache.spark.eventhubs.client.EventHubsClient.liftedTree1$1(EventHubsClient.scala:187)
at org.apache.spark.eventhubs.client.EventHubsClient.partitionCountLazyVal$lzycompute(EventHubsClient.scala:184)
at org.apache.spark.eventhubs.client.EventHubsClient.partitionCountLazyVal(EventHubsClient.scala:183)
at org.apache.spark.eventhubs.client.EventHubsClient.partitionCount(EventHubsClient.scala:176)
at org.apache.spark.sql.eventhubs.EventHubsSource.org$apache$spark$sql$eventhubs$EventHubsSource$$partitionCount(EventHubsSource.scala:81)
at org.apache.spark.sql.eventhubs.EventHubsSource$$anonfun$1$$anonfun$apply$mcJ$sp$1.apply$mcJ$sp(EventHubsSource.scala:96)
at org.apache.spark.sql.eventhubs.EventHubsSource$$anonfun$1$$anonfun$apply$mcJ$sp$1.apply(EventHubsSource.scala:96)
at org.apache.spark.sql.eventhubs.EventHubsSource$$anonfun$1$$anonfun$apply$mcJ$sp$1.apply(EventHubsSource.scala:96)
at scala.Option.getOrElse(Option.scala:121)
at org.apache.spark.sql.eventhubs.EventHubsSource$$anonfun$1.apply$mcJ$sp(EventHubsSource.scala:96)
at org.apache.spark.sql.eventhubs.EventHubsSource$$anonfun$1.apply(EventHubsSource.scala:96)
at org.apache.spark.sql.eventhubs.EventHubsSource$$anonfun$1.apply(EventHubsSource.scala:96)
at scala.Option.getOrElse(Option.scala:121)
at org.apache.spark.sql.eventhubs.EventHubsSource.<init>(EventHubsSource.scala:95)
at org.apache.spark.sql.eventhubs.EventHubsSourceProvider.createSource(EventHubsSourceProvider.scala:84)
at org.apache.spark.sql.execution.datasources.DataSource.createSource(DataSource.scala:268)
at org.apache.spark.sql.execution.streaming.MicroBatchExecution$$anonfun$1$$anonfun$applyOrElse$1.apply(MicroBatchExecution.scala:88)
at org.apache.spark.sql.execution.streaming.MicroBatchExecution$$anonfun$1$$anonfun$applyOrElse$1.apply(MicroBatchExecution.scala:85)
at scala.collection.mutable.HashMap.getOrElseUpdate(HashMap.scala:79)
at org.apache.spark.sql.execution.streaming.MicroBatchExecution$$anonfun$1.applyOrElse(MicroBatchExecution.scala:85)
at org.apache.spark.sql.execution.streaming.MicroBatchExecution$$anonfun$1.applyOrElse(MicroBatchExecution.scala:83)
at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$2.apply(TreeNode.scala:268)
at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$2.apply(TreeNode.scala:268)
at org.apache.spark.sql.catalyst.trees.CurrentOrigin$.withOrigin(TreeNode.scala:69)
at org.apache.spark.sql.catalyst.trees.TreeNode.transformDown(TreeNode.scala:267)
at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.org$apache$spark$sql$catalyst$plans$logical$AnalysisHelper$$super$transformDown(LogicalPlan.scala:29)
at org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper$class.transformDown(AnalysisHelper.scala:149)
at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.transformDown(LogicalPlan.scala:29)
at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.transformDown(LogicalPlan.scala:29)
at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$transformDown$1.apply(TreeNode.scala:273)
at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$transformDown$1.apply(TreeNode.scala:273)
at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$6.apply(TreeNode.scala:343)
at org.apache.spark.sql.catalyst.trees.TreeNode.mapProductIterator(TreeNode.scala:196)
at org.apache.spark.sql.catalyst.trees.TreeNode.mapChildren(TreeNode.scala:341)
at org.apache.spark.sql.catalyst.trees.TreeNode.transformDown(TreeNode.scala:273)
at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.org$apache$spark$sql$catalyst$plans$logical$AnalysisHelper$$super$transformDown(LogicalPlan.scala:29)
at org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper$class.transformDown(AnalysisHelper.scala:149)
at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.transformDown(LogicalPlan.scala:29)
at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.transformDown(LogicalPlan.scala:29)
at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$transformDown$1.apply(TreeNode.scala:273)
at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$transformDown$1.apply(TreeNode.scala:273)
at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$6.apply(TreeNode.scala:343)
at org.apache.spark.sql.catalyst.trees.TreeNode.mapProductIterator(TreeNode.scala:196)
at org.apache.spark.sql.catalyst.trees.TreeNode.mapChildren(TreeNode.scala:341)
at org.apache.spark.sql.catalyst.trees.TreeNode.transformDown(TreeNode.scala:273)
at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.org$apache$spark$sql$catalyst$plans$logical$AnalysisHelper$$super$transformDown(LogicalPlan.scala:29)
at org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper$class.transformDown(AnalysisHelper.scala:149)
at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.transformDown(LogicalPlan.scala:29)
at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.transformDown(LogicalPlan.scala:29)
at org.apache.spark.sql.catalyst.trees.TreeNode.transform(TreeNode.scala:257)
at org.apache.spark.sql.execution.streaming.MicroBatchExecution.logicalPlan$lzycompute(MicroBatchExecution.scala:83)
at org.apache.spark.sql.execution.streaming.MicroBatchExecution.logicalPlan(MicroBatchExecution.scala:65)
at org.apache.spark.sql.execution.streaming.StreamExecution.org$apache$spark$sql$execution$streaming$StreamExecution$$runStream(StreamExecution.scala:269)
at org.apache.spark.sql.execution.streaming.StreamExecution$$anon$1.run(StreamExecution.scala:193)
org.apache.spark.sql.streaming.StreamingQueryException: org.apache.spark.eventhubs.AuthBySecretCallBackWithParams
=== Streaming Query ===
- Expected behavior: running the Spark job on Synapse Spark, expected to stream from EventHub.
- Spark version: Apache Spark 2.4.0, Scala 2.11
- spark-eventhubs artifactId and version: com.microsoft.azure:azure-eventhubs-spark_2.11:2.3.21
@seblea, Did you find a solution to this?
@seblea and @jeffco11, were you able to find a solution to this? I am also facing the same issue.
@seblea here is what I did to tackle the "class not found" issue in Synapse Spark. Hopefully this is helpful to you or others to some extent.
Solution 1
Download all dependency jars after declaring only azure-{eventhubs, msal4j} as direct dependencies:
- Run mvn dependency:copy-dependencies -DoutputDirectory=<PATH> (40+ jars at the time of testing).
- Add all these jars to a workspace library and upload them to the Spark pool (a quick classpath check is sketched after this list).
- Caveat (jar conflicts): some downloaded jars are the same as ones that already exist in the Spark pool. Exception:
  Caused by: com.fasterxml.jackson.databind.JsonMappingException: Scala module 2.13.4 requires Jackson Databind version >= 2.13.0 and < 2.14.0 - Found jackson-databind version 2.12.1
  at com.fasterxml.jackson.module.scala.JacksonModule.setupModule(JacksonModule.scala:61)
  Workaround: remove the conflicting libraries one at a time.
- Issue: requires manual work and is not ideal for package management.
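Not part of the original steps, but a quick way to verify the upload worked: from a Synapse notebook (assuming spark is the active SparkSession, and using the class name from the stack trace above), try loading the callback class on both the driver and the executors; if either lookup throws ClassNotFoundException, the jar is still missing from that side of the classpath.

// Hypothetical quick check; spark is the active SparkSession in the notebook.
val callbackClassName = "org.apache.spark.eventhubs.AuthBySecretCallBackWithParams"

// Driver-side lookup.
println(Class.forName(callbackClassName).getName)

// Executor-side lookup: each task loads the class with the executor's class loader.
val loadedOnExecutors = spark.sparkContext
  .parallelize(1 to 4)
  .map(_ => Class.forName(callbackClassName).getName)
  .collect()
println(loadedOnExecutors.mkString(", "))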
Solution 2
Include azure-{eventhubs, msal4j} and build a shaded (uber) jar, relocating the conflicting packages:
- How to identify the library prefixes to rename:
  - Find patterns in the console output of the downloaded dependency paths in the IDE or from mvn.
- Shade the specific patterns using relocation under the maven-shade-plugin (a quick way to inspect the resulting jar is sketched after this list):
  - shaded.com.fasterxml.jackson.databind
  - shaded.com.microsoft.azure
  - shaded.net.minidev
- Exclude META-INF directory files, since they can cause issues (inform customer, use with discretion):
  - META-INF/*.SF
  - META-INF/*.DSA
  - META-INF/*.RSA
  - META-INF/*.MF
- Issue:
  - This solution removes some of the metadata files from the final JAR, which can cause issues with some tools or libraries.
  - It is recommended to keep the metadata files for better compatibility and troubleshooting.
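One way to sanity-check the shaded jar before uploading it (not in the original write-up; the jar path is a placeholder and the prefixes are the relocation patterns listed above) is to list its entries and confirm that the relocated packages are present and the signature files were stripped:

// Hypothetical check that relocation took effect: list entries of the shaded jar.
import java.util.jar.JarFile
import scala.collection.JavaConverters._

val jar = new JarFile("/path/to/your-shaded.jar") // placeholder path
jar.entries().asScala.map(_.getName)
  .filter(n => n.startsWith("shaded/") || n.startsWith("META-INF"))
  .take(20)
  .foreach(println)
jar.close()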