
[BUG] Writing data into Pulsar fails with "Literal must have a corresponding value to string, but class String found"

chlyzzo opened this issue 3 years ago • 6 comments

Describe the bug When writing data into Pulsar with the following code:

import org.apache.spark.sql.SparkSession
import org.apache.pulsar.client.api.MessageId

def main(args: Array[String]): Unit = {

    val sparkSession = SparkSession.builder().appName("test-pulsar").master("local").getOrCreate()

    // Built but not used by the write below; MessageId.fromByteArray expects Array[Byte].
    val startingOffsets = topicOffsets(Map("persistent://public/default/my-topic" -> MessageId.fromByteArray(Array[Byte](8, 33, 16, 8))))

    import sparkSession.implicits._
    sparkSession.createDataset(1 to 10)
       .write
       .format("pulsar")
       .option("service.url", "pulsar://localhost:6650")
       .option("admin.url", "http://localhost:8080")
       .option("topic", "persistent://public/default/my-topic-2")
       .save()

    sparkSession.stop()
}

then I get this error:

Exception in thread "main" java.lang.IllegalArgumentException: requirement failed: Literal must have a corresponding value to string, but class String found.
	at scala.Predef$.require(Predef.scala:277)
	at org.apache.spark.sql.catalyst.expressions.Literal$.validateLiteralValue(literals.scala:219)
	at org.apache.spark.sql.catalyst.expressions.Literal.<init>(literals.scala:296)
	at org.apache.spark.sql.pulsar.PulsarSinks$.$anonfun$validateQuery$2(PulsarSinks.scala:89)
	at scala.Option.getOrElse(Option.scala:121)
	at org.apache.spark.sql.pulsar.PulsarSinks$.validateQuery(PulsarSinks.scala:83)
	at org.apache.spark.sql.pulsar.PulsarProvider.createRelation(PulsarProvider.scala:185)
	at org.apache.spark.sql.execution.datasources.SaveIntoDataSourceCommand.run(SaveIntoDataSourceCommand.scala:46)
	at org.apache.spark.sql.execution.command.ExecutedCommandExec.sideEffectResult$lzycompute(commands.scala:70)
	at org.apache.spark.sql.execution.command.ExecutedCommandExec.sideEffectResult(commands.scala:68)
	at org.apache.spark.sql.execution.command.ExecutedCommandExec.doExecute(commands.scala:90)
	at org.apache.spark.sql.execution.SparkPlan.$anonfun$execute$1(SparkPlan.scala:180)
	at org.apache.spark.sql.execution.SparkPlan.$anonfun$executeQuery$1(SparkPlan.scala:218)
	at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
	at org.apache.spark.sql.execution.SparkPlan.executeQuery(SparkPlan.scala:215)
	at org.apache.spark.sql.execution.SparkPlan.execute(SparkPlan.scala:176)
	at org.apache.spark.sql.execution.QueryExecution.toRdd$lzycompute(QueryExecution.scala:132)
	at org.apache.spark.sql.execution.QueryExecution.toRdd(QueryExecution.scala:131)
	at org.apache.spark.sql.DataFrameWriter.$anonfun$runCommand$1(DataFrameWriter.scala:989)
	at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$5(SQLExecution.scala:103)
	at org.apache.spark.sql.execution.SQLExecution$.withSQLConfPropagated(SQLExecution.scala:163)
	at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$1(SQLExecution.scala:90)
	at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:775)
	at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:64)
	at org.apache.spark.sql.DataFrameWriter.runCommand(DataFrameWriter.scala:989)
	at org.apache.spark.sql.DataFrameWriter.saveToV1Source(DataFrameWriter.scala:438)
	at org.apache.spark.sql.DataFrameWriter.saveInternal(DataFrameWriter.scala:415)
	at org.apache.spark.sql.DataFrameWriter.save(DataFrameWriter.scala:301)
	at pulsar.exProduce$.main(exProduce.scala:26)
	at pulsar.exProduce.main(exProduce.scala)
21/08/13 15:41:12 INFO SparkContext: Invoking stop() from shutdown hook
21/08/13 15:41:12 INFO SparkUI: Stopped Spark web UI at http://172.18.21.74:4040
21/08/13 15:41:12 INFO MapOutputTrackerMasterEndpoint: MapOutputTrackerMasterEndpoint stopped!
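
For context on the failure: in Spark 3, `org.apache.spark.sql.catalyst.expressions.Literal` validates that a `StringType` literal carries Spark's internal `UTF8String` representation rather than a plain `java.lang.String`, and `PulsarSinks.validateQuery` built the topic literal from a raw `String`, tripping that `require` check. The following is a hypothetical Python mimic of the check to illustrate the mismatch; it is not actual Spark or connector code, and all names in it are illustrative only:

```python
class UTF8String:
    """Stand-in for Spark's internal org.apache.spark.unsafe.types.UTF8String."""
    def __init__(self, value: str):
        self.value = value

def validate_string_literal(value) -> None:
    # Mirrors the spirit of Literal.validateLiteralValue: the value backing a
    # StringType literal must be the internal type, not the external one.
    if not isinstance(value, UTF8String):
        raise ValueError(
            "requirement failed: Literal must have a corresponding value to "
            f"string, but class {type(value).__name__} found.")

# Internal representation passes the check.
validate_string_literal(UTF8String("persistent://public/default/my-topic-2"))

# A plain string fails, analogous to the stack trace above.
try:
    validate_string_literal("persistent://public/default/my-topic-2")
except ValueError as exc:
    print(exc)
```

PR #62 (linked below) addresses this on the connector side by constructing the literal from the internal representation instead of the raw string.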

To Reproduce Run the code above; the data should be written into Pulsar and be readable from the topic.

Expected behavior The data is written to the topic and can be read back from it.

chlyzzo avatar Aug 13 '21 07:08 chlyzzo

When will this bug fix be merged?

krishna1234998 avatar Aug 18 '22 16:08 krishna1234998

Folks, I am a bit surprised this hasn't been merged into an updated version yet. This fix is pretty critical for this connector to even work with Spark 3.0.

nathluu can we get an update here?

acherla avatar Sep 22 '22 19:09 acherla

The problem is fixed in this PR: https://github.com/streamnative/pulsar-spark/pull/62

And is released in: Release-3.1.1.4

nlu90 avatar Sep 28 '22 02:09 nlu90

I get the same error with 3.1.1.4

naishu avatar Oct 06 '22 02:10 naishu

We have the same error with 3.1.1.4. It looks like the Maven artifact does not contain the code fix for this issue.

DutchDave avatar Oct 18 '22 10:10 DutchDave

@nlu90 Do we have an update here? The latest Maven artifact for 3.1.1.4 does not include the change.

acherla avatar Oct 20 '22 16:10 acherla

@DutchDave @acherla our team member is looking into this issue. We will provide an update soon.

nlu90 avatar Nov 04 '22 01:11 nlu90

I'm facing the exact same error. I cannot push any data to Pulsar; I get the error no matter what. @nlu90 Is there any workaround until the fix is published?

kvedes avatar Nov 07 '22 15:11 kvedes

I have tried on my local machine, but I can't reproduce this issue. Can anyone provide reproduction code?

@kvedes @acherla @DutchDave @naishu

syhily avatar Nov 07 '22 19:11 syhily

Hi @syhily, I don't have a simple example for reproducing this, but I hope the below can point you in the right direction; I can try to make a simpler example later. I'm running Pulsar 2.10.1.11 on OpenShift and connecting from Databricks runtime 11.3 LTS, which uses Spark 3.3.0. On my Spark cluster I have installed io.streamnative.connectors:pulsar-spark-connector_2.12:3.1.1.4 from Maven.

My code:

token = dbutils.secrets.get(scope="datalake", key="pulsar-super-user-token")
topic = dbutils.secrets.get(scope="datalake", key="topic")

df_dummy = spark.createDataFrame([(1, 5), (2, 6), (3, 7), (4, 7), (5, 8)], schema="__key int, value int")
df_dummy.write.mode("overwrite").format("delta").saveAsTable("bronze.stream_dummy")
df_dummy_stream = spark.readStream.table("bronze.stream_dummy")

query = (df_dummy_stream  # start() returns a StreamingQuery, not a DataFrame
  .writeStream
  .format("pulsar")
  .option("service.url", "pulsar+ssl://....")
  .option("admin.url", "https://....")
  .option("pulsar.client.authPluginClassName", "org.apache.pulsar.client.impl.auth.AuthenticationToken")
  .option("pulsar.client.authParams", f"token:{token}")
  .option("checkpointLocation", "/mnt/bronze/_checkpoint")
  .option("topic", topic)
  .start())

kvedes avatar Nov 08 '22 07:11 kvedes

The latest release, 3.1.1.4, was in June, while the fix mentioned above was pushed to master in August. So maybe making a new release would fix the issue? https://github.com/streamnative/pulsar-spark/pull/62

kvedes avatar Nov 08 '22 08:11 kvedes

@kvedes Yep, I think you are right. I tested the code on the master branch and everything seems OK. This confused me a lot.

syhily avatar Nov 08 '22 09:11 syhily

@syhily Sounds good. What is the process for making a new release?

kvedes avatar Nov 08 '22 11:11 kvedes

We plan to release it today. Sorry for the long wait.

syhily avatar Nov 18 '22 03:11 syhily