Kafka connect task failed with NullPointer Exception

Open abhisheksahani opened this issue 6 years ago • 8 comments

java.lang.NullPointerException
    at org.apache.parquet.hadoop.InternalParquetRecordWriter.flushRowGroupToStore(InternalParquetRecordWriter.java:160)
    at org.apache.parquet.hadoop.InternalParquetRecordWriter.close(InternalParquetRecordWriter.java:109)
    at org.apache.parquet.hadoop.ParquetWriter.close(ParquetWriter.java:302)
    at io.confluent.connect.hdfs.parquet.ParquetRecordWriterProvider$1.close(ParquetRecordWriterProvider.java:112)
    at io.confluent.connect.hdfs.TopicPartitionWriter.closeTempFile(TopicPartitionWriter.java:689)
    at io.confluent.connect.hdfs.TopicPartitionWriter.close(TopicPartitionWriter.java:447)
    at io.confluent.connect.hdfs.DataWriter.close(DataWriter.java:459)
    at io.confluent.connect.hdfs.HdfsSinkTask.close(HdfsSinkTask.java:148)
    at org.apache.kafka.connect.runtime.WorkerSinkTask.commitOffsets(WorkerSinkTask.java:396)
    at org.apache.kafka.connect.runtime.WorkerSinkTask.closePartitions(WorkerSinkTask.java:590)
    at org.apache.kafka.connect.runtime.WorkerSinkTask.execute(WorkerSinkTask.java:196)
    at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:175)
    at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:219)
    at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
    at java.util.concurrent.FutureTask.run(FutureTask.java:266)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
    at java.lang.Thread.run(Thread.java:748)

Before this exception, there are block replication errors:

[2019-10-10 12:20:00,363] ERROR Error closing writer for datapipelinenewharish6.5d92fda646e0fb0009e83b48.gameplayed.topic-3. Error: org.apache.hadoop.ipc.RemoteException(java.io.IOException): File /topics/+tmp/datapipelinenewharish6.5d92fda646e0fb0009e83b48.gameplayed.topic/tenant=5d92fda646e0fb0009e83b48/groupid=5d92fda646e0fb0009e83b49/project=5d92fda646e0fb0009e83b4a/name=gameplayed/year=2019/month=10/day=09/hour=18/6e1772ac-1ae5-439d-acaa-6403f5cffa62_tmp.parquet could only be replicated to 0 nodes instead of minReplication (=1). There are 3 datanode(s) running and no node(s) are excluded in this operation.
    at org.apache.hadoop.hdfs.server.blockmanagement.BlockManager.chooseTarget4NewBlock(BlockManager.java:1547)
    at org.apache.hadoop.hdfs.server.namenode.FSNamesystem.getNewBlockTargets(FSNamesystem.java:3107)
    at org.apache.hadoop.hdfs.server.namenode.FSNamesystem.getAdditionalBlock(FSNamesystem.java:3031)
    at org.apache.hadoop.hdfs.server.namenode.NameNodeRpcServer.addBlock(NameNodeRpcServer.java:724)
    at org.apache.hadoop.hdfs.protocolPB.ClientNamenodeProtocolServerSideTranslatorPB.addBlock(ClientNamenodeProtocolServerSideTranslatorPB.java:492)
    at org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos$ClientNamenodeProtocol$2.callBlockingMethod(ClientNamenodeProtocolProtos.java)
    at org.apache.hadoop.ipc.ProtobufRpcEngine$Server$ProtoBufRpcInvoker.call(ProtobufRpcEngine.java:616)
    at org.apache.hadoop.ipc.RPC$Server.call(RPC.java:969)
    at org.apache.hadoop.ipc.Server$Handler$1.run(Server.java:2049)
    at org.apache.hadoop.ipc.Server$Handler$1.run(Server.java:2045)
    at java.security.AccessController.doPrivileged(Native Method)
    at javax.security.auth.Subject.doAs(Subject.java:422)
    at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1657)
    at org.apache.hadoop.ipc.Server$Handler.run(Server.java:2043)
    (io.confluent.connect.hdfs.DataWriter:461)

abhisheksahani avatar Oct 10 '19 06:10 abhisheksahani

The NPE posted in the previous comment occurs while the file is being closed. It has been reported as a Parquet bug and fixed in Parquet 1.8.2. We are using Confluent 5.2.2, and kafka-connect-storage-common references multiple Parquet jars:

  1. parquet-hadoop-bundle-1.6.0.jar
  2. parquet-hadoop-1.8.2.jar

When we removed the first jar from /share/java/kafka-connect-storage-common, the errors went away. We couldn't find any references to parquet-hadoop-bundle-1.6.0.jar in the connector project; can you help us understand where this jar is used?

abhisheksahani avatar Oct 10 '19 11:10 abhisheksahani

mvn dependency:tree shows it is a transitive dependency pulled in by Hive:

[INFO] +- io.confluent:kafka-connect-storage-hive:jar:5.4.0-SNAPSHOT:compile
[INFO] |  +- org.apache.hadoop:hadoop-client:jar:2.7.3:compile
[INFO] |  |  \- org.apache.hadoop:hadoop-annotations:jar:2.7.3:compile
[INFO] |  +- org.apache.hive.hcatalog:hive-hcatalog-core:jar:1.2.2:compile
[INFO] |  |  +- org.apache.hive:hive-cli:jar:1.2.2:compile
[INFO] |  |  |  +- org.apache.hive:hive-serde:jar:1.2.2:compile
[INFO] |  |  |  |  +- net.sf.opencsv:opencsv:jar:2.3:compile
[INFO] |  |  |  |  \- com.twitter:parquet-hadoop-bundle:jar:1.6.0:compile

If you do not use Hive integration, removing the jar should be fine.

ncliang avatar Oct 13 '19 03:10 ncliang

@ncliang thanks for your reply. We want to use Hive integration for ad hoc queries, and the above error occurs only when we use flush.size in combination with rotate.schedule.interval.ms or rotate.interval.ms. I have upgraded the Hadoop cluster to 3.1.2 and am trying the latest HDFS 3 connector; let's see if it resolves the problem, since the latest connector ships parquet-hadoop-bundle:jar:1.8.2. Will let you know the result.

abhisheksahani avatar Oct 13 '19 21:10 abhisheksahani

@ncliang we upgraded our Hadoop cluster to 3.1.2 and are using the Kafka hdfs3 connector, but we are still facing several issues:

  1. When integrating Hive, we get the exception below. Why is there a .null/ segment in the path?

     io.confluent.connect.storage.errors.HiveMetaStoreException: Hive MetaStore exception
         at io.confluent.connect.storage.hive.HiveMetaStore.doAction(HiveMetaStore.java:99)
         at io.confluent.connect.storage.hive.HiveMetaStore.createTable(HiveMetaStore.java:223)
         at io.confluent.connect.hdfs3.avro.AvroHiveUtil.createTable(AvroHiveUtil.java:52)
         at io.confluent.connect.hdfs3.DataWriter$3.createTable(DataWriter.java:285)
         at io.confluent.connect.hdfs3.TopicPartitionWriter$1.call(TopicPartitionWriter.java:796)
         at io.confluent.connect.hdfs3.TopicPartitionWriter$1.call(TopicPartitionWriter.java:792)
         at java.util.concurrent.FutureTask.run(FutureTask.java:266)
         at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
         at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
         at java.lang.Thread.run(Thread.java:748)
     Caused by: MetaException(message:java.lang.IllegalArgumentException: java.net.URISyntaxException: Relative path in absolute URI: hdfs://node-master:9820./null/topics/datapipelinenewharish8.5d8e2191c9e77c0009987772.rpUpdate.topic)
  2. When using "format.class": "io.confluent.connect.hdfs3.parquet.ParquetFormat", we get the exception below. We checked that snappy-java-1.1.1.3.jar is present in the plugin path, which is share/java/confluentinc-kafka-connect-hdfs3-1.0.2-preview/lib, and the plugin path is set as plugin.path=/home/tomcat/confluent-5.2.2/share/java in connect-avro-distributed.properties. (A standalone check of the Snappy native library is sketched after this list.)

     java.lang.NoClassDefFoundError: Could not initialize class org.xerial.snappy.Snappy
         at org.apache.parquet.hadoop.codec.SnappyCompressor.compress(SnappyCompressor.java:67)
         at org.apache.hadoop.io.compress.CompressorStream.compress(CompressorStream.java:81)
         at org.apache.hadoop.io.compress.CompressorStream.finish(CompressorStream.java:92)
         at org.apache.parquet.hadoop.CodecFactory$BytesCompressor.compress(CodecFactory.java:112)
         at org.apache.parquet.hadoop.ColumnChunkPageWriteStore$ColumnChunkPageWriter.writePage(ColumnChunkPageWriteStore.java:93)
         at org.apache.parquet.column.impl.ColumnWriterV1.writePage(ColumnWriterV1.java:150)
         at org.apache.parquet.column.impl.ColumnWriterV1.flush(ColumnWriterV1.java:238)
         at org.apache.parquet.column.impl.ColumnWriteStoreV1.flush(ColumnWriteStoreV1.java:121)
         at org.apache.parquet.hadoop.InternalParquetRecordWriter.flushRowGroupToStore(InternalParquetRecordWriter.java:167)
         at org.apache.parquet.hadoop.InternalParquetRecordWriter.close(InternalParquetRecordWriter.java:109)
         at org.apache.parquet.hadoop.ParquetWriter.close(ParquetWriter.java:302)
         at io.confluent.connect.hdfs3.parquet.ParquetRecordWriterProvider$1.close(ParquetRecordWriterProvider.java:101)
         at io.confluent.connect.hdfs3.TopicPartitionWriter.closeTempFile(TopicPartitionWriter.java:687)
         at io.confluent.connect.hdfs3.TopicPartitionWriter.closeTempFile(TopicPartitionWriter.java:694)
         at io.confluent.connect.hdfs3.TopicPartitionWriter.write(TopicPartitionWriter.java:381)
         at io.confluent.connect.hdfs3.DataWriter.write(DataWriter.java:359)
         at io.confluent.connect.hdfs3.Hdfs3SinkTask.put(Hdfs3SinkTask.java:108)
         at org.apache.kafka.connect.runtime.WorkerSinkTask.deliverMessages(WorkerSinkTask.java:538)
         at org.apache.kafka.connect.runtime.WorkerSinkTask.poll(WorkerSinkTask.java:321)
         at org.apache.kafka.connect.runtime.WorkerSinkTask.iteration(WorkerSinkTask.java:224)
         at org.apache.kafka.connect.runtime.WorkerSinkTask.execute(WorkerSinkTask.java:192)
         at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:175)
         at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:219)
         at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
         at java.util.concurrent.FutureTask.run(FutureTask.java:266)
         at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
         at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
         at java.lang.Thread.run(Thread.java:748)
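To rule out a native-library loading problem independent of the connector, the same snappy-java jar can be exercised directly on the Connect worker host. This is only an illustrative sketch (the class name is arbitrary); compile it and run it with that snappy-java jar on the classpath:

    import java.nio.charset.StandardCharsets;
    import org.xerial.snappy.Snappy;

    // Illustrative standalone check: if this fails with the same error, the
    // snappy-java native library cannot be loaded on this host at all,
    // independent of Kafka Connect and the connector.
    public class SnappyCheck {
        public static void main(String[] args) throws Exception {
            byte[] input = "snappy round-trip test".getBytes(StandardCharsets.UTF_8);
            byte[] compressed = Snappy.compress(input);      // forces native library loading
            byte[] restored = Snappy.uncompress(compressed); // verifies decompression as well
            System.out.println("Snappy OK: " + new String(restored, StandardCharsets.UTF_8));
        }
    }

If this check passes outside of Connect, the failure is more likely a classpath or plugin-isolation issue on the worker than a broken snappy-java jar.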

abhisheksahani avatar Oct 14 '19 01:10 abhisheksahani

For Hive integration, could you post your config? The null in the path is strange. The ParquetFormat issue may be a known bug, but I will need to check.

ncliang avatar Oct 17 '19 18:10 ncliang

@ncliang this is the config for the hdfs3 connector:

{
  "name": "testConnector42",
  "config": {
    "connector.class": "io.confluent.connect.hdfs3.Hdfs3SinkConnector",
    "tasks.max": "8",
    "hdfs.url": "hdfs://...:9000",
    "hadoop.conf.dir": "/home/hadoop/hadoop-3.1.2/etc/hadoop/",
    "hadoop.home": "/home/hadoop/hadoop-3.1.2/",
    "hive.conf.dir": "/home/hadoop/apache-hive-2.3.5-bin/conf/",
    "hive.home": "/home/hadoop/apache-hive-2.3.5-bin/",
    "flush.size": "20",
    "format.class": "io.confluent.connect.hdfs.parquet.ParquetFormat",
    "hive.integration": "true",
    "hive.database": "testhdfsNewTest8",
    "hive.metastore.uris": "thrift://...:9083",
    "schema.compatibility": "BACKWARD",
    "partitioner.class": "io.confluent.connect.storage.partitioner.FieldPartitioner",
    "locale": "en",
    "topics.regex": "datapipelinenewharish8...topic",
    "partition.field.name": "tenant,groupid,project,name",
    "timezone": "Asia/Calcutta",
    "rotate.schedule.interval.ms": 30000
  }
}

The hdfs 2 connector is working now with the HDFS replication factor set to 1. We have 3 datanodes; earlier we had 3 replicas, and the hdfs client at the time of replication was not able to find the available datanodes and failed during replication, which ultimately led to the NPE.
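As a side note, one way to confirm which dfs.replication value the client configuration on the Connect host actually resolves is a small check along these lines; this is only an illustrative sketch, and the NameNode address, the path, and the class name are placeholders:

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.FileSystem;
    import org.apache.hadoop.fs.Path;

    // Illustrative check of the replication settings the HDFS client resolves on
    // this host; useful for confirming which hdfs-site.xml is actually picked up.
    public class ReplicationCheck {
        public static void main(String[] args) throws Exception {
            Configuration conf = new Configuration();
            conf.set("fs.defaultFS", "hdfs://namenode-host:9000"); // placeholder NameNode URL
            try (FileSystem fs = FileSystem.get(conf)) {
                System.out.println("dfs.replication from client config: "
                        + conf.getInt("dfs.replication", 3));
                System.out.println("default replication reported by the FileSystem: "
                        + fs.getDefaultReplication(new Path("/topics"))); // placeholder path
            }
        }
    }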

abhisheksahani avatar Oct 19 '19 07:10 abhisheksahani

Glad that you were able to get the hdfs 2 connector to work.

> 3 replicas, and the hdfs client at the time of replication was not able to find the available datanodes

Was this due to a connectivity issue between the connector and the datanodes? This page talks a bit more about how HDFS handles replication: https://hadoop.apache.org/docs/r1.2.1/hdfs_design.html#Replication+Pipelining

In the 3 replica setup, were you able to read/write from the HDFS cluster with the usual hadoop/hdfs client? The connector just uses the HDFS Java API to communicate with the HDFS cluster. If the hdfs client can read and write from the cluster but the connector cannot, maybe we are not reading or using the Hadoop configuration from hadoop.conf properly.
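For reference, that read/write round trip can also be reproduced from the Connect host with the same HDFS Java API. A minimal, illustrative sketch, where the NameNode URL, the test path, and the class name are placeholders:

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.FSDataInputStream;
    import org.apache.hadoop.fs.FSDataOutputStream;
    import org.apache.hadoop.fs.FileSystem;
    import org.apache.hadoop.fs.Path;

    // Illustrative write/read round trip against the cluster, using the same
    // HDFS Java API the connector uses.
    public class HdfsRoundTrip {
        public static void main(String[] args) throws Exception {
            Configuration conf = new Configuration();
            conf.set("fs.defaultFS", "hdfs://namenode-host:9000"); // placeholder NameNode URL
            // Optionally load the same site files the connector is pointed at via hadoop.conf.dir:
            // conf.addResource(new Path("/home/hadoop/hadoop-3.1.2/etc/hadoop/hdfs-site.xml"));
            try (FileSystem fs = FileSystem.get(conf)) {
                Path testFile = new Path("/tmp/connector-write-test.txt"); // placeholder test path
                try (FSDataOutputStream out = fs.create(testFile, true)) {
                    out.writeUTF("hello from the connector host");
                }
                try (FSDataInputStream in = fs.open(testFile)) {
                    System.out.println("Read back: " + in.readUTF());
                }
            }
        }
    }

If a write like this fails with the same "could only be replicated to 0 nodes" error, the problem most likely sits between the client host and the datanodes rather than in the connector itself.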

ncliang avatar Oct 25 '19 04:10 ncliang

FWIW, you should avoid using this project with a Hadoop 3 cluster, as it still uses Hadoop 2.x clients and RPC calls.

OneCricketeer avatar May 02 '20 01:05 OneCricketeer