e2e-data-engineering

Connect with Kafka not able to form

Open choonhongyeoh0241 opened this issue 1 year ago • 6 comments

Whenever we try to connect to Kafka, we get this error:

WARNING:root:kafka dataframe could not be created because: An error occurred while calling o36.load.
: java.lang.NoClassDefFoundError: scala/$less$colon$less
    at org.apache.spark.sql.kafka010.KafkaSourceProvider.org$apache$spark$sql$kafka010$KafkaSourceProvider$$validateStreamOptions(KafkaSourceProvider.scala:338)
    at org.apache.spark.sql.kafka010.KafkaSourceProvider.sourceSchema(KafkaSourceProvider.scala:71)
    at org.apache.spark.sql.execution.datasources.DataSource.sourceSchema(DataSource.scala:233)
    at org.apache.spark.sql.execution.datasources.DataSource.sourceInfo$lzycompute(DataSource.scala:118)
    at org.apache.spark.sql.execution.datasources.DataSource.sourceInfo(DataSource.scala:118)
    at org.apache.spark.sql.execution.streaming.StreamingRelation$.apply(StreamingRelation.scala:35)
    at org.apache.spark.sql.streaming.DataStreamReader.loadInternal(DataStreamReader.scala:168)
    at org.apache.spark.sql.streaming.DataStreamReader.load(DataStreamReader.scala:144)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:498)
    at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
    at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:374)
    at py4j.Gateway.invoke(Gateway.java:282)
    at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
    at py4j.commands.CallCommand.execute(CallCommand.java:79)
    at py4j.ClientServerConnection.waitForCommands(ClientServerConnection.java:182)
    at py4j.ClientServerConnection.run(ClientServerConnection.java:106)
    at java.lang.Thread.run(Thread.java:750)
Caused by: java.lang.ClassNotFoundException: scala.$less$colon$less
    at java.net.URLClassLoader.findClass(URLClassLoader.java:387)
    at java.lang.ClassLoader.loadClass(ClassLoader.java:418)
    at java.lang.ClassLoader.loadClass(ClassLoader.java:351)
    ... 20 more

Traceback (most recent call last):
  File "C:\Users\User\Desktop\Data Engineering\Data Engineering\spark_stream.py", line 143, in <module>
    selection_df = create_selection_df_from_kafka(spark_df)
  File "C:\Users\User\Desktop\Data Engineering\Data Engineering\spark_stream.py", line 129, in create_selection_df_from_kafka
    sel = spark_df.selectExpr("CAST(value AS STRING)")
AttributeError: 'NoneType' object has no attribute 'selectExpr'
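
The AttributeError looks like a follow-on failure rather than a separate bug: the WARNING line shows the Kafka load error was caught and logged, so spark_df was presumably left as None and then passed on to create_selection_df_from_kafka. A minimal sketch of a guard that stops at the real cause is below; require_dataframe is a hypothetical helper name, not something from spark_stream.py.

```python
def require_dataframe(df, source="kafka"):
    """Return df unchanged, or fail early with a clear message if it is None.

    Hypothetical helper: it assumes the connect function returns None
    after logging a warning, as the output above suggests.
    """
    if df is None:
        raise RuntimeError(
            f"{source} dataframe was not created; "
            "fix the error logged in the warning above first"
        )
    return df
```

It could be used as create_selection_df_from_kafka(require_dataframe(spark_df)), so the run stops on the Kafka problem instead of on selectExpr.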

choonhongyeoh0241 avatar Feb 04 '24 01:02 choonhongyeoh0241

Hello, I fixed this error after 2 days. Here are my steps:

  1. Check the Kafka version. The broker is based on the confluentinc/cp-server image, so the Kafka version is the one bundled with that Confluent Platform version.
  2. On Maven, find the versions of the jar packages listed in the Spark session config that match your setup, and add the jars to: venv/lib/python3.11/site-packages/pyspark/jars
  3. Update the config so that it references the jars you just added:

    from pyspark.sql import SparkSession

    # Parentheses let the builder chain span several lines.
    s_conn = (SparkSession.builder
        .appName('SparkDataStreaming')
        .config('spark.jars.packages', "com.datastax.spark:spark-cassandra-connector_2.12:3.4.0," "org.apache.spark:spark-sql-kafka-0-10_2.12:3.4.0")
        .config('spark.cassandra.connection.host', 'localhost')
        .getOrCreate())

  4. Run spark-submit again, and the keyspace and table will be created successfully.
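
To check step 2, it may help to look at the Scala suffix in the jar file names (the _2.12 or _2.13 part). A rough sketch, assuming the usual name_<scala>-<version>.jar naming; scala_suffixes is a made-up helper name:

```python
import re

# Matches the Scala binary version in names like "spark-core_2.12-3.4.0.jar".
_SCALA_SUFFIX = re.compile(r"_(2\.\d+)-")


def scala_suffixes(jar_names):
    """Collect the Scala binary versions encoded in the given jar file names."""
    found = set()
    for name in jar_names:
        match = _SCALA_SUFFIX.search(name)
        if match:
            found.add(match.group(1))
    return found
```

Calling it on os.listdir("venv/lib/python3.11/site-packages/pyspark/jars") should return a single version; if it returns both 2.12 and 2.13, the jars are mixed.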

Hope this helps!

VuKhoiGVM avatar Mar 03 '24 08:03 VuKhoiGVM

Hi there, the author has explained the missing packages in this part: https://youtu.be/GqAcTrqKcrY?si=QzgPjlC-RHMULUS0&t=4599

I believe it's still missing the kafka-clients package. You can add the jar file with two missing packages from @VuKhoiGVM's comment.

Ref: https://stackoverflow.com/a/71059689/17316050

Also, please double-check your Spark, Cassandra, and Scala versions so that you download compatible jars; otherwise it won't work.
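
On the version point: scala/$less$colon$less is the JVM name of scala.<:<, which as far as I know is a top-level class only from Scala 2.13 on. So the error in the issue usually means the Kafka connector jar was built for Scala 2.13 while Spark itself runs on Scala 2.12. A small sketch for building a matching coordinate is below. The function name is hypothetical, and the 2.12 default is an assumption about your Spark build.

```python
def kafka_connector_coordinate(spark_version, scala_version="2.12"):
    """Build the Maven coordinate for Spark's Kafka source.

    Assumption: the connector version should equal the Spark version,
    and scala_version must match the Scala build of your Spark.
    """
    return (
        "org.apache.spark:"
        f"spark-sql-kafka-0-10_{scala_version}:{spark_version}"
    )
```

With pyspark installed you could pass pyspark.__version__ as spark_version and put the result in spark.jars.packages. Going through spark.jars.packages should also pull in transitive dependencies such as kafka-clients, whereas jars copied by hand have to be added one by one.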

teddythinh avatar Mar 06 '24 11:03 teddythinh

I get the error even after adding the jar files

ElNino9495 avatar Apr 18 '24 13:04 ElNino9495

> I get the error even after adding the jar files

How did you manage to solve that issue?

omursnck avatar Jun 01 '24 11:06 omursnck