pulsar-spark

[BUG] Error when resetting subscription

Open · mathieudruart opened this issue 5 years ago · 4 comments

Describe the bug: Randomly (or at least I haven't found the exact moment when it happens), when the Spark application receives a message from Pulsar, I get this exception and the application stops working: "org.apache.pulsar.client.api.PulsarClientException: Error when resetting subscription: 142926:2".

To reproduce: I'm using Pulsar 2.5.0, deployed on Kubernetes with the default mini Helm configuration (no custom configuration), and Spark 2.4.4 running locally (spark-sql_2.11 and pulsar-spark-connector_2.11 packages).

I use this code in the Spark application:

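```java
import static org.apache.spark.sql.functions.col;
import static org.apache.spark.sql.functions.concat;
import static org.apache.spark.sql.functions.lit;
import static org.apache.spark.sql.functions.window;

import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;
import org.apache.spark.sql.streaming.StreamingQuery;
import org.apache.spark.sql.types.DataTypes;

public class PulsarTest {
    public static void main(String[] args) throws Exception {
        // Pulsar proxy endpoints: binary protocol and admin REST API
        String serviceUrl = "pulsar://172.16.101.114:30002";
        String adminUrl = "http://172.16.101.114:30001";

        SparkSession spark = SparkSession.builder().appName("PulsarTest").master("local[2]").getOrCreate();
        spark.sparkContext().setLogLevel("WARN");

        // Read the "aom-emp" topic as a streaming Dataset
        Dataset<Row> dataset = spark
                .readStream()
                .format("pulsar")
                .option("service.url", serviceUrl)
                .option("admin.url", adminUrl)
                .option("topic", "aom-emp")
                .load();

        // Sum "nb" per 1-minute event-time window and (objet, action) pair,
        // then write each result back to Pulsar, routed by the "__topic" column
        StreamingQuery query = dataset
                .withWatermark("__eventTime", "10 minutes")
                .groupBy(window(dataset.col("__eventTime"), "1 minute"), col("objet"), col("action"))
                .sum("nb")
                .select(
                        concat(col("objet"), lit("-"), col("action")).as("__topic"),
                        col("window").cast(DataTypes.StringType).as("__key"),
                        col("sum(nb)").as("value"))
                .writeStream()
                .outputMode("update")
                .format("pulsar")
                .option("service.url", serviceUrl)
                .option("admin.url", adminUrl)
                .option("checkpointLocation", "/tmp/checkpoint")
                .start();

        query.awaitTermination();
    }
}
```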

172.16.101.114 is the IP of the Pulsar Proxy.

When I send messages to "aom-emp", it works most of the time, but sometimes the exception occurs and the Spark application crashes.

Expected behavior: The application runs indefinitely without errors.

mathieudruart · Jan 30 '20

The error seems to happen at least every time I launch the Spark application for the first time, when it receives the first message. If, after the first failure, I delete the checkpoint directory, relaunch the Spark application, and then send a new message to the topic, the error no longer occurs.
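The workaround in this comment (delete the checkpoint directory, then relaunch) can be scripted before each restart. Below is a minimal sketch, assuming the checkpoint lives on the local filesystem at `/tmp/checkpoint` as set by the `checkpointLocation` option in the issue; the class `CheckpointReset` and its method are hypothetical names, and this does not apply to HDFS or S3 checkpoints. Keep in mind that deleting the checkpoint discards the query's saved offsets and aggregation state, so the relaunched query does not resume where the previous run stopped.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.util.Comparator;
import java.util.stream.Stream;

public class CheckpointReset {

    // Hypothetical helper: recursively deletes a LOCAL checkpoint directory.
    // Returns true if the directory existed and was deleted, false otherwise.
    public static boolean deleteCheckpoint(Path dir) throws IOException {
        if (!Files.exists(dir)) {
            return false;
        }
        try (Stream<Path> paths = Files.walk(dir)) {
            // Reverse order puts children before their parent directories,
            // so each directory is already empty when it is deleted.
            paths.sorted(Comparator.reverseOrder()).forEach(p -> {
                try {
                    Files.delete(p);
                } catch (IOException e) {
                    throw new UncheckedIOException(e);
                }
            });
        }
        return true;
    }

    public static void main(String[] args) throws IOException {
        // Assumed default matches the checkpointLocation used in the issue
        Path checkpoint = Paths.get(args.length > 0 ? args[0] : "/tmp/checkpoint");
        boolean deleted = deleteCheckpoint(checkpoint);
        System.out.println(deleted ? "Deleted " + checkpoint : "Nothing to delete at " + checkpoint);
    }
}
```

Run it (for example `java CheckpointReset /tmp/checkpoint`) before relaunching the Spark application.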

Today I tried with a Spark application running inside Kubernetes (deployed with the Spark Operator), and I get exactly the same issue.

eladar2000 · Feb 01 '20

@yjshen can you please help check the errors?

sijie · Feb 04 '20

#29 is tracking the same issue; it is waiting for apache/pulsar#6120.

yjshen · Feb 05 '20

@yjshen we are waiting for Pulsar 2.5.1 to see if it changes anything. In the meantime, is there anything we can do to mitigate this issue?

mathieudruart · Feb 17 '20

This issue should be resolved in the latest Pulsar release.

syhily · Dec 05 '22