pulsar-io-cloud-storage icon indicating copy to clipboard operation
pulsar-io-cloud-storage copied to clipboard

GCS sink connector cannot write Parquet when data is published with the Pulsar Go client; works fine with the Java client

Open mdsadimnadeem opened this issue 4 years ago • 2 comments

@freeznet When I publish data with the Pulsar Go client (the program I use to write to the Pulsar topic is shown below), the messages are published successfully and I am also able to consume them. However, the StreamNative GCS sink connector is not able to write them to GCS as Parquet, and I get the exception below.

Slack Thread on Apache Pulsar Community : https://apache-pulsar.slack.com/archives/C5Z4T36F7/p1640579795494300

Exception Stack Trace : `16:28:54.381 [pulsar-client-io-1-1] INFO org.apache.pulsar.client.impl.ConsumerImpl - [persistent://public/default/sadim-test-json3][/pulsar-poc/cloud-storage-sink] Subscribed to topic on pulsar-poc-broker-1.pulsar-poc-broker.pulsar-poc.svc.cluster.local/10.248.1.83:6650 -- consumer: 43 16:28:54.436 [pulsar-io-cloud-storage-sink-flush-0] INFO org.apache.pulsar.io.jcloud.sink.BlobStoreAbstractSink - Flushing [SinkRecord(sourceRecord=PulsarRecord(topicName=Optional[persistent://public/default/sadim-test-json3], partition=0, message=Optional[org.apache.pulsar.client.impl.TopicMessageImpl@23bbe054], schema=AUTO_CONSUME({schemaVersion=,schemaType=BYTES}{schemaVersion=org.apache.pulsar.common.protocol.schema.LatestVersion@2dd916ed,schemaType=JSON}), failFunction=org.apache.pulsar.functions.source.PulsarSource$$Lambda$322/0x0000000840688840@14f36ab9, ackFunction=org.apache.pulsar.functions.source.PulsarSource$$Lambda$321/0x0000000840688440@495fc424), value=[B@50d82707)] buffered records to blob store 16:28:54.436 [pulsar-io-cloud-storage-sink-flush-0] ERROR org.apache.pulsar.io.jcloud.sink.BlobStoreAbstractSink - Caught unexpected exception: org.apache.avro.SchemaParseException: Cannot parse schema at org.apache.avro.Schema.parse(Schema.java:1633) ~[java-instance.jar:?] at org.apache.avro.Schema$Parser.parse(Schema.java:1430) ~[java-instance.jar:?] at org.apache.avro.Schema$Parser.parse(Schema.java:1418) ~[java-instance.jar:?] at org.apache.pulsar.io.jcloud.util.AvroRecordUtil.convertToAvroSchema(AvroRecordUtil.java:103) ~[7k9dpWmnkw8dXf7zEFCwXw/:?] at org.apache.pulsar.io.jcloud.format.ParquetFormat.initSchema(ParquetFormat.java:63) ~[7k9dpWmnkw8dXf7zEFCwXw/:?] at org.apache.pulsar.io.jcloud.sink.BlobStoreAbstractSink.unsafeFlush(BlobStoreAbstractSink.java:239) ~[7k9dpWmnkw8dXf7zEFCwXw/:?] at org.apache.pulsar.io.jcloud.sink.BlobStoreAbstractSink.flush(BlobStoreAbstractSink.java:219) ~[7k9dpWmnkw8dXf7zEFCwXw/:?] 
at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515) [?:?] at java.util.concurrent.FutureTask.run(FutureTask.java:264) [?:?] at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:304) [?:?] at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128) [?:?] at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628) [?:?] at java.lang.Thread.run(Thread.java:829) [?:?] 16:28:55.788 [pulsar-io-cloud-storage-sink-flush-0] INFO org.apache.pulsar.io.jcloud.sink.BlobStoreAbstractSink - Skip flushing, because the pending flush queue is empty ...

accuknox-mysql-helm-temp/pulsar-test-dec22-test1/public/default/CiliumTelemetryDemo27Dec/2021-12-27`

Golang Client Code : `package main import ( "context" "time" "fmt" "github.com/apache/pulsar-client-go/pulsar" "encoding/json" "strconv" )

func main() { fmt.Println("hello world") for i := 0; i < 500; i++ { fmt.Println("sadim"+ strconv.Itoa(i)) writePulsar("ciliumjson","sadim111") fmt.Println("sadimend"+ strconv.Itoa(i)) } fmt.Println("hello world 2") }

// AccuknoxJsonWrapperObject nests an AccuknoxFinalJsonWrapperObject under the
// "msg" key when encoded as JSON.
//
// The field is exported because encoding/json silently skips unexported
// fields; with the previous lowercase field the value always marshalled to
// "{}".
type AccuknoxJsonWrapperObject struct {
	Msg AccuknoxFinalJsonWrapperObject `json:"msg"`
}

// AccuknoxFinalJsonWrapperObject carries a single string payload. The JSON key
// is "inputjson", matching the field name declared in exampleSchemaDef.
//
// The field is exported so that encoding/json includes it in the output.
type AccuknoxFinalJsonWrapperObject struct {
	InputJSON string `json:"inputjson"`
}

// exampleSchemaDef is the Avro record definition passed to
// pulsar.NewJSONSchema by PublishToPulsarTopic: a record "Example2" in
// namespace "test2" with one string field, "inputjson".
//
// A raw string literal is used so the embedded double quotes need no escaping.
var (
	exampleSchemaDef = `{"type":"record","name":"Example2","namespace":"test2","fields":[{"name":"inputjson","type":"string"}]}`
)

func PublishToPulsarTopic(inputjson string, topicName string) (result string) { client, err := pulsar.NewClient(pulsar.ClientOptions{ URL: "pulsar://localhost:6650", OperationTimeout: 30 * time.Second, ConnectionTimeout: 30 * time.Second, }) if err != nil { fmt.Println("Could not instantiate Pulsar client: %v") }

defer client.Close()

	JsonWrapperObject2 := AccuknoxFinalJsonWrapperObject{inputjson}
	fmt.Println("sadim1")
	fmt.Println(JsonWrapperObject2)
	JsonWrapperObject := AccuknoxJsonWrapperObject{JsonWrapperObject2}
	fmt.Println("sadim2")
	fmt.Println(JsonWrapperObject)
	
	file, _ := json.Marshal(JsonWrapperObject2)
	fmt.Println("sadim3")
	fmt.Println(file)
	file2, _ := json.Marshal(JsonWrapperObject)
	fmt.Println("sadim4")
	fmt.Println(file2)

properties := make(map[string]string) properties["pulsar"] = "hello" fmt.Println("sadim5") fmt.Println(properties)

jsonSchemaWithProperties := pulsar.NewJSONSchema(exampleSchemaDef, properties) producer, err := client.CreateProducer(pulsar.ProducerOptions{ Topic: topicName, Schema: jsonSchemaWithProperties, })

if err != nil {
fmt.Println(err)
}

_, err = producer.Send(context.Background(), &pulsar.ProducerMessage{
Value: (JsonWrapperObject2),
})

defer producer.Close()

if err != nil {
fmt.Println("Failed to publish message", err)
}
fmt.Println("Published message")

return ""

} type testJSON struct { InputJson string } var ( exampleSchemaDef5 = "{"type":"record","name":"testJSON","namespace":"test2"," + ""fields":[{"name":"InputJson","type":"testJSON"}]}" )

var ( exSchema = "{"type":"record","schema":"","properties":{"InputJson":"testJSON"}}" ) func writePulsar(inputjson string, topicName string) (result string) { client, err := pulsar.NewClient(pulsar.ClientOptions{ URL: "pulsar://localhost:6650", }) if err != nil { fmt.Println(err) } defer client.Close()

//a2 := testJSON{InputJson: inputjson} //properties := map[string]testJSON{"InputJson" : a2} properties2 := make(map[string]string) properties2["InputJson"] = "testJSON" jsonSchemaWithProperties := pulsar.NewJSONSchema(exampleSchemaDef5, properties2) producer, err := client.CreateProducer(pulsar.ProducerOptions{ Topic: topicName, Schema: jsonSchemaWithProperties, }) if err != nil { fmt.Println(err) }

_, err = producer.Send(context.Background(), &pulsar.ProducerMessage{ Value: &testJSON{ InputJson: inputjson, }, }) if err != nil { fmt.Println(err) } producer.Close() return "" }`

Working Java client code (data published with this client is written as Parquet by the sink): `package accuknox.datalake;

import java.io.BufferedReader; import java.io.File; import java.io.FileReader; import java.io.IOException;

import org.apache.pulsar.client.api.Producer; import org.apache.pulsar.client.api.PulsarClient; import org.apache.pulsar.client.impl.schema.JSONSchema;

public class App3 { public static void main(String[] args) throws InterruptedException, IOException { PulsarClient client = PulsarClient.builder().serviceUrl("pulsar://localhost:6650").build(); /* * Producer<String> producer = client.newProducer(JSONSchema.of(String.class)) * .topic("persistent://public/default/cilium-telemetry-4") .create(); */ //Schema.JSON Producer<AccuknoxJsonObject> producer = client.newProducer(JSONSchema.of(AccuknoxJsonObject.class)).topic("CiliumTelemetryDemo27Dec").create(); //Producer<byte[]> producer = client.newProducer().topic("cilium-test1").create(); FileReader fr = null; BufferedReader br = null; AccuknoxJsonObject jsonObject = null; for (int i=0;i<=90;i++) { try
{
File file = new File("./knoxfeedermockdata/flow.txt"); //File file = new File("./knoxfeedermockdata/cilium_alerts_tableMockData.txt"); //File file = new File("./knoxfeedermockdata/cilium_v3_All.txt"); System.out.println(file.getAbsolutePath()); fr=new FileReader(file); br=new BufferedReader(fr); String line;
while((line=br.readLine())!=null) { System.out.println(line); //for(int i = 0 ; i< 1000000; i++) jsonObject = new AccuknoxJsonObject(line); producer.send(jsonObject); } System.gc(); //this.getRuntime().gc() fr.close(); //closes the stream and release the resources
}
catch(IOException e)
{
System.err.print(e); } catch(NullPointerException e2) { System.err.print(e2); } finally { try { if(br != null) br.close(); if(fr != null) fr.close(); } catch(NullPointerException ex) { System.err.print(ex); } } //Thread.sleep(200); } producer.close(); client.close(); } } `

cc: @sijie

mdsadimnadeem avatar Dec 29 '21 06:12 mdsadimnadeem

Even a sample Go Pulsar producer client would work for us, as long as the data it sends to a Pulsar topic is written to GCS as Parquet by the StreamNative GCS sink connector; we can then use the same syntax. @freeznet @sijie

mdsadimnadeem avatar Dec 29 '21 06:12 mdsadimnadeem