GCS sink connector cannot write Parquet when data is published with the Pulsar Go client (works fine with the Java client)
@freeznet When I write data with the Pulsar Go client (program below), the messages are published to the Pulsar topic and I can consume them. However, the StreamNative GCS sink connector cannot write them to GCS as Parquet and fails with the exception below.
Slack thread in the Apache Pulsar community: https://apache-pulsar.slack.com/archives/C5Z4T36F7/p1640579795494300
Exception Stack Trace : `16:28:54.381 [pulsar-client-io-1-1] INFO org.apache.pulsar.client.impl.ConsumerImpl - [persistent://public/default/sadim-test-json3][/pulsar-poc/cloud-storage-sink] Subscribed to topic on pulsar-poc-broker-1.pulsar-poc-broker.pulsar-poc.svc.cluster.local/10.248.1.83:6650 -- consumer: 43
16:28:54.436 [pulsar-io-cloud-storage-sink-flush-0] INFO org.apache.pulsar.io.jcloud.sink.BlobStoreAbstractSink - Flushing [SinkRecord(sourceRecord=PulsarRecord(topicName=Optional[persistent://public/default/sadim-test-json3], partition=0, message=Optional[org.apache.pulsar.client.impl.TopicMessageImpl@23bbe054], schema=AUTO_CONSUME({schemaVersion=,schemaType=BYTES}{schemaVersion=org.apache.pulsar.common.protocol.schema.LatestVersion@2dd916ed,schemaType=JSON}), failFunction=org.apache.pulsar.functions.source.PulsarSource$$Lambda$322/0x0000000840688840@14f36ab9, ackFunction=org.apache.pulsar.functions.source.PulsarSource$$Lambda$321/0x0000000840688440@495fc424), value=[B@50d82707)] buffered records to blob store
16:28:54.436 [pulsar-io-cloud-storage-sink-flush-0] ERROR org.apache.pulsar.io.jcloud.sink.BlobStoreAbstractSink - Caught unexpected exception:
org.apache.avro.SchemaParseException: Cannot parse
accuknox-mysql-helm-temp/pulsar-test-dec22-test1/public/default/CiliumTelemetryDemo27Dec/2021-12-27`
Golang Client Code : `package main import ( "context" "time" "fmt" "github.com/apache/pulsar-client-go/pulsar" "encoding/json" "strconv" )
func main() { fmt.Println("hello world") for i := 0; i < 500; i++ { fmt.Println("sadim"+ strconv.Itoa(i)) writePulsar("ciliumjson","sadim111") fmt.Println("sadimend"+ strconv.Itoa(i)) } fmt.Println("hello world 2") }
// AccuknoxJsonWrapperObject nests an AccuknoxFinalJsonWrapperObject under the
// JSON key "msg".
//
// The field must be exported: encoding/json (which the Pulsar Go client's
// JSON schema uses to encode message values) silently skips unexported
// fields, so an unexported field always marshals as {}.
type AccuknoxJsonWrapperObject struct {
	Msg AccuknoxFinalJsonWrapperObject `json:"msg"`
}

// AccuknoxFinalJsonWrapperObject carries one JSON document as a string under
// the JSON key "inputjson", the field name declared in exampleSchemaDef.
// The field is exported so encoding/json actually serializes it; with an
// unexported field every message body was {} and did not match the schema.
type AccuknoxFinalJsonWrapperObject struct {
	InputJSON string `json:"inputjson"`
}
// exampleSchemaDef is the Avro-style record definition registered with the
// JSON schema in PublishToPulsarTopic. Every published value must be a JSON
// object with a string field named "inputjson".
//
// A raw string literal is used so the embedded double quotes need no
// escaping (the pasted version was not a valid Go string literal).
var exampleSchemaDef = `{"type":"record","name":"Example2","namespace":"test2","fields":[{"name":"inputjson","type":"string"}]}`
func PublishToPulsarTopic(inputjson string, topicName string) (result string) { client, err := pulsar.NewClient(pulsar.ClientOptions{ URL: "pulsar://localhost:6650", OperationTimeout: 30 * time.Second, ConnectionTimeout: 30 * time.Second, }) if err != nil { fmt.Println("Could not instantiate Pulsar client: %v") }
defer client.Close()
JsonWrapperObject2 := AccuknoxFinalJsonWrapperObject{inputjson}
fmt.Println("sadim1")
fmt.Println(JsonWrapperObject2)
JsonWrapperObject := AccuknoxJsonWrapperObject{JsonWrapperObject2}
fmt.Println("sadim2")
fmt.Println(JsonWrapperObject)
file, _ := json.Marshal(JsonWrapperObject2)
fmt.Println("sadim3")
fmt.Println(file)
file2, _ := json.Marshal(JsonWrapperObject)
fmt.Println("sadim4")
fmt.Println(file2)
properties := make(map[string]string) properties["pulsar"] = "hello" fmt.Println("sadim5") fmt.Println(properties)
jsonSchemaWithProperties := pulsar.NewJSONSchema(exampleSchemaDef, properties) producer, err := client.CreateProducer(pulsar.ProducerOptions{ Topic: topicName, Schema: jsonSchemaWithProperties, })
if err != nil {
fmt.Println(err)
}
_, err = producer.Send(context.Background(), &pulsar.ProducerMessage{
Value: (JsonWrapperObject2),
})
defer producer.Close()
if err != nil {
fmt.Println("Failed to publish message", err)
}
fmt.Println("Published message")
return ""
} type testJSON struct { InputJson string } var ( exampleSchemaDef5 = "{"type":"record","name":"testJSON","namespace":"test2"," + ""fields":[{"name":"InputJson","type":"testJSON"}]}" )
var ( exSchema = "{"type":"record","schema":"","properties":{"InputJson":"testJSON"}}" ) func writePulsar(inputjson string, topicName string) (result string) { client, err := pulsar.NewClient(pulsar.ClientOptions{ URL: "pulsar://localhost:6650", }) if err != nil { fmt.Println(err) } defer client.Close()
//a2 := testJSON{InputJson: inputjson} //properties := map[string]testJSON{"InputJson" : a2} properties2 := make(map[string]string) properties2["InputJson"] = "testJSON" jsonSchemaWithProperties := pulsar.NewJSONSchema(exampleSchemaDef5, properties2) producer, err := client.CreateProducer(pulsar.ProducerOptions{ Topic: topicName, Schema: jsonSchemaWithProperties, }) if err != nil { fmt.Println(err) }
_, err = producer.Send(context.Background(), &pulsar.ProducerMessage{ Value: &testJSON{ InputJson: inputjson, }, }) if err != nil { fmt.Println(err) } producer.Close() return "" }`
Working Java client (the sink writes its data to GCS as Parquet without errors): `package accuknox.datalake;
import java.io.BufferedReader; import java.io.File; import java.io.FileReader; import java.io.IOException;
import org.apache.pulsar.client.api.Producer; import org.apache.pulsar.client.api.PulsarClient; import org.apache.pulsar.client.impl.schema.JSONSchema;
/**
 * Publishes every line of ./knoxfeedermockdata/flow.txt to the topic
 * "CiliumTelemetryDemo27Dec" as an AccuknoxJsonObject using a Pulsar JSON
 * schema, repeating the whole file 91 times (i = 0..90).
 *
 * <p>The client, producer and per-pass reader are managed with
 * try-with-resources, so they are closed even if an exception escapes. In the
 * original, a failure such as producer creation throwing left the client
 * open, because close() was not in a finally block.
 */
public class App3 {

    public static void main(String[] args) throws InterruptedException, IOException {
        // Other mock-data files used earlier:
        //   ./knoxfeedermockdata/cilium_alerts_tableMockData.txt
        //   ./knoxfeedermockdata/cilium_v3_All.txt
        File file = new File("./knoxfeedermockdata/flow.txt");

        try (PulsarClient client = PulsarClient.builder().serviceUrl("pulsar://localhost:6650").build();
             Producer<AccuknoxJsonObject> producer =
                     client.newProducer(JSONSchema.of(AccuknoxJsonObject.class))
                             .topic("CiliumTelemetryDemo27Dec")
                             .create()) {
            for (int i = 0; i <= 90; i++) {
                System.out.println(file.getAbsolutePath());
                try (BufferedReader br = new BufferedReader(new FileReader(file))) {
                    String line;
                    while ((line = br.readLine()) != null) {
                        System.out.println(line);
                        producer.send(new AccuknoxJsonObject(line));
                    }
                } catch (IOException | NullPointerException e) {
                    // Best effort, as before: report the failure and continue with the next pass.
                    System.err.print(e);
                }
            }
        }
    }
}
`
cc: @sijie
Alternatively, a sample Go producer whose messages the StreamNative GCS sink connector successfully writes to GCS as Parquet would be enough; we can follow the same approach. @freeznet @sijie