azure-event-hubs-spark

EventHub AAD authentication with Service Principal and Secret: ClassLoader could not find AuthBySecretCallBackWithParams class

Open seblea opened this issue 3 years ago • 3 comments

Feature Requests:

  • What issue are you trying to solve? Authenticating to Event Hubs from Scala using a Service Principal and Secret.
  • How do you want to solve it?
  • What is your use case for this feature? Per the Event Hubs AAD documentation, the Scala code should authenticate to Event Hubs, acquire a token, and build an EventHubsConf. I am using Maven with Scala. I confirmed that the AuthBySecretCallBackWithParams class is in my JAR, and I placed AuthBySecretCallBackWithParams in the same package as EventHubsConf (org.apache.spark.eventhubs). The failure occurs no matter which package the class lives in.

import org.apache.spark.eventhubs.{AuthBySecretCallBackWithParams, ConnectionStringBuilder, EventHubsConf, EventPosition}
import org.apache.spark.sql.DataFrame
import org.apache.spark.sql.functions.lit

import java.net.URI

// eventHubConfigurations holds all parameters

val params: Map[String, Object] = Map(
  "authority"    -> eventHubConfigurations.tenantId,
  "clientId"     -> eventHubConfigurations.clientId,
  "clientSecret" -> eventHubConfigurations.clientSecret)

val connectionString = ConnectionStringBuilder()
  .setAadAuthConnectionString(
    new URI(s"sb://${eventHubConfigurations.nameSpace}.servicebus.windows.net/"),
    eventHubConfigurations.eventHubName)
  .build

// callclass is an instance of the custom callback; assuming the params-based
// case-class constructor (the class itself is sketched below)
val callclass = AuthBySecretCallBackWithParams(params)

val eventHubsConf = EventHubsConf(connectionString)
  .setConsumerGroup(eventHubConfigurations.consumerGroup)
  .setAadAuthCallback(callclass)
  .setAadAuthCallbackParams(params)

sparkSession.readStream
  .format("org.apache.spark.sql.eventhubs.EventHubsSourceProvider")
  .options(eventHubsConf.toMap)
  .load()
  .writeStream
  // checkpointLocation belongs on the streaming writer, not on the per-batch write
  .option("checkpointLocation", eventHubcheckpointPath)
  .foreachBatch { (batchDF: DataFrame, batchId: Long) =>
    batchDF
      .withColumn("IngestionTimeStamp", lit(TimeStampFunction()))
      .write.format("delta").mode("append")
      .save(eventHubSinkPath)
  }
  .start()
  .awaitTermination()
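For reference, the callback class follows the AuthBySecretCallBackWithParams example in the library's AAD-authentication docs. A minimal sketch is below; the MSAL4J details are assumed from that doc rather than copied from my exact build:

import java.util.Collections
import java.util.concurrent.CompletableFuture
import java.util.function.{Function => JFunction}

import com.microsoft.aad.msal4j.{ClientCredentialFactory, ClientCredentialParameters, ConfidentialClientApplication, IAuthenticationResult}
import org.apache.spark.eventhubs.utils.AadAuthenticationCallback

case class AuthBySecretCallBackWithParams(params: Map[String, Object]) extends AadAuthenticationCallback {

  // Scala 2.11 does not auto-convert lambdas to Java functional interfaces
  implicit def toJavaFunction[A, B](f: A => B): JFunction[A, B] = new JFunction[A, B] {
    override def apply(a: A): B = f(a)
  }

  override def authority: String = params("authority").asInstanceOf[String]
  private val clientId: String = params("clientId").asInstanceOf[String]
  private val clientSecret: String = params("clientSecret").asInstanceOf[String]

  // Called by the connector whenever it needs a token; returns a JWT for the
  // Event Hubs audience acquired via the client-credentials flow.
  override def acquireToken(audience: String, authority: String, state: Any): CompletableFuture[String] =
    try {
      val app = ConfidentialClientApplication
        .builder(clientId, ClientCredentialFactory.createFromSecret(clientSecret))
        .authority("https://login.microsoftonline.com/" + authority)
        .build
      val parameters = ClientCredentialParameters.builder(Collections.singleton(audience + "/.default")).build
      app.acquireToken(parameters).thenApply((result: IAuthenticationResult) => result.accessToken())
    } catch {
      case e: Exception =>
        // Surface construction/auth failures through the returned future
        val failed = new CompletableFuture[String]
        failed.completeExceptionally(e)
        failed
    }
}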

Bug Report: ClassLoader could not find AuthBySecretCallBackWithParams class

  • Actual behavior: No matter which package AuthBySecretCallBackWithParams is placed in, the source fails with a ClassNotFoundException:

    java.lang.ClassNotFoundException: org.apache.spark.eventhubs.AuthBySecretCallBackWithParams
      at java.net.URLClassLoader.findClass(URLClassLoader.java:387)
      at java.lang.ClassLoader.loadClass(ClassLoader.java:418)
      at sun.misc.Launcher$AppClassLoader.loadClass(Launcher.java:352)
      at java.lang.ClassLoader.loadClass(ClassLoader.java:351)
      at java.lang.Class.forName0(Native Method)
      at java.lang.Class.forName(Class.java:264)
      at org.apache.spark.eventhubs.EventHubsConf$$anonfun$aadAuthCallback$2.apply(EventHubsConf.scala:640)
      at org.apache.spark.eventhubs.EventHubsConf$$anonfun$aadAuthCallback$2.apply(EventHubsConf.scala:638)
      at scala.Option.map(Option.scala:146)
      at org.apache.spark.eventhubs.EventHubsConf.aadAuthCallback(EventHubsConf.scala:638)
      at org.apache.spark.eventhubs.client.ClientConnectionPool$$anonfun$org$apache$spark$eventhubs$client$ClientConnectionPool$$borrowClient$3.apply(ClientConnectionPool.scala:73)
      at org.apache.spark.eventhubs.client.ClientConnectionPool$$anonfun$org$apache$spark$eventhubs$client$ClientConnectionPool$$borrowClient$3.apply(ClientConnectionPool.scala:71)
      at org.apache.spark.eventhubs.utils.RetryUtils$$anonfun$retryJava$1.apply(RetryUtils.scala:91)
      at org.apache.spark.eventhubs.utils.RetryUtils$$anonfun$retryJava$1.apply(RetryUtils.scala:91)
      at org.apache.spark.eventhubs.utils.RetryUtils$.org$apache$spark$eventhubs$utils$RetryUtils$$retryHelper$1(RetryUtils.scala:116)
      at org.apache.spark.eventhubs.utils.RetryUtils$.retryScala(RetryUtils.scala:149)
      at org.apache.spark.eventhubs.utils.RetryUtils$.retryJava(RetryUtils.scala:91)
      at org.apache.spark.eventhubs.client.ClientConnectionPool.org$apache$spark$eventhubs$client$ClientConnectionPool$$borrowClient(ClientConnectionPool.scala:69)
      at org.apache.spark.eventhubs.client.ClientConnectionPool$.borrowClient(ClientConnectionPool.scala:170)
      at org.apache.spark.eventhubs.client.EventHubsClient.org$apache$spark$eventhubs$client$EventHubsClient$$client(EventHubsClient.scala:62)
      at org.apache.spark.eventhubs.client.EventHubsClient.liftedTree1$1(EventHubsClient.scala:187)
      at org.apache.spark.eventhubs.client.EventHubsClient.partitionCountLazyVal$lzycompute(EventHubsClient.scala:184)
      at org.apache.spark.eventhubs.client.EventHubsClient.partitionCountLazyVal(EventHubsClient.scala:183)
      at org.apache.spark.eventhubs.client.EventHubsClient.partitionCount(EventHubsClient.scala:176)
      at org.apache.spark.sql.eventhubs.EventHubsSource.org$apache$spark$sql$eventhubs$EventHubsSource$$partitionCount(EventHubsSource.scala:81)
      at org.apache.spark.sql.eventhubs.EventHubsSource$$anonfun$1$$anonfun$apply$mcJ$sp$1.apply$mcJ$sp(EventHubsSource.scala:96)
      at org.apache.spark.sql.eventhubs.EventHubsSource$$anonfun$1$$anonfun$apply$mcJ$sp$1.apply(EventHubsSource.scala:96)
      at org.apache.spark.sql.eventhubs.EventHubsSource$$anonfun$1$$anonfun$apply$mcJ$sp$1.apply(EventHubsSource.scala:96)
      at scala.Option.getOrElse(Option.scala:121)
      at org.apache.spark.sql.eventhubs.EventHubsSource$$anonfun$1.apply$mcJ$sp(EventHubsSource.scala:96)
      at org.apache.spark.sql.eventhubs.EventHubsSource$$anonfun$1.apply(EventHubsSource.scala:96)
      at org.apache.spark.sql.eventhubs.EventHubsSource$$anonfun$1.apply(EventHubsSource.scala:96)
      at scala.Option.getOrElse(Option.scala:121)
      at org.apache.spark.sql.eventhubs.EventHubsSource.<init>(EventHubsSource.scala:95)
      at org.apache.spark.sql.eventhubs.EventHubsSourceProvider.createSource(EventHubsSourceProvider.scala:84)
      at org.apache.spark.sql.execution.datasources.DataSource.createSource(DataSource.scala:268)
      at org.apache.spark.sql.execution.streaming.MicroBatchExecution$$anonfun$1$$anonfun$applyOrElse$1.apply(MicroBatchExecution.scala:88)
      at org.apache.spark.sql.execution.streaming.MicroBatchExecution$$anonfun$1$$anonfun$applyOrElse$1.apply(MicroBatchExecution.scala:85)
      at scala.collection.mutable.HashMap.getOrElseUpdate(HashMap.scala:79)
      at org.apache.spark.sql.execution.streaming.MicroBatchExecution$$anonfun$1.applyOrElse(MicroBatchExecution.scala:85)
      at org.apache.spark.sql.execution.streaming.MicroBatchExecution$$anonfun$1.applyOrElse(MicroBatchExecution.scala:83)
      at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$2.apply(TreeNode.scala:268)
      at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$2.apply(TreeNode.scala:268)
      at org.apache.spark.sql.catalyst.trees.CurrentOrigin$.withOrigin(TreeNode.scala:69)
      at org.apache.spark.sql.catalyst.trees.TreeNode.transformDown(TreeNode.scala:267)
      at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.org$apache$spark$sql$catalyst$plans$logical$AnalysisHelper$$super$transformDown(LogicalPlan.scala:29)
      at org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper$class.transformDown(AnalysisHelper.scala:149)
      at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.transformDown(LogicalPlan.scala:29)
      at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.transformDown(LogicalPlan.scala:29)
      at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$transformDown$1.apply(TreeNode.scala:273)
      at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$transformDown$1.apply(TreeNode.scala:273)
      at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$6.apply(TreeNode.scala:343)
      at org.apache.spark.sql.catalyst.trees.TreeNode.mapProductIterator(TreeNode.scala:196)
      at org.apache.spark.sql.catalyst.trees.TreeNode.mapChildren(TreeNode.scala:341)
      at org.apache.spark.sql.catalyst.trees.TreeNode.transformDown(TreeNode.scala:273)
      at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.org$apache$spark$sql$catalyst$plans$logical$AnalysisHelper$$super$transformDown(LogicalPlan.scala:29)
      at org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper$class.transformDown(AnalysisHelper.scala:149)
      at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.transformDown(LogicalPlan.scala:29)
      at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.transformDown(LogicalPlan.scala:29)
      at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$transformDown$1.apply(TreeNode.scala:273)
      at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$transformDown$1.apply(TreeNode.scala:273)
      at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$6.apply(TreeNode.scala:343)
      at org.apache.spark.sql.catalyst.trees.TreeNode.mapProductIterator(TreeNode.scala:196)
      at org.apache.spark.sql.catalyst.trees.TreeNode.mapChildren(TreeNode.scala:341)
      at org.apache.spark.sql.catalyst.trees.TreeNode.transformDown(TreeNode.scala:273)
      at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.org$apache$spark$sql$catalyst$plans$logical$AnalysisHelper$$super$transformDown(LogicalPlan.scala:29)
      at org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper$class.transformDown(AnalysisHelper.scala:149)
      at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.transformDown(LogicalPlan.scala:29)
      at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.transformDown(LogicalPlan.scala:29)
      at org.apache.spark.sql.catalyst.trees.TreeNode.transform(TreeNode.scala:257)
      at org.apache.spark.sql.execution.streaming.MicroBatchExecution.logicalPlan$lzycompute(MicroBatchExecution.scala:83)
      at org.apache.spark.sql.execution.streaming.MicroBatchExecution.logicalPlan(MicroBatchExecution.scala:65)
      at org.apache.spark.sql.execution.streaming.StreamExecution.org$apache$spark$sql$execution$streaming$StreamExecution$$runStream(StreamExecution.scala:269)
      at org.apache.spark.sql.execution.streaming.StreamExecution$$anon$1.run(StreamExecution.scala:193)

    org.apache.spark.sql.streaming.StreamingQueryException: org.apache.spark.eventhubs.AuthBySecretCallBackWithParams
    === Streaming Query ===
  • Expected behavior: the Spark job, running on Synapse Spark, should stream from Event Hubs.
  • Spark version: Apache Spark 2.4.0, Scala 2.11
  • spark-eventhubs artifactId and version: com.microsoft.azure:azure-eventhubs-spark_2.11:2.3.21

seblea avatar Feb 22 '22 03:02 seblea

@seblea, Did you find a solution to this?

jeffco11 avatar Aug 15 '22 16:08 jeffco11

@seblea and @jeffco11, were you able to find a solution to this? I'm also facing the same issue.

LHuang2019 avatar Jan 08 '23 21:01 LHuang2019

@seblea here is what I did to tackle the "class not found" issue in Synapse Spark. Hopefully this is helpful to you or others to some extent.

Solution 1

Declare only azure-{eventhubs, msal4j} in the POM, then download all of their dependency JARs (see the POM sketch after this list):

  • Run mvn dependency:copy-dependencies -DoutputDirectory=<PATH>
    • 40+ JARs at the time of testing.
  • Add all of these JARs to the workspace library and upload them to the Spark pool.
    • Caveat (JAR conflicts): some of the downloaded JARs duplicate ones that already exist in the Spark pool.
    • Exception:
    Caused by: com.fasterxml.jackson.databind.JsonMappingException: Scala module 2.13.4 requires Jackson Databind version >= 2.13.0 and < 2.14.0 - Found jackson-databind version 2.12.1 at com.fasterxml.jackson.module.scala.JacksonModule.setupModule(JacksonModule.scala:61)
    
  • Workaround: remove the conflicting libraries one at a time.
    • Issue: requires manual work and is not ideal for package management.
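For reference, the dependency block for this approach might look like the following; the azure-eventhubs-spark coordinates are the ones from this thread, while the msal4j version is illustrative only:

<dependencies>
  <!-- Versions shown are examples; match them to your Spark pool -->
  <dependency>
    <groupId>com.microsoft.azure</groupId>
    <artifactId>azure-eventhubs-spark_2.11</artifactId>
    <version>2.3.21</version>
  </dependency>
  <dependency>
    <groupId>com.microsoft.azure</groupId>
    <artifactId>msal4j</artifactId>
    <version>1.10.1</version>
  </dependency>
</dependencies>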

Solution 2

Include only azure-{eventhubs, msal4j} and shade the conflicting packages:

  • How to identify library package prefixes for renaming:
    • Look for patterns in the dependency paths printed in the IDE or mvn console output.
  • Shade the specific patterns using relocation in the maven-shade-plugin (see the pom.xml sketch after this list):
    • shaded.com.fasterxml.jackson.databind
    • shaded.com.microsoft.azure
    • shaded.net.minidev
  • Exclude META-INF signature/manifest files, since they can cause issues with the shaded JAR (inform the customer; use with discretion):
    • META-INF/*.SF
    • META-INF/*.DSA
    • META-INF/*.RSA
    • META-INF/*.MF
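A sketch of the shade-plugin configuration described above; the plugin version is illustrative, and the relocation patterns should be adapted to the conflicts you actually hit:

<plugin>
  <groupId>org.apache.maven.plugins</groupId>
  <artifactId>maven-shade-plugin</artifactId>
  <version>3.2.4</version>
  <executions>
    <execution>
      <phase>package</phase>
      <goals><goal>shade</goal></goals>
      <configuration>
        <relocations>
          <!-- Rename conflicting packages so they cannot clash with the
               versions already present on the Spark pool -->
          <relocation>
            <pattern>com.fasterxml.jackson.databind</pattern>
            <shadedPattern>shaded.com.fasterxml.jackson.databind</shadedPattern>
          </relocation>
          <relocation>
            <pattern>com.microsoft.azure</pattern>
            <shadedPattern>shaded.com.microsoft.azure</shadedPattern>
          </relocation>
          <relocation>
            <pattern>net.minidev</pattern>
            <shadedPattern>shaded.net.minidev</shadedPattern>
          </relocation>
        </relocations>
        <filters>
          <!-- Drop signature/manifest files that break the shaded JAR -->
          <filter>
            <artifact>*:*</artifact>
            <excludes>
              <exclude>META-INF/*.SF</exclude>
              <exclude>META-INF/*.DSA</exclude>
              <exclude>META-INF/*.RSA</exclude>
              <exclude>META-INF/*.MF</exclude>
            </excludes>
          </filter>
        </filters>
      </configuration>
    </execution>
  </executions>
</plugin>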

Issue:

  • This solution removes some of the metadata files from the final JAR file, which can cause issues with some tools or libraries.
    • It is recommended to keep the metadata files for better compatibility and troubleshooting.

KaiLiPlayground avatar Mar 12 '24 03:03 KaiLiPlayground