pulsar-spark
[BUG] Error when resetting subscription
Describe the bug
Randomly (or at least I haven't found the exact moment when it happens), when the Spark application receives a message from Pulsar, I get this exception: "org.apache.pulsar.client.api.PulsarClientException: Error when resetting subscription: 142926:2", and the application stops working.
To Reproduce
I'm using Pulsar 2.5.0, deployed on Kubernetes with the default mini Helm configuration (no custom configuration), and Spark 2.4.4 running locally (`spark-sql_2.11` and `pulsar-spark-connector_2.11` packages).
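I use this code in the Spark application:
```java
import static org.apache.spark.sql.functions.*;

import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;
import org.apache.spark.sql.streaming.StreamingQuery;
import org.apache.spark.sql.types.DataTypes;

public class PulsarTest {
    public static void main(String[] args) throws Exception {
        // Pulsar proxy endpoints: binary protocol and HTTP admin API
        String serviceUrl = "pulsar://172.16.101.114:30002";
        String adminUrl = "http://172.16.101.114:30001";

        SparkSession spark = SparkSession.builder().appName("PulsarTest").master("local[2]").getOrCreate();
        spark.sparkContext().setLogLevel("WARN");

        // Source: read the "aom-emp" topic as a stream
        Dataset<Row> dataset = spark
                .readStream()
                .format("pulsar")
                .option("service.url", serviceUrl)
                .option("admin.url", adminUrl)
                .option("topic", "aom-emp")
                .load();

        // Sum "nb" per 1-minute event-time window, objet and action; write each row
        // to the topic "<objet>-<action>", keyed by the window
        StreamingQuery query = dataset
                .withWatermark("__eventTime", "10 minutes")
                .groupBy(window(dataset.col("__eventTime"), "1 minute"), col("objet"), col("action"))
                .sum("nb")
                .select(
                        concat(col("objet"), lit("-"), col("action")).as("__topic"),
                        col("window").cast(DataTypes.StringType).as("__key"),
                        col("sum(nb)").as("value"))
                .writeStream()
                .outputMode("update")
                .format("pulsar")
                .option("service.url", serviceUrl)
                .option("admin.url", adminUrl)
                .option("checkpointLocation", "/tmp/checkpoint")
                .start();

        query.awaitTermination();
    }
}
```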
172.16.101.114 is the IP of the Pulsar Proxy.
When I send messages to "aom-emp", it works most of the time, but sometimes the exception occurs and the Spark application crashes.
Expected behavior
The application runs forever with no error.
The error seems to happen at least every time I launch the Spark application for the first time, when it receives the first message. If, after the first failure, I delete the checkpoint directory, relaunch the Spark application, and then send a new message to the topic, the error no longer occurs.
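The manual workaround described above (deleting the checkpoint directory before relaunching) can be scripted. The following is a minimal sketch using only the JDK, assuming the checkpoint lives on the local filesystem at `/tmp/checkpoint` as in the reproducer; the class name `CheckpointReset` and method `deleteCheckpoint` are hypothetical. Deleting the checkpoint discards the stored Pulsar offsets and the watermark/aggregation state, so this is a stopgap rather than a fix.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.util.Comparator;
import java.util.stream.Stream;

public final class CheckpointReset {
    // Hypothetical helper: recursively deletes a local checkpoint directory.
    // Returns true if the directory existed and was deleted, false otherwise.
    public static boolean deleteCheckpoint(Path dir) throws IOException {
        if (!Files.exists(dir)) {
            return false;
        }
        try (Stream<Path> paths = Files.walk(dir)) {
            // Reverse natural order puts children before their parent directories,
            // so each directory is already empty when it is deleted
            paths.sorted(Comparator.reverseOrder()).forEach(p -> {
                try {
                    Files.delete(p);
                } catch (IOException e) {
                    throw new UncheckedIOException(e);
                }
            });
        }
        return true;
    }

    public static void main(String[] args) throws IOException {
        // Assumption: same local path as the checkpointLocation in the reproducer
        Path checkpoint = Paths.get(args.length > 0 ? args[0] : "/tmp/checkpoint");
        if (deleteCheckpoint(checkpoint)) {
            System.out.println("deleted " + checkpoint);
        } else {
            System.out.println("nothing to delete at " + checkpoint);
        }
    }
}
```

Run it before starting the streaming query. If the checkpoint is on HDFS or object storage instead, the Hadoop `FileSystem` API would be needed rather than `java.nio.file`.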
I tried today with a Spark application running inside Kubernetes (deployed with the Spark Operator) and got exactly the same issue.
@yjshen, can you please help check these errors?
#29 is tracking the same issue; it is waiting on apache/pulsar#6120.
@yjshen, we are waiting for Pulsar 2.5.1 to see if it changes anything. In the meantime, is there anything we can do to mitigate this issue?
This issue should be resolved in the latest Pulsar release.