kafka-connect-hdfs
HDFS sink was working fine for some time, but now it is not working
I am trying to land data in Hadoop in real time from MySQL using confluent-5.0.1. My HDFS sink is throwing the error shown below.

MySQL JDBC source connector:

```json
{
  "name": "jdbc_source_mysql_registration_query",
  "config": {
    "connector.class": "io.confluent.connect.jdbc.JdbcSourceConnector",
    "key.converter": "io.confluent.connect.avro.AvroConverter",
    "key.converter.schema.registry.url": "http://localhost:8081",
    "value.converter": "io.confluent.connect.avro.AvroConverter",
    "value.converter.schema.registry.url": "http://localhost:8081",
    "connection.url": "jdbc:mysql://localhost:3310/users?zeroDateTimeBehavior=ROUND&useCursorFetch=true&defaultFetchSize=1000&user=kotesh&password=kotesh",
    "query": "SELECT matriid,DateUpdated,Language, Gender, MaritalStatus,MotherTongue, Religion, Caste,SubCaste, EducationId,OccupationCategory,AnnualIncomeinINR, ByWhom from users.data_names",
    "mode": "timestamp",
    "timestamp.column.name": "DateUpdated",
    "validate.non.null": "false",
    "topic.prefix": "mysql-prod-registrations-"
  }
}
```
HDFS sink configuration:

```json
{
  "name": "hdfs-sink1",
  "config": {
    "connector.class": "io.confluent.connect.hdfs.HdfsSinkConnector",
    "tasks.max": "3",
    "topics": "mysql-prod-registrations-",
    "hadoop.conf.dir": "/usr/hdp/current/hadoop-client/conf",
    "hadoop.home": "/usr/hdp/current/hadoop-client",
    "hdfs.url": "hdfs://HACluster:8020",
    "topics.dir": "/topics",
    "logs.dir": "/logs",
    "flush.size": "100",
    "rotate.interval.ms": "60000",
    "format.class": "io.confluent.connect.hdfs.avro.AvroFormat",
    "value.converter.schemas.enable": "true",
    "partitioner.class": "io.confluent.connect.hdfs.partitioner.TimeBasedPartitioner",
    "partition.duration.ms": "1800000",
    "path.format": "'year'=YYYY/'month'=MM/'day'=dd/'hour'=HH/",
    "locale": "kor",
    "timezone": "Asia/Kolkata"
  }
}
```
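For reference, the sink reads the single topic `mysql-prod-registrations-` (in query mode the JDBC source writes to a topic named exactly after `topic.prefix`). A quick check that the topic exists and actually carries Avro records, assuming default local ports and the Confluent CLI tools on the PATH:

```bash
# List topics matching the prefix (kafka-topics still uses --zookeeper in Kafka 2.0 / Confluent 5.0.1)
kafka-topics --zookeeper localhost:2181 --list | grep mysql-prod-registrations

# Read a few Avro records to confirm the source connector is actually producing
kafka-avro-console-consumer --bootstrap-server localhost:9092 \
  --property schema.registry.url=http://localhost:8081 \
  --topic mysql-prod-registrations- --from-beginning --max-messages 5
```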
```text
Task threw an uncaught and unrecoverable exception (org.apache.kafka.connect.runtime.WorkerTask:177)
java.lang.NullPointerException
    at io.confluent.connect.hdfs.HdfsSinkTask.open(HdfsSinkTask.java:133)
    at org.apache.kafka.connect.runtime.WorkerSinkTask.openPartitions(WorkerSinkTask.java:612)
    at org.apache.kafka.connect.runtime.WorkerSinkTask.access$1100(WorkerSinkTask.java:69)
    at org.apache.kafka.connect.runtime.WorkerSinkTask$HandleRebalance.onPartitionsAssigned(WorkerSinkTask.java:672)
    at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.onJoinComplete(ConsumerCoordinator.java:283)
    at org.apache.kafka.clients.consumer.internals.AbstractCoordinator.joinGroupIfNeeded(AbstractCoordinator.java:422)
    at org.apache.kafka.clients.consumer.internals.AbstractCoordinator.ensureActiveGroup(AbstractCoordinator.java:352)
    at org.apache.kafka.clients.consumer.internals.AbstractCoordinator.ensureActiveGroup(AbstractCoordinator.java:337)
    at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.poll(ConsumerCoordinator.java:343)
    at org.apache.kafka.clients.consumer.KafkaConsumer.updateAssignmentMetadataIfNeeded(KafkaConsumer.java:1218)
    at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1181)
    at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1115)
    at org.apache.kafka.connect.runtime.WorkerSinkTask.pollConsumer(WorkerSinkTask.java:444)
    at org.apache.kafka.connect.runtime.WorkerSinkTask.poll(WorkerSinkTask.java:317)
    at org.apache.kafka.connect.runtime.WorkerSinkTask.iteration(WorkerSinkTask.java:225)
    at org.apache.kafka.connect.runtime.WorkerSinkTask.execute(WorkerSinkTask.java:193)
    at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:175)
    at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:219)
    at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
    at java.util.concurrent.FutureTask.run(FutureTask.java:266)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
    at java.lang.Thread.run(Thread.java:745)
[2018-12-14 13:50:19,518] ERROR WorkerSinkTask{id=hdfs-sink-2} Task is being killed and will not recover until manually restarted (org.apache.kafka.connect.runtime.WorkerTask:178)
```
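The failing task can also be inspected through the Connect REST API and the worker log (assuming the default REST port 8083 and the Confluent CLI that `confluent start` uses):

```bash
# Show connector and task state, including the stack trace of any failed task
curl -s http://localhost:8083/connectors/hdfs-sink1/status

# Show the Connect worker log; the lines just before the NullPointerException
# usually contain the real cause (e.g. failure while connecting to HDFS on startup)
confluent log connect
```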
Can you try running your distributed connector as root?
@II-VSB-II, I am not running the applications individually; I am starting all of them with `confluent start`. Can you share the steps for starting the distributed connector?
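`confluent start` already runs a distributed Connect worker under the CLI. To run a distributed worker by hand instead, the usual approach is roughly the following (paths assume a standard Confluent 5.0.1 install under `$CONFLUENT_HOME`; adjust to your layout):

```bash
# Stop the CLI-managed Connect worker so the ports are free
confluent stop connect

# Start a distributed worker directly, using the Avro-enabled example worker config
$CONFLUENT_HOME/bin/connect-distributed \
  $CONFLUENT_HOME/etc/schema-registry/connect-avro-distributed.properties
```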
@II-VSB-II I am still having the same issue. Can you please help me out? Thanks in advance.
Check that the HDFS XML files are actually present in `hadoop.conf.dir` and that you can resolve the active NameNode with your HA configuration. HDFS itself must of course be in good health. Please also provide more logs; the actual cause of the NullPointerException should be logged there.
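A few checks along those lines, assuming the paths from the sink config above and an `hdfs` client on the Connect worker host (the NameNode ID `nn1` is a placeholder; take the real IDs from your hdfs-site.xml):

```bash
# The sink needs core-site.xml and hdfs-site.xml under hadoop.conf.dir to resolve the HA nameservice
ls /usr/hdp/current/hadoop-client/conf/core-site.xml \
   /usr/hdp/current/hadoop-client/conf/hdfs-site.xml

# Overall HDFS health, and which NameNode is currently active
hdfs dfsadmin -report
hdfs haadmin -getServiceState nn1

# Confirm the user running Connect can reach and list the target directory
hdfs dfs -ls /topics
```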