
HDFS sink was working fine for some time, but now my sink is not working

Open bkotesh opened this issue 6 years ago • 4 comments

I am trying to push data into Hadoop in real time from MySQL using confluent-5.0.1. My HDFS sink is throwing the error mentioned below.

MySQL JDBC source connector:

```json
{
  "name": "jdbc_source_mysql_registration_query",
  "config": {
    "connector.class": "io.confluent.connect.jdbc.JdbcSourceConnector",
    "key.converter": "io.confluent.connect.avro.AvroConverter",
    "key.converter.schema.registry.url": "http://localhost:8081",
    "value.converter": "io.confluent.connect.avro.AvroConverter",
    "value.converter.schema.registry.url": "http://localhost:8081",
    "connection.url": "jdbc:mysql://localhost:3310/users?zeroDateTimeBehavior=ROUND&useCursorFetch=true&defaultFetchSize=1000&user=kotesh&password=kotesh",
    "query": "SELECT matriid,DateUpdated,Language, Gender, MaritalStatus,MotherTongue, Religion, Caste,SubCaste, EducationId,OccupationCategory,AnnualIncomeinINR, ByWhom from users.data_names",
    "mode": "timestamp",
    "timestamp.column.name": "DateUpdated",
    "validate.non.null": "false",
    "topic.prefix": "mysql-prod-registrations-"
  }
}
```
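A minimal sketch of registering this source and confirming it produces the expected topic, assuming the Connect worker's REST API is on the default localhost:8083, ZooKeeper is on localhost:2181, and the JSON above is saved as `jdbc_source_mysql.json` (the file name and addresses are illustrative, not from the original report):

```bash
# Register the JDBC source via the Connect REST API.
# Port 8083 and the file name jdbc_source_mysql.json are assumptions.
curl -s -X POST -H "Content-Type: application/json" \
  --data @jdbc_source_mysql.json \
  http://localhost:8083/connectors

# Confirm the source actually created the expected topic
# (Kafka 2.0 / Confluent 5.0.1 kafka-topics still lists via --zookeeper).
kafka-topics --zookeeper localhost:2181 --list | grep mysql-prod-registrations
```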

HDFS sink configuration:

```json
{
  "name": "hdfs-sink1",
  "config": {
    "connector.class": "io.confluent.connect.hdfs.HdfsSinkConnector",
    "tasks.max": "3",
    "topics": "mysql-prod-registrations-",
    "hadoop.conf.dir": "/usr/hdp/current/hadoop-client/conf",
    "hadoop.home": "/usr/hdp/current/hadoop-client",
    "hdfs.url": "hdfs://HACluster:8020",
    "topics.dir": "/topics",
    "logs.dir": "/logs",
    "flush.size": "100",
    "rotate.interval.ms": "60000",
    "format.class": "io.confluent.connect.hdfs.avro.AvroFormat",
    "value.converter.schemas.enable": "true",
    "partitioner.class": "io.confluent.connect.hdfs.partitioner.TimeBasedPartitioner",
    "partition.duration.ms": "1800000",
    "path.format": "'year'=YYYY/'month'=MM/'day'=dd/'hour'=HH/",
    "locale": "kor",
    "timezone": "Asia/Kolkata"
  }
}
```
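A minimal sketch for verifying the sink's input before digging into HDFS itself, assuming the default REST port 8083, a broker at localhost:9092, and the Schema Registry URL used in the configs above (broker and REST addresses are assumptions):

```bash
# Check the sink connector and its tasks for a FAILED state and the recorded trace.
curl -s http://localhost:8083/connectors/hdfs-sink1/status

# Read a few Avro records from the topic the sink subscribes to.
kafka-avro-console-consumer --bootstrap-server localhost:9092 \
  --topic mysql-prod-registrations- \
  --property schema.registry.url=http://localhost:8081 \
  --from-beginning --max-messages 5
```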

```
Task threw an uncaught and unrecoverable exception (org.apache.kafka.connect.runtime.WorkerTask:177)
java.lang.NullPointerException
	at io.confluent.connect.hdfs.HdfsSinkTask.open(HdfsSinkTask.java:133)
	at org.apache.kafka.connect.runtime.WorkerSinkTask.openPartitions(WorkerSinkTask.java:612)
	at org.apache.kafka.connect.runtime.WorkerSinkTask.access$1100(WorkerSinkTask.java:69)
	at org.apache.kafka.connect.runtime.WorkerSinkTask$HandleRebalance.onPartitionsAssigned(WorkerSinkTask.java:672)
	at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.onJoinComplete(ConsumerCoordinator.java:283)
	at org.apache.kafka.clients.consumer.internals.AbstractCoordinator.joinGroupIfNeeded(AbstractCoordinator.java:422)
	at org.apache.kafka.clients.consumer.internals.AbstractCoordinator.ensureActiveGroup(AbstractCoordinator.java:352)
	at org.apache.kafka.clients.consumer.internals.AbstractCoordinator.ensureActiveGroup(AbstractCoordinator.java:337)
	at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.poll(ConsumerCoordinator.java:343)
	at org.apache.kafka.clients.consumer.KafkaConsumer.updateAssignmentMetadataIfNeeded(KafkaConsumer.java:1218)
	at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1181)
	at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1115)
	at org.apache.kafka.connect.runtime.WorkerSinkTask.pollConsumer(WorkerSinkTask.java:444)
	at org.apache.kafka.connect.runtime.WorkerSinkTask.poll(WorkerSinkTask.java:317)
	at org.apache.kafka.connect.runtime.WorkerSinkTask.iteration(WorkerSinkTask.java:225)
	at org.apache.kafka.connect.runtime.WorkerSinkTask.execute(WorkerSinkTask.java:193)
	at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:175)
	at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:219)
	at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
	at java.util.concurrent.FutureTask.run(FutureTask.java:266)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
	at java.lang.Thread.run(Thread.java:745)
[2018-12-14 13:50:19,518] ERROR WorkerSinkTask{id=hdfs-sink-2} Task is being killed and will not recover until manually restarted (org.apache.kafka.connect.runtime.WorkerTask:178)
```
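An NPE this early in `HdfsSinkTask.open` often means the task's writer was never initialized during `start()`, so the underlying failure is usually logged shortly before this trace. A sketch for pulling that context out of the worker log written by `confluent start`, assuming the Confluent 5.x local CLI layout (`confluent current` prints its data directory; exact file names may differ between versions):

```bash
# Search the Connect logs produced by `confluent start` for what precedes the NPE.
# The 50-line context window is arbitrary.
grep -rB 50 "NullPointerException" "$(confluent current)/connect"
```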

bkotesh · Dec 14 '18 08:12

Can you try running your distributed connector as root?
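A minimal sketch of what that might look like, assuming the Confluent Platform packaged paths (the properties file location is an assumption; adjust to the actual install):

```bash
# Run the distributed Connect worker directly, as root, instead of via `confluent start`.
# connect-avro-distributed.properties is the packaged Avro-enabled worker config.
sudo connect-distributed /etc/schema-registry/connect-avro-distributed.properties
```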

II-VSB-II · Dec 14 '18 09:12

@II-VSB-II, I am not running the applications individually; instead, I am starting all of them with `confluent start`. Can you share the steps for starting the distributed connector?

confluent status
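For reference, a sketch of inspecting a setup started this way, assuming the Confluent 5.x local CLI (sub-command names may vary between CLI versions):

```bash
# List the local services started by `confluent start` and their state.
confluent status

# List the connectors loaded into the local Connect worker.
confluent status connectors

# Restart just the Connect worker after changing connector configs.
confluent stop connect && confluent start connect
```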

bkotesh · Dec 14 '18 10:12

@II-VSB-II I am still having the same issue. Can you please help me out? Thanks in advance.

bkotesh · Dec 17 '18 03:12

Check whether the HDFS XML files are actually present in hadoop.conf.dir, and whether you can resolve the active NameNode with your HA configuration. And of course HDFS itself must be in good health. Try to provide more logs; the actual cause of the NullPointerException should be logged there.
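A sketch of those checks from the Connect host, assuming the HDP client tools are on the PATH and using placeholder NameNode service IDs (`nn1`, `nn2`) that must be replaced with the ones defined for `HACluster` in hdfs-site.xml:

```bash
# The sink's hadoop.conf.dir must contain the cluster's client configs.
ls /usr/hdp/current/hadoop-client/conf/core-site.xml \
   /usr/hdp/current/hadoop-client/conf/hdfs-site.xml

# With HA, the logical name 'HACluster' must resolve to the active NameNode.
hdfs haadmin -getServiceState nn1   # nn1/nn2 are placeholders for dfs.ha.namenodes.HACluster
hdfs haadmin -getServiceState nn2

# Basic health check and a look at the configured topics directory.
hdfs dfsadmin -report | head -n 20
hdfs dfs -ls /topics
```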

kaushiksrinivas · Jun 27 '19 14:06