
JSON records to Parquet on S3 won't work

Open · inneractive-opensrc opened this issue 7 years ago · 1 comment

Hi

I would like to write Parquet directly to S3; my events are just JSON strings. Do you know if what I'm trying to do can work?

In fact, I've already tried with this config:

{ "name": "ParquetS3", "config": { "name": "ParquetS3", "connector.class": "com.qubole.streamx.s3.S3SinkConnector", "format.class": "io.confluent.connect.hdfs.parquet.ParquetFormat", "partitioner.class": "io.confluent.connect.hdfs.partitioner.HourlyPartitioner", "locale": "en", "timezone": "UTC", "tasks.max": 11, "topics": "XXX", "flush.size": 50000, "s3.url": "s3n://XXXXX", "hadoop.conf.dir": "/etc/kafka-connect-s3/hadoop-conf" } }

but it doesn't actually work.

The only error I can see in the log is:

```
Task ParquetS3-56 threw an uncaught and unrecoverable exception (org.apache.kafka.connect.runtime.WorkerTask:141)
java.lang.NullPointerException
	at io.confluent.connect.hdfs.DataWriter.close(DataWriter.java:299)
	at io.confluent.connect.hdfs.HdfsSinkTask.close(HdfsSinkTask.java:122)
	at org.apache.kafka.connect.runtime.WorkerSinkTask.commitOffsets(WorkerSinkTask.java:317)
	at org.apache.kafka.connect.runtime.WorkerSinkTask.closePartitions(WorkerSinkTask.java:480)
	at org.apache.kafka.connect.runtime.WorkerSinkTask.execute(WorkerSinkTask.java:152)
	at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:139)
	at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:182)
	at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
	at java.util.concurrent.FutureTask.run(FutureTask.java:266)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
	at java.lang.Thread.run(Thread.java:748)
```

Is there any other log output for the streamX connector?

Thanks

inneractive-opensrc avatar Jul 12 '17 13:07 inneractive-opensrc

@inneractive-opensrc This doesn't work because the io.confluent.connect.hdfs.parquet.ParquetFormat class expects the source data being read into the formatter to be in Avro format, not JSON. If you check out my fork, I've added the ability to convert CSV-sourced data to Parquet before uploading to S3, which is very similar to what you want. You'd just need to write code similar to the CSV-to-Parquet converter, but converting JSON source data to Parquet instead.
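For reference, here's a minimal, hypothetical sketch of that kind of JSON-to-Parquet conversion using Jackson plus parquet-avro (the same Avro-based write path ParquetFormat relies on). The class name, example schema, and field mapping are made up for illustration and are not part of streamx; inside a connector format class, the writer would target the S3/HDFS path rather than a local file.

```java
// Illustrative sketch only: map a JSON string onto an Avro GenericRecord and write it
// with parquet-avro. Schema and field names are hypothetical examples.
import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.ObjectMapper;
import org.apache.avro.Schema;
import org.apache.avro.generic.GenericData;
import org.apache.avro.generic.GenericRecord;
import org.apache.hadoop.fs.Path;
import org.apache.parquet.avro.AvroParquetWriter;
import org.apache.parquet.hadoop.ParquetWriter;
import org.apache.parquet.hadoop.metadata.CompressionCodecName;

public class JsonToParquetSketch {

    private static final ObjectMapper MAPPER = new ObjectMapper();

    // Example Avro schema; in practice it would be derived from (or validated against)
    // the JSON events arriving on the topic.
    private static final Schema SCHEMA = new Schema.Parser().parse(
        "{\"type\":\"record\",\"name\":\"Event\",\"fields\":["
        + "{\"name\":\"id\",\"type\":\"string\"},"
        + "{\"name\":\"value\",\"type\":\"long\"}]}");

    /** Convert one JSON record into a GenericRecord matching SCHEMA. */
    static GenericRecord toAvro(String json) throws Exception {
        JsonNode node = MAPPER.readTree(json);
        GenericRecord record = new GenericData.Record(SCHEMA);
        record.put("id", node.get("id").asText());
        record.put("value", node.get("value").asLong());
        return record;
    }

    public static void main(String[] args) throws Exception {
        // Smoke test: write a couple of converted records to a local Parquet file.
        try (ParquetWriter<GenericRecord> writer = AvroParquetWriter
                .<GenericRecord>builder(new Path("/tmp/events.parquet"))
                .withSchema(SCHEMA)
                .withCompressionCodec(CompressionCodecName.SNAPPY)
                .build()) {
            writer.write(toAvro("{\"id\":\"a\",\"value\":1}"));
            writer.write(toAvro("{\"id\":\"b\",\"value\":2}"));
        }
    }
}
```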

lewisdawson avatar Sep 20 '17 06:09 lewisdawson