pulsar-spark
[BUG]java.lang.RuntimeException: Failed to create schema for topic
When I call `dataset.write()` to output data to Pulsar, it throws a "Failed to create schema" error.
Environment: Pulsar 2.4.1, Spark 2.4.4
```
java.lang.RuntimeException: Failed to create schema for persistent://public/default/als_statistics_tocheck
    at org.apache.spark.sql.pulsar.SchemaUtils$.uploadPulsarSchema(SchemaUtils.scala:104)
    at org.apache.spark.sql.pulsar.PulsarRowWriter.singleProducer$lzycompute(PulsarWriteTask.scala:140)
    at org.apache.spark.sql.pulsar.PulsarRowWriter.singleProducer(PulsarWriteTask.scala:138)
    at org.apache.spark.sql.pulsar.PulsarRowWriter.producerFlush(PulsarWriteTask.scala:210)
```
```java
RecordSchemaBuilder schemaBuilder = SchemaBuilder.record("topLevelRecord");
schemaBuilder.field("ip").type(SchemaType.STRING);
schemaBuilder.field("port").type(SchemaType.INT32);
schemaBuilder.field("url_id").type(SchemaType.STRING);
schemaBuilder.field("response_rate").type(SchemaType.DOUBLE);
schemaBuilder.field("success_rate").type(SchemaType.DOUBLE);
schemaBuilder.field("average_response_time").type(SchemaType.DOUBLE);
schemaBuilder.field("average_network_time").type(SchemaType.DOUBLE);
schemaBuilder.field("start_time").type(SchemaType.TIMESTAMP);
schemaBuilder.field("end_time").type(SchemaType.TIMESTAMP);
SchemaInfo statistics2checkSchemaInfo = schemaBuilder.build(SchemaType.AVRO);
```
```java
waitToCheckDataSet.write()
        //.mode("append")
        .format("pulsar")
        .option("service.url", serviceUrl)
        .option("admin.url", adminUrl)
        .option("topic", statistics2CheckTopic)
        .option("pulsar.producer.sendTimeoutMs", "60000")
        //.option("avroSchema", statistics2checkSchemaInfo.getSchemaDefinition())
        //.option("recordName", "CheckDataSet")
        //.option("recordNamespace", "com.some.domain")
        .save();
```
```java
try {
    admin.schemas().getSchemaInfo(statistics2CheckTopic);
} catch (PulsarAdminException e) {
    if (404 == e.getStatusCode()) {
        admin.schemas().createSchema(statistics2CheckTopic, statistics2checkSchemaInfo);
    }
}
```
```java
spark.readStream()
        .format("pulsar")
        .option("service.url", serviceUrl)
        .option("admin.url", adminUrl)
        .option("topic", statistics2CheckTopic)
        .option("startingOffsets", "earliest")
        .load()
        .withWatermark("__eventTime", "1 minute")
        .writeStream().queryName("WaitToCheckDataSet")
        .outputMode("append")
        .trigger(Trigger.ProcessingTime("1 minute"))
        .foreachBatch((dataset, batchId) -> {
            System.out.println("------WaitToCheckDataSet-------");
            dataset.show(false);
        }).start();
```
In addition, how can the options `avroSchema`, `recordName`, and `recordNamespace` be supported? The record in the generated schema is always named "topLevelRecord"; I expected something like `com.some.domain.CheckDataSet` instead.
http://spark.apache.org/docs/latest/sql-data-sources-avro.html

```json
{
  "type": "record",
  "name": "topLevelRecord",
  "fields": [
    { "name": "ip", "type": [ "string", "null" ] },
    { "name": "port", "type": [ "int", "null" ] },
    { "name": "url_id", "type": "string" },
    { "name": "response_rate", "type": [ "double", "null" ] },
    { "name": "success_rate", "type": [ "double", "null" ] },
    { "name": "average_response_time", "type": "double" },
    { "name": "average_network_time", "type": "double" },
    { "name": "start_time", "type": [ { "type": "long", "logicalType": "timestamp-micros" }, "null" ] },
    { "name": "end_time", "type": [ { "type": "long", "logicalType": "timestamp-micros" }, "null" ] },
    { "name": "type", "type": "int" }
  ]
}
```
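Since the connector always emits "topLevelRecord", one possible workaround is to pre-create the schema yourself with a hand-written Avro definition whose record name and namespace you control, and pass that definition to `admin.schemas().createSchema(...)`. Below is a minimal, stdlib-only sketch that assembles such a definition; the class name `CheckDataSetSchema` and the namespace/name values are illustrative, not part of the connector's API.

```java
// Sketch: building an Avro schema definition string by hand so the record
// name and namespace can be chosen freely. The resulting JSON could be fed
// into a SchemaInfo and uploaded with the Pulsar admin client (not shown,
// to keep this snippet dependency-free).
public class CheckDataSetSchema {

    // Returns an Avro record schema definition with an explicit namespace and name.
    public static String schemaDefinition(String namespace, String name) {
        return "{"
            + "\"type\":\"record\","
            + "\"namespace\":\"" + namespace + "\","
            + "\"name\":\"" + name + "\","
            + "\"fields\":["
            + "{\"name\":\"ip\",\"type\":[\"string\",\"null\"]},"
            + "{\"name\":\"port\",\"type\":[\"int\",\"null\"]},"
            + "{\"name\":\"url_id\",\"type\":\"string\"},"
            + "{\"name\":\"response_rate\",\"type\":[\"double\",\"null\"]},"
            + "{\"name\":\"success_rate\",\"type\":[\"double\",\"null\"]},"
            + "{\"name\":\"average_response_time\",\"type\":\"double\"},"
            + "{\"name\":\"average_network_time\",\"type\":\"double\"},"
            + "{\"name\":\"start_time\",\"type\":[{\"type\":\"long\",\"logicalType\":\"timestamp-micros\"},\"null\"]},"
            + "{\"name\":\"end_time\",\"type\":[{\"type\":\"long\",\"logicalType\":\"timestamp-micros\"},\"null\"]}"
            + "]}";
    }

    public static void main(String[] args) {
        System.out.println(schemaDefinition("com.some.domain", "CheckDataSet"));
    }
}
```

Whether the broker accepts a pre-registered schema in place of the connector-generated one depends on the schema compatibility settings of the namespace, so this should be verified against your cluster.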
Any resolution for this issue ?
@swapnil-chougule Can you paste your exception trace here?
The frame `SchemaUtils$.uploadPulsarSchema(SchemaUtils.scala:104)` indicates that you need admin privileges on the topic. That is most likely the root cause of this exception, so I don't think this is a bug.
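If missing permissions are indeed the cause, granting the client's role produce/consume rights on the namespace with `pulsar-admin` may resolve it. This is an ops sketch against a live cluster; the role name `spark-writer` is a placeholder for whatever role your Spark job authenticates as.

```shell
# Grant the role used by the Spark job produce/consume rights on the namespace.
# "spark-writer" is a placeholder; substitute your client's actual role.
bin/pulsar-admin namespaces grant-permission public/default \
  --role spark-writer \
  --actions produce,consume

# Verify the grant took effect.
bin/pulsar-admin namespaces permissions public/default
```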