kafka-connect-hdfs
kafka-connect-hdfs copied to clipboard
NullPoint Exception, io.confluent.connect.hdfs3.Hdfs3SinkTask.open
I'm using the hdfs3 connector to consume data in kafka and stack it in hdfs3.
My Kafka confluent, sink connect version is below.
kafka-confluent:5.4.2-1
confluentinc/kafka-connect-hdfs3:latest
and my connect settings are as follows:
name=Hdfs3SinkConnector
connector.class=io.confluent.connect.hdfs3.Hdfs3SinkConnector
confluent.topic.bootstrap.servers="~~"
consumer.auto.offset.reset=earliest
topics="~~"
hdfs.url=hdfs://~~
store.url=hdfs://~~
plugin.path="~~"
schema.compatibility=BACKWARD
format.class=io.confluent.connect.hdfs3.parquet.ParquetFormat
key.converter=org.apache.kafka.connect.converters.ByteArrayConverter
value.converter=org.apache.kafka.connect.converters.ByteArrayConverter
value.converter.schemas.enabled=true
value.converter.schema.registry.url="~~"
value.subject.name.strategy=io.confluent.kafka.serializers.subject.TopicNameStrategy
timestamp.extractor=RecordField
timestamp.field=""~~""
partitioner.class=io.confluent.connect.storage.partitioner.TimeBasedPartitioner
path.format=YYYYMMdd/HH/
partition.duration.ms=1000
tasks.max=3
flush.size=100000
rotate.interval.ms=900000
hadoop.conf.dir="~~"
hdfs.authentication.kerberos=true
kerberos.ticket.renew.period.ms=360000
connect.hdfs.keytab="~~"
connect.hdfs.principal="~~"
errors.log.enable=true
errors.log.include.messages=true
timezone=Asia/Seoul
locale=ko_KR
and i encounter errors like this
java.lang.NullPointerException
at io.confluent.connect.hdfs3.Hdfs3SinkTask.open(Hdfs3SinkTask.java:137)
at org.apache.kafka.connect.runtime.WorkerSinkTask.openPartitions(WorkerSinkTask.java:587)
at org.apache.kafka.connect.runtime.WorkerSinkTask.access$1100(WorkerSinkTask.java:67)
at org.apache.kafka.connect.runtime.WorkerSinkTask$HandleRebalance.onPartitionsAssigned(WorkerSinkTask.java:652)
at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.invokePartitionsAssigned(ConsumerCoordinator.java:272)
at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.onJoinComplete(ConsumerCoordinator.java:400)
at org.apache.kafka.clients.consumer.internals.AbstractCoordinator.joinGroupIfNeeded(AbstractCoordinator.java:421)
at org.apache.kafka.clients.consumer.internals.AbstractCoordinator.ensureActiveGroup(AbstractCoordinator.java:340)
at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.poll(ConsumerCoordinator.java:471)
at org.apache.kafka.clients.consumer.KafkaConsumer.updateAssignmentMetadataIfNeeded(KafkaConsumer.java:1267)
at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1231)
at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1211)
at org.apache.kafka.connect.runtime.WorkerSinkTask.pollConsumer(WorkerSinkTask.java:444)
at org.apache.kafka.connect.runtime.WorkerSinkTask.poll(WorkerSinkTask.java:317)
at org.apache.kafka.connect.runtime.WorkerSinkTask.iteration(WorkerSinkTask.java:224)
at org.apache.kafka.connect.runtime.WorkerSinkTask.execute(WorkerSinkTask.java:192)
at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:177)
at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:227)
at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
at java.util.concurrent.FutureTask.run(FutureTask.java:266)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
at java.lang.Thread.run(Thread.java:748)
Could it be a problem that appears when plugin.path is not set properly?
Hi @Kimakjun
This repo is for the HDFS2 connector. Your question seems to be about the HDFS3 one