JSON records to Parquet on S3 won't work
Hi
I would like to write Parquet directly to S3; my events are plain JSON strings. Do you know if what I'm trying to do can work?
In fact, I've already tried with this config:
{ "name": "ParquetS3", "config": { "name": "ParquetS3", "connector.class": "com.qubole.streamx.s3.S3SinkConnector", "format.class": "io.confluent.connect.hdfs.parquet.ParquetFormat", "partitioner.class": "io.confluent.connect.hdfs.partitioner.HourlyPartitioner", "locale": "en", "timezone": "UTC", "tasks.max": 11, "topics": "XXX", "flush.size": 50000, "s3.url": "s3n://XXXXX", "hadoop.conf.dir": "/etc/kafka-connect-s3/hadoop-conf" } }
but it doesn't actually work.
The only error I can see in the log is:
Task ParquetS3-56 threw an uncaught and unrecoverable exception (org.apache.kafka.connect.runtime.WorkerTask:141)
java.lang.NullPointerException
    at io.confluent.connect.hdfs.DataWriter.close(DataWriter.java:299)
    at io.confluent.connect.hdfs.HdfsSinkTask.close(HdfsSinkTask.java:122)
    at org.apache.kafka.connect.runtime.WorkerSinkTask.commitOffsets(WorkerSinkTask.java:317)
    at org.apache.kafka.connect.runtime.WorkerSinkTask.closePartitions(WorkerSinkTask.java:480)
    at org.apache.kafka.connect.runtime.WorkerSinkTask.execute(WorkerSinkTask.java:152)
    at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:139)
    at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:182)
    at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
    at java.util.concurrent.FutureTask.run(FutureTask.java:266)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
    at java.lang.Thread.run(Thread.java:748)
Is there any other log output for the streamX connector?
Thanks
@inneractive-opensrc This doesn't work because the io.confluent.connect.hdfs.parquet.ParquetFormat class expects the source data being read into the formatter to be in Avro format, not JSON. If you check out my fork, I've added the ability to convert CSV-sourced data to Parquet before uploading to S3, which is very similar to what you want. You'd just need to write code similar to the CSV-to-Parquet converter that instead converts JSON source data to Parquet.
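For reference, here is a rough standalone sketch of the kind of conversion such a JSON converter would need to do under the hood: decode a JSON record into an Avro GenericRecord against an Avro schema, then write it with parquet-avro. This is not the streamx/HDFS-connector Format API, just the avro and parquet-avro libraries used directly, and the schema, fields, and output path are invented for the example.

```java
// Minimal sketch, assuming a hypothetical "Event" schema and a local output
// path; an s3a:// path would work if the Hadoop configuration carries S3
// credentials. Not the connector's Format implementation.
import org.apache.avro.Schema;
import org.apache.avro.generic.GenericDatumReader;
import org.apache.avro.generic.GenericRecord;
import org.apache.avro.io.DecoderFactory;
import org.apache.avro.io.JsonDecoder;
import org.apache.hadoop.fs.Path;
import org.apache.parquet.avro.AvroParquetWriter;
import org.apache.parquet.hadoop.ParquetWriter;
import org.apache.parquet.hadoop.metadata.CompressionCodecName;

public class JsonToParquetSketch {

    public static void main(String[] args) throws Exception {
        // Hypothetical Avro schema describing the JSON events.
        Schema schema = new Schema.Parser().parse(
            "{\"type\":\"record\",\"name\":\"Event\",\"fields\":["
            + "{\"name\":\"id\",\"type\":\"string\"},"
            + "{\"name\":\"ts\",\"type\":\"long\"}]}");

        // One JSON event, as it might arrive from the Kafka topic.
        String json = "{\"id\": \"abc\", \"ts\": 1500000000000}";

        // Decode the JSON string into an Avro GenericRecord using the schema.
        JsonDecoder decoder = DecoderFactory.get().jsonDecoder(schema, json);
        GenericRecord record =
            new GenericDatumReader<GenericRecord>(schema).read(null, decoder);

        // Write the record out as Parquet via parquet-avro.
        try (ParquetWriter<GenericRecord> writer = AvroParquetWriter
                .<GenericRecord>builder(new Path("file:///tmp/events.parquet"))
                .withSchema(schema)
                .withCompressionCodec(CompressionCodecName.SNAPPY)
                .build()) {
            writer.write(record);
        }
    }
}
```

The key point the sketch illustrates is that Parquet writing here always goes through an Avro schema, which is why a schemaless JSON string can't be handed straight to ParquetFormat; the converter has to supply or derive a schema first.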