
[BUG]java.lang.RuntimeException: Failed to create schema for topic

Open daileizhi opened this issue 4 years ago • 1 comments

When I try to call dataset.write() to output the data to Pulsar, it throws an error saying that the schema failed to be created.

Environment: Pulsar-2.4.1 Spark-2.4.4

    java.lang.RuntimeException: Failed to create schema for persistent://public/default/als_statistics_tocheck
        at org.apache.spark.sql.pulsar.SchemaUtils$.uploadPulsarSchema(SchemaUtils.scala:104)
        at org.apache.spark.sql.pulsar.PulsarRowWriter.singleProducer$lzycompute(PulsarWriteTask.scala:140)
        at org.apache.spark.sql.pulsar.PulsarRowWriter.singleProducer(PulsarWriteTask.scala:138)
        at org.apache.spark.sql.pulsar.PulsarRowWriter.producerFlush(PulsarWriteTask.scala:210)

 RecordSchemaBuilder schemaBuilder = SchemaBuilder.record("topLevelRecord");
 schemaBuilder.field("ip").type(SchemaType.STRING);
 schemaBuilder.field("port").type(SchemaType.INT32);
 schemaBuilder.field("url_id").type(SchemaType.STRING);
 schemaBuilder.field("response_rate").type(SchemaType.DOUBLE);
 schemaBuilder.field("success_rate").type(SchemaType.DOUBLE);
 schemaBuilder.field("average_response_time").type(SchemaType.DOUBLE);
 schemaBuilder.field("average_network_time").type(SchemaType.DOUBLE);
 schemaBuilder.field("start_time").type(SchemaType.TIMESTAMP);
 schemaBuilder.field("end_time").type(SchemaType.TIMESTAMP);

 SchemaInfo statistics2checkSchemaInfo = schemaBuilder.build(SchemaType.AVRO);

  waitToCheckDataSet.write()
                        //.mode("append")
                        .format("pulsar")
                        .option("service.url", serviceUrl)
                        .option("admin.url", adminUrl)
                        .option("topic", statistics2CheckTopic)
                        .option("pulsar.producer.sendTimeoutMs","60000")
                        //.option("avroSchema",statistics2checkSchemaInfo.getSchemaDefinition())
                        //.option("recordName","CheckDataSet")
                        //.option("recordNamespace","com.some.domain")
                        .save();

    try {
        admin.schemas().getSchemaInfo(statistics2CheckTopic);
    } catch (PulsarAdminException e) {
        if (404 == e.getStatusCode()) {
            admin.schemas().createSchema(statistics2CheckTopic, statistics2checkSchemaInfo);
        }
    }

    spark.readStream()
            .format("pulsar")
            .option("service.url", serviceUrl)
            .option("admin.url", adminUrl)
            .option("topic", statistics2CheckTopic)
            .option("startingOffsets", "earliest")
            .load()
            .withWatermark("__eventTime", "1 minute")
            .writeStream().queryName("WaitToCheckDataSet")
            .outputMode("append")
            .trigger(Trigger.ProcessingTime("1 minute"))
            .foreachBatch((dataset,batchId) -> {
                System.out.println("------WaitToCheckDataSet-------");
                 dataset.show(false);
           }).start();

In addition, how can the options "avroSchema", "recordName", and "recordNamespace" be supported? The record name of the generated schema is always "topLevelRecord", instead of the expected name under com.some.domain.

http://spark.apache.org/docs/latest/sql-data-sources-avro.html

    {
      "type": "record",
      "name": "topLevelRecord",
      "fields": [
        { "name": "ip", "type": ["string", "null"] },
        { "name": "port", "type": ["int", "null"] },
        { "name": "url_id", "type": "string" },
        { "name": "response_rate", "type": ["double", "null"] },
        { "name": "success_rate", "type": ["double", "null"] },
        { "name": "average_response_time", "type": "double" },
        { "name": "average_network_time", "type": "double" },
        { "name": "start_time", "type": [{ "type": "long", "logicalType": "timestamp-micros" }, "null"] },
        { "name": "end_time", "type": [{ "type": "long", "logicalType": "timestamp-micros" }, "null"] },
        { "name": "type", "type": "int" }
      ]
    }
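As a sketch of a possible workaround for the naming question (assuming the connector honors a user-supplied "avroSchema" option, which is version-dependent and not confirmed here), the schema JSON can be built with Apache Avro's own SchemaBuilder so the record name and namespace are explicit instead of the default "topLevelRecord". The class name AvroSchemaExample and the field list are illustrative:

```java
import org.apache.avro.Schema;
import org.apache.avro.SchemaBuilder;

public class AvroSchemaExample {

    // Builds an Avro schema with an explicit record name and namespace,
    // mirroring the fields Spark derived from the Dataset above.
    public static Schema buildSchema() {
        return SchemaBuilder.record("CheckDataSet")
                .namespace("com.some.domain")
                .fields()
                .optionalString("ip")
                .optionalInt("port")
                .requiredString("url_id")
                .optionalDouble("response_rate")
                .optionalDouble("success_rate")
                .requiredDouble("average_response_time")
                .requiredDouble("average_network_time")
                .endRecord();
    }

    public static void main(String[] args) {
        Schema schema = buildSchema();
        // The JSON definition could then be passed to the writer, e.g.
        //     .option("avroSchema", schema.toString())
        System.out.println(schema.getFullName()); // com.some.domain.CheckDataSet
        System.out.println(schema.toString(true));
    }
}
```

Whether the pulsar-spark writer forwards that option to the underlying Avro serialization is the open question in this issue; the sketch only shows how to produce a correctly named schema definition.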

daileizhi avatar Dec 20 '19 17:12 daileizhi

Any resolution for this issue ?

swapnil-chougule avatar Apr 21 '21 14:04 swapnil-chougule

@swapnil-chougule Can you paste your exception trace here?

syhily avatar Dec 05 '22 12:12 syhily

SchemaUtils$.uploadPulsarSchema(SchemaUtils.scala:104) indicates that you need admin privileges on the topic. That should be the root cause of this exception, so I don't think this is a bug.
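For reference, a minimal sketch of granting the Spark job's role enough privilege on a broker with authorization enabled; "spark-writer" and "standalone" are placeholders for your actual role and cluster names, and which option is sufficient depends on your broker's authorization settings:

```shell
# Option 1: grant produce permission on the namespace the topic lives in
bin/pulsar-admin namespaces grant-permission public/default \
  --role spark-writer \
  --actions produce

# Option 2 (broader): make the role a tenant admin, since uploading schemas
# through the admin API generally requires admin privilege on the tenant
bin/pulsar-admin tenants update public \
  --admin-roles spark-writer \
  --allowed-clusters standalone
```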

syhily avatar Dec 18 '22 19:12 syhily