pulsar-spark
[BUG] Writing data into Pulsar fails: "Literal must have a corresponding value to string, but class String found"
Describe the bug
When writing data into Pulsar with the following code:

```scala
import org.apache.pulsar.client.api.MessageId
import org.apache.spark.sql.SparkSession

def main(args: Array[String]): Unit = {
  val sparkSession = SparkSession.builder().appName("test-pulsar").master("local").getOrCreate()
  // built with the connector's topicOffsets helper, but not used by the write below
  val startingOffsets = topicOffsets(Map("persistent://public/default/my-topic" -> MessageId.fromByteArray(Array[Byte](8, 33, 16, 8))))
  import sparkSession.implicits._
  sparkSession.createDataset(1 to 10)
    .write
    .format("pulsar")
    .option("service.url", "pulsar://localhost:6650")
    .option("admin.url", "http://localhost:8080")
    .option("topic", "persistent://public/default/my-topic-2")
    .save()
  sparkSession.stop()
}
```
Then I get this error:
```
Exception in thread "main" java.lang.IllegalArgumentException: requirement failed: Literal must have a corresponding value to string, but class String found.
	at scala.Predef$.require(Predef.scala:277)
	at org.apache.spark.sql.catalyst.expressions.Literal$.validateLiteralValue(literals.scala:219)
	at org.apache.spark.sql.catalyst.expressions.Literal.<init>(literals.scala:296)
	at org.apache.spark.sql.pulsar.PulsarSinks$.$anonfun$validateQuery$2(PulsarSinks.scala:89)
	at scala.Option.getOrElse(Option.scala:121)
	at org.apache.spark.sql.pulsar.PulsarSinks$.validateQuery(PulsarSinks.scala:83)
	at org.apache.spark.sql.pulsar.PulsarProvider.createRelation(PulsarProvider.scala:185)
	at org.apache.spark.sql.execution.datasources.SaveIntoDataSourceCommand.run(SaveIntoDataSourceCommand.scala:46)
	at org.apache.spark.sql.execution.command.ExecutedCommandExec.sideEffectResult$lzycompute(commands.scala:70)
	at org.apache.spark.sql.execution.command.ExecutedCommandExec.sideEffectResult(commands.scala:68)
	at org.apache.spark.sql.execution.command.ExecutedCommandExec.doExecute(commands.scala:90)
	at org.apache.spark.sql.execution.SparkPlan.$anonfun$execute$1(SparkPlan.scala:180)
	at org.apache.spark.sql.execution.SparkPlan.$anonfun$executeQuery$1(SparkPlan.scala:218)
	at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
	at org.apache.spark.sql.execution.SparkPlan.executeQuery(SparkPlan.scala:215)
	at org.apache.spark.sql.execution.SparkPlan.execute(SparkPlan.scala:176)
	at org.apache.spark.sql.execution.QueryExecution.toRdd$lzycompute(QueryExecution.scala:132)
	at org.apache.spark.sql.execution.QueryExecution.toRdd(QueryExecution.scala:131)
	at org.apache.spark.sql.DataFrameWriter.$anonfun$runCommand$1(DataFrameWriter.scala:989)
	at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$5(SQLExecution.scala:103)
	at org.apache.spark.sql.execution.SQLExecution$.withSQLConfPropagated(SQLExecution.scala:163)
	at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$1(SQLExecution.scala:90)
	at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:775)
	at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:64)
	at org.apache.spark.sql.DataFrameWriter.runCommand(DataFrameWriter.scala:989)
	at org.apache.spark.sql.DataFrameWriter.saveToV1Source(DataFrameWriter.scala:438)
	at org.apache.spark.sql.DataFrameWriter.saveInternal(DataFrameWriter.scala:415)
	at org.apache.spark.sql.DataFrameWriter.save(DataFrameWriter.scala:301)
	at pulsar.exProduce$.main(exProduce.scala:26)
	at pulsar.exProduce.main(exProduce.scala)
21/08/13 15:41:12 INFO SparkContext: Invoking stop() from shutdown hook
21/08/13 15:41:12 INFO SparkUI: Stopped Spark web UI at http://172.18.21.74:4040
21/08/13 15:41:12 INFO MapOutputTrackerMasterEndpoint: MapOutputTrackerMasterEndpoint stopped!
```
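For context on the message itself: in Spark 3.x, Catalyst's `Literal.validateLiteralValue` requires that a `StringType` literal already hold Spark's internal `UTF8String` representation, not a plain `java.lang.String`; the stack trace shows `PulsarSinks.validateQuery` constructing a `Literal` from a raw `String`, which trips that `require`. A simplified, self-contained model of the check (illustrative only, not the actual Spark code; `Utf8Str` is a stand-in for `org.apache.spark.unsafe.types.UTF8String`):

```scala
// Stand-in for Spark's internal UTF8String (illustrative only).
final case class Utf8Str(bytes: Array[Byte])

object Utf8Str {
  def fromString(s: String): Utf8Str = Utf8Str(s.getBytes("UTF-8"))
}

// Mirrors the shape of Catalyst's validateLiteralValue for StringType:
// only the internal representation is accepted.
def validateStringLiteral(value: Any): Boolean = value match {
  case _: Utf8Str => true  // internal representation: accepted
  case _          => false // java.lang.String (or anything else): rejected
}

validateStringLiteral("my-topic")                     // -> false, the reported failure
validateStringLiteral(Utf8Str.fromString("my-topic")) // -> true
```

This is why the error reads "must have a corresponding value to string, but class String found": the check expects the internal type and sees `java.lang.String` instead.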
To Reproduce
Run the code above; the data should be written into Pulsar and be readable from the topic.
Expected behavior
When will this bug fix be merged?
Folks, I am a bit surprised this hasn't been merged into an updated version yet. This fix is critical for the connector to even work with Spark 3.0.
nathluu can we get an update here?
The problem is fixed in this PR: https://github.com/streamnative/pulsar-spark/pull/62, and is released in Release-3.1.1.4.
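For anyone picking the release up from Maven Central, the artifact coordinates (as cited elsewhere in this thread; adjust the Scala suffix to your build) look like this in sbt:

```scala
// pulsar-spark connector, Scala 2.12 build, version named in this thread
libraryDependencies += "io.streamnative.connectors" % "pulsar-spark-connector_2.12" % "3.1.1.4"
```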
I get the same error with 3.1.1.4
We have the same error with 3.1.1.4. It looks like the Maven artifact does not include the code fix for this issue.
@nlu90 Do we have an update here? The latest Maven artifact for 3.1.1.4 does not include the change.
@DutchDave @acherla our team member is looking into this issue and will provide an update soon.
I'm facing the exact same error and cannot push any data to Pulsar, no matter what I try. @nlu90 Is there any workaround until the fix is published?
I have tried on my local machine, but I can't reproduce this issue. Can anyone provide reproduction code?
@kvedes @acherla @DutchDave @naishu
Hi @syhily, I don't have a simple example for reproducing this, but I hope the below can point you in the right direction; I can try to make a simpler example later. I'm running Pulsar version 2.10.1.11 on OpenShift, and connecting from Databricks with runtime 11.3 LTS, which uses Spark 3.3.0. On my Spark cluster I have installed io.streamnative.connectors:pulsar-spark-connector_2.12:3.1.1.4 from Maven.
My code:
```python
token = dbutils.secrets.get(scope="datalake", key="pulsar-super-user-token")
topic = dbutils.secrets.get(scope="datalake", key="topic")

df_dummy = spark.createDataFrame([(1, 5), (2, 6), (3, 7), (4, 7), (5, 8)], schema="__key int, value int")
df_dummy.write.mode("overwrite").format("delta").saveAsTable("bronze.stream_dummy")

df_dummy_stream = spark.readStream.table("bronze.stream_dummy")
df = (df_dummy_stream
    .writeStream
    .format("pulsar")
    .option("service.url", "pulsar+ssl://....")
    .option("admin.url", "https://....")
    .option("pulsar.client.authPluginClassName", "org.apache.pulsar.client.impl.auth.AuthenticationToken")
    .option("pulsar.client.authParams", f"token:{token}")
    .option("checkpointLocation", "/mnt/bronze/_checkpoint")
    .option("topic", topic)
    .start())
```
The latest release, 3.1.1.4, was in June, while the fix mentioned above was pushed to master in August. So maybe making a new release would fix the issue? https://github.com/streamnative/pulsar-spark/pull/62
@kvedes Yep, I think you are right. I tested the code on the master branch and everything seems OK. This confuses me a lot.
@syhily Sounds good. What is the process of making a new release?
We plan to release it today. Sorry for the long wait.