streamx
From Kafka (Avro format) to S3 (Parquet format)
I am trying to export Avro records from Kafka and load them into S3 in Parquet format. I got the exception shown below.
ERROR WorkerSinkTask{id=s3-sink-0} Task threw an uncaught and unrecoverable exception (org.apache.kafka.connect.runtime.WorkerTask:172)
java.lang.NullPointerException
    at io.confluent.connect.hdfs.HdfsSinkTask.close(HdfsSinkTask.java:122)
    at org.apache.kafka.connect.runtime.WorkerSinkTask.commitOffsets(WorkerSinkTask.java:377)
    at org.apache.kafka.connect.runtime.WorkerSinkTask.closePartitions(WorkerSinkTask.java:576)
    at org.apache.kafka.connect.runtime.WorkerSinkTask.execute(WorkerSinkTask.java:177)
    at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:170)
    at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:214)
    at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
    at java.util.concurrent.FutureTask.run(FutureTask.java:266)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
    at java.lang.Thread.run(Thread.java:748)
[2018-08-07 18:45:49,558] ERROR WorkerSinkTask{id=s3-sink-0} Task is being killed and will not recover until manually restarted (org.apache.kafka.connect.runtime.WorkerTask:173)
^C[2018-08-07 18:55:39,021] INFO Kafka Connect stopping (org.apache.kafka.connect.runtime.Connect:65)
quickstart-s3.properties
name=s3-sink
connector.class=com.qubole.streamx.s3.S3SinkConnector
format.class=io.confluent.connect.hdfs.parquet.ParquetFormat
partitioner.class=io.confluent.connect.hdfs.partitioner.HourlyPartitioner
tasks.max=1
topics=abc
flush.size=1
s3.url=https://s3.console.aws.amazon.com/s3/buckets/bucketname/topics/?region=us-west-2&tab=overview
hadoop.conf.dir=pathtostreamx/streamx/config/hadoop-conf
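As a side note on the configuration above: since StreamX builds on the HDFS sink connector, s3.url is usually expected to be a filesystem-style URL rather than the AWS web-console URL. A hedged sketch of what that line typically looks like (the bucket name here is a placeholder, not from my setup):

s3.url=s3://my-bucket

I am not certain this is the cause of the NullPointerException, but it may be worth double-checking.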
connect-standalone.properties
bootstrap.servers=localhost:9092
key.converter=io.confluent.connect.avro.AvroConverter
key.converter.schema.registry.url=http://localhost:8081
value.converter=io.confluent.connect.avro.AvroConverter
value.converter.schema.registry.url=http://localhost:8081
key.converter.schemas.enable=true
value.converter.schemas.enable=true
internal.key.converter=org.apache.kafka.connect.json.JsonConverter
internal.value.converter=org.apache.kafka.connect.json.JsonConverter
internal.key.converter.schemas.enable=false
internal.value.converter.schemas.enable=false
offset.storage.file.filename=/tmp/connect.offsets
offset.flush.interval.ms=10000
plugin.path=share/java
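For completeness, this is how I launch the standalone worker with the two files above (paths assume a standard Kafka/Confluent install layout; adjust for your setup):

bin/connect-standalone connect-standalone.properties quickstart-s3.properties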
Thank you for any corrections or suggestions.