kafka-connect-hdfs
Kafka Connect task failed with NullPointerException
java.lang.NullPointerException
    at org.apache.parquet.hadoop.InternalParquetRecordWriter.flushRowGroupToStore(InternalParquetRecordWriter.java:160)
    at org.apache.parquet.hadoop.InternalParquetRecordWriter.close(InternalParquetRecordWriter.java:109)
    at org.apache.parquet.hadoop.ParquetWriter.close(ParquetWriter.java:302)
    at io.confluent.connect.hdfs.parquet.ParquetRecordWriterProvider$1.close(ParquetRecordWriterProvider.java:112)
    at io.confluent.connect.hdfs.TopicPartitionWriter.closeTempFile(TopicPartitionWriter.java:689)
    at io.confluent.connect.hdfs.TopicPartitionWriter.close(TopicPartitionWriter.java:447)
    at io.confluent.connect.hdfs.DataWriter.close(DataWriter.java:459)
    at io.confluent.connect.hdfs.HdfsSinkTask.close(HdfsSinkTask.java:148)
    at org.apache.kafka.connect.runtime.WorkerSinkTask.commitOffsets(WorkerSinkTask.java:396)
    at org.apache.kafka.connect.runtime.WorkerSinkTask.closePartitions(WorkerSinkTask.java:590)
    at org.apache.kafka.connect.runtime.WorkerSinkTask.execute(WorkerSinkTask.java:196)
    at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:175)
    at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:219)
    at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
    at java.util.concurrent.FutureTask.run(FutureTask.java:266)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
    at java.lang.Thread.run(Thread.java:748)
Before this exception, there are block replication errors:
[2019-10-10 12:20:00,363] ERROR Error closing writer for datapipelinenewharish6.5d92fda646e0fb0009e83b48.gameplayed.topic-3. Error: org.apache.hadoop.ipc.RemoteException(java.io.IOException): File /topics/+tmp/datapipelinenewharish6.5d92fda646e0fb0009e83b48.gameplayed.topic/tenant=5d92fda646e0fb0009e83b48/groupid=5d92fda646e0fb0009e83b49/project=5d92fda646e0fb0009e83b4a/name=gameplayed/year=2019/month=10/day=09/hour=18/6e1772ac-1ae5-439d-acaa-6403f5cffa62_tmp.parquet could only be replicated to 0 nodes instead of minReplication (=1). There are 3 datanode(s) running and no node(s) are excluded in this operation.
    at org.apache.hadoop.hdfs.server.blockmanagement.BlockManager.chooseTarget4NewBlock(BlockManager.java:1547)
    at org.apache.hadoop.hdfs.server.namenode.FSNamesystem.getNewBlockTargets(FSNamesystem.java:3107)
    at org.apache.hadoop.hdfs.server.namenode.FSNamesystem.getAdditionalBlock(FSNamesystem.java:3031)
    at org.apache.hadoop.hdfs.server.namenode.NameNodeRpcServer.addBlock(NameNodeRpcServer.java:724)
    at org.apache.hadoop.hdfs.protocolPB.ClientNamenodeProtocolServerSideTranslatorPB.addBlock(ClientNamenodeProtocolServerSideTranslatorPB.java:492)
    at org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos$ClientNamenodeProtocol$2.callBlockingMethod(ClientNamenodeProtocolProtos.java)
    at org.apache.hadoop.ipc.ProtobufRpcEngine$Server$ProtoBufRpcInvoker.call(ProtobufRpcEngine.java:616)
    at org.apache.hadoop.ipc.RPC$Server.call(RPC.java:969)
    at org.apache.hadoop.ipc.Server$Handler$1.run(Server.java:2049)
    at org.apache.hadoop.ipc.Server$Handler$1.run(Server.java:2045)
    at java.security.AccessController.doPrivileged(Native Method)
    at javax.security.auth.Subject.doAs(Subject.java:422)
    at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1657)
    at org.apache.hadoop.ipc.Server$Handler.run(Server.java:2043)
 (io.confluent.connect.hdfs.DataWriter:461)
The NPE posted in the previous comment occurs while closing the file, and it has been marked as a Parquet bug and fixed in version 1.8.2. We are using Confluent 5.2.2, and kafka-connect-storage-common references multiple Parquet jars:
- parquet-hadoop-bundle-1.6.0.jar
- parquet-hadoop-1.8.2.jar

When we removed the first jar from /share/java/kafka-connect-storage-common, the errors went away. We couldn't find any references to parquet-hadoop-bundle-1.6.0.jar in the connector project; can you help us understand where this jar is used?
mvn dependency:tree says it is a transitive dependency pulled in by Hive:
[INFO] +- io.confluent:kafka-connect-storage-hive:jar:5.4.0-SNAPSHOT:compile
[INFO] | +- org.apache.hadoop:hadoop-client:jar:2.7.3:compile
[INFO] | | \- org.apache.hadoop:hadoop-annotations:jar:2.7.3:compile
[INFO] | +- org.apache.hive.hcatalog:hive-hcatalog-core:jar:1.2.2:compile
[INFO] | | +- org.apache.hive:hive-cli:jar:1.2.2:compile
[INFO] | | | +- org.apache.hive:hive-serde:jar:1.2.2:compile
[INFO] | | | | +- net.sf.opencsv:opencsv:jar:2.3:compile
[INFO] | | | | \- com.twitter:parquet-hadoop-bundle:jar:1.6.0:compile
If you do not use Hive integration, removing the jar should be fine.
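If the goal is to confirm which jar a given Parquet class is actually resolved from at runtime, something like the following can help. This is only a minimal sketch: the class and file names are illustrative, and the assumption that the old parquet.* namespace comes from the 1.6.0 bundle (versus org.apache.parquet.* from parquet-hadoop-1.8.2.jar) should be verified against your jars. Run it with the same classpath the Connect worker uses, e.g. the jars under share/java/kafka-connect-storage-common.

```java
import java.net.URL;
import java.security.CodeSource;

public class WhichParquetJar {

    // Resolves a class name to the jar it is loaded from on the current classpath.
    static void locate(String className) {
        try {
            Class<?> clazz = Class.forName(className);
            CodeSource cs = clazz.getProtectionDomain().getCodeSource();
            URL source = (cs == null) ? null : cs.getLocation();
            System.out.println(className + " -> " + source);
        } catch (ClassNotFoundException e) {
            System.out.println(className + " -> not on the classpath");
        }
    }

    public static void main(String[] args) {
        // Apache namespace, expected to come from parquet-hadoop-1.8.2.jar.
        locate("org.apache.parquet.hadoop.ParquetWriter");
        // Pre-Apache namespace, assumed to come from the parquet-hadoop-bundle-1.6.0.jar pulled in by Hive.
        locate("parquet.hadoop.ParquetWriter");
    }
}
```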
@ncliang thanks for your reply. We want to use Hive integration for ad hoc queries, and the above error occurs only when we use a combination of flush.size and (rotate.schedule.interval.ms or rotate.interval.ms). I have upgraded the Hadoop cluster to 3.1.2 and am trying the latest HDFS 3 connector; let's see if it resolves the problem, since the latest connector has the parquet-hadoop-bundle:jar:1.8.2 jar. Will let you know the result.
@ncliang we upgraded our Hadoop cluster to 3.1.2 and are using the Kafka HDFS 3 connector, but we are still facing many issues:
- When integrating with Hive, we get this exception. Why is there a /null/ in the path?
  io.confluent.connect.storage.errors.HiveMetaStoreException: Hive MetaStore exception
      at io.confluent.connect.storage.hive.HiveMetaStore.doAction(HiveMetaStore.java:99)
      at io.confluent.connect.storage.hive.HiveMetaStore.createTable(HiveMetaStore.java:223)
      at io.confluent.connect.hdfs3.avro.AvroHiveUtil.createTable(AvroHiveUtil.java:52)
      at io.confluent.connect.hdfs3.DataWriter$3.createTable(DataWriter.java:285)
      at io.confluent.connect.hdfs3.TopicPartitionWriter$1.call(TopicPartitionWriter.java:796)
      at io.confluent.connect.hdfs3.TopicPartitionWriter$1.call(TopicPartitionWriter.java:792)
      at java.util.concurrent.FutureTask.run(FutureTask.java:266)
      at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
      at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
      at java.lang.Thread.run(Thread.java:748)
  Caused by: MetaException(message:java.lang.IllegalArgumentException: java.net.URISyntaxException: Relative path in absolute URI: hdfs://node-master:9820./null/topics/datapipelinenewharish8.5d8e2191c9e77c0009987772.rpUpdate.topic)
- When using "format.class": "io.confluent.connect.hdfs3.parquet.ParquetFormat" we get this exception:
  java.lang.NoClassDefFoundError: Could not initialize class org.xerial.snappy.Snappy
      at org.apache.parquet.hadoop.codec.SnappyCompressor.compress(SnappyCompressor.java:67)
      at org.apache.hadoop.io.compress.CompressorStream.compress(CompressorStream.java:81)
      at org.apache.hadoop.io.compress.CompressorStream.finish(CompressorStream.java:92)
      at org.apache.parquet.hadoop.CodecFactory$BytesCompressor.compress(CodecFactory.java:112)
      at org.apache.parquet.hadoop.ColumnChunkPageWriteStore$ColumnChunkPageWriter.writePage(ColumnChunkPageWriteStore.java:93)
      at org.apache.parquet.column.impl.ColumnWriterV1.writePage(ColumnWriterV1.java:150)
      at org.apache.parquet.column.impl.ColumnWriterV1.flush(ColumnWriterV1.java:238)
      at org.apache.parquet.column.impl.ColumnWriteStoreV1.flush(ColumnWriteStoreV1.java:121)
      at org.apache.parquet.hadoop.InternalParquetRecordWriter.flushRowGroupToStore(InternalParquetRecordWriter.java:167)
      at org.apache.parquet.hadoop.InternalParquetRecordWriter.close(InternalParquetRecordWriter.java:109)
      at org.apache.parquet.hadoop.ParquetWriter.close(ParquetWriter.java:302)
      at io.confluent.connect.hdfs3.parquet.ParquetRecordWriterProvider$1.close(ParquetRecordWriterProvider.java:101)
      at io.confluent.connect.hdfs3.TopicPartitionWriter.closeTempFile(TopicPartitionWriter.java:687)
      at io.confluent.connect.hdfs3.TopicPartitionWriter.closeTempFile(TopicPartitionWriter.java:694)
      at io.confluent.connect.hdfs3.TopicPartitionWriter.write(TopicPartitionWriter.java:381)
      at io.confluent.connect.hdfs3.DataWriter.write(DataWriter.java:359)
      at io.confluent.connect.hdfs3.Hdfs3SinkTask.put(Hdfs3SinkTask.java:108)
      at org.apache.kafka.connect.runtime.WorkerSinkTask.deliverMessages(WorkerSinkTask.java:538)
      at org.apache.kafka.connect.runtime.WorkerSinkTask.poll(WorkerSinkTask.java:321)
      at org.apache.kafka.connect.runtime.WorkerSinkTask.iteration(WorkerSinkTask.java:224)
      at org.apache.kafka.connect.runtime.WorkerSinkTask.execute(WorkerSinkTask.java:192)
      at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:175)
      at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:219)
      at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
      at java.util.concurrent.FutureTask.run(FutureTask.java:266)
      at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
      at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
      at java.lang.Thread.run(Thread.java:748)
  We checked that snappy-java-1.1.1.3.jar is present in the plugin path, which is share/java/confluentinc-kafka-connect-hdfs3-1.0.2-preview/lib; the plugin path is set as plugin.path=/home/tomcat/confluent-5.2.2/share/java in connect-avro-distributed.properties. A quick snappy-java load check is sketched after this list.
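That "Could not initialize class org.xerial.snappy.Snappy" usually means the class's static initializer, which extracts and loads the native library, failed earlier in the same JVM. A minimal sketch to test this outside Connect is below; the class name is illustrative, and it assumes you run it on the worker host with the snappy-java jar from the plugin lib directory on the classpath.

```java
import java.nio.charset.StandardCharsets;

import org.xerial.snappy.Snappy;

public class SnappyCheck {
    public static void main(String[] args) throws Exception {
        // Compressing anything forces snappy-java's static initializer to run,
        // which is the step that fails when the native library cannot be loaded.
        byte[] compressed = Snappy.compress("snappy smoke test".getBytes(StandardCharsets.UTF_8));
        byte[] restored = Snappy.uncompress(compressed);
        System.out.println(new String(restored, StandardCharsets.UTF_8));
    }
}
```

If this fails on the worker host, the native library cannot be loaded there (for example because the temp directory it extracts to is mounted noexec), which would explain the error above independent of the connector.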
For Hive integration, could you post your config? The null in the path is strange.
For ParquetFormat, it may be a known bug, but I will need to check.
@ncliang this is the config for the HDFS 3 connector:

{
  "name": "testConnector42",
  "config": {
    "connector.class": "io.confluent.connect.hdfs3.Hdfs3SinkConnector",
    "tasks.max": "8",
    "hdfs.url": "hdfs://...:9000",
    "hadoop.conf.dir": "/home/hadoop/hadoop-3.1.2/etc/hadoop/",
    "hadoop.home": "/home/hadoop/hadoop-3.1.2/",
    "hive.conf.dir": "/home/hadoop/apache-hive-2.3.5-bin/conf/",
    "hive.home": "/home/hadoop/apache-hive-2.3.5-bin/",
    "flush.size": "20",
    "format.class": "io.confluent.connect.hdfs.parquet.ParquetFormat",
    "hive.integration": "true",
    "hive.database": "testhdfsNewTest8",
    "hive.metastore.uris": "thrift://...:9083",
    "schema.compatibility": "BACKWARD",
    "partitioner.class": "io.confluent.connect.storage.partitioner.FieldPartitioner",
    "locale": "en",
    "topics.regex": "datapipelinenewharish8...topic",
    "partition.field.name": "tenant,groupid,project,name",
    "timezone": "Asia/Calcutta",
    "rotate.schedule.interval.ms": 30000
  }
}

The HDFS 2 connector is working now with the HDFS replication factor set to 1, and we have 3 datanodes. Earlier we had 3 replicas, and at replication time the HDFS client was not able to find the available datanodes and failed during replication, which ultimately led to the NPE.
Glad that you were able to get the HDFS 2 connector to work.
> earlier we had 3 replicas, and at replication time the HDFS client was not able to find the available datanodes
Was this due to a connectivity issue between the connector and the datanodes? This page talks a bit more about how HDFS handles replication: https://hadoop.apache.org/docs/r1.2.1/hdfs_design.html#Replication+Pipelining
In the 3-replica setup, were you able to read/write from the HDFS cluster with the usual hadoop/hdfs client? The connector just uses the HDFS Java API to communicate with the HDFS cluster. If the hdfs client can read/write from the cluster but the connector cannot, maybe we are not reading or using the hadoop configuration from hadoop.conf.dir properly.
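For reference, a minimal sketch of such a check using the same HDFS Java API. The class name and test path are hypothetical; the configuration directory is taken from the hadoop.conf.dir in the posted config, so adjust it to your environment.

```java
import java.nio.charset.StandardCharsets;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hdfs.DistributedFileSystem;
import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
import org.apache.hadoop.io.IOUtils;

public class HdfsSanityCheck {
    public static void main(String[] args) throws Exception {
        // Load the same site configs the connector would use via hadoop.conf.dir.
        Configuration conf = new Configuration();
        conf.addResource(new Path("/home/hadoop/hadoop-3.1.2/etc/hadoop/core-site.xml"));
        conf.addResource(new Path("/home/hadoop/hadoop-3.1.2/etc/hadoop/hdfs-site.xml"));

        FileSystem fs = FileSystem.get(conf);

        // How many live datanodes does this client actually see from the worker host?
        if (fs instanceof DistributedFileSystem) {
            for (DatanodeInfo dn : ((DistributedFileSystem) fs).getDataNodeStats()) {
                System.out.println("datanode: " + dn.getHostName() + " " + dn.getAdminState());
            }
        }

        // Write and read back a small file over the same API the connector uses.
        Path testPath = new Path("/tmp/connect-hdfs-sanity-check.txt");
        try (FSDataOutputStream out = fs.create(testPath, true)) {
            out.write("hello from the connect worker host".getBytes(StandardCharsets.UTF_8));
        }
        try (FSDataInputStream in = fs.open(testPath)) {
            IOUtils.copyBytes(in, System.out, 4096, false);
        }
        fs.delete(testPath, false);
    }
}
```

If this sees fewer live datanodes than expected, or the write fails with the same "could only be replicated to 0 nodes" error, the problem is likely between the worker host and the datanodes rather than in the connector itself.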
FWIW, you should avoid using this project with a Hadoop 3 cluster, as this project still uses Hadoop 2.x clients and RPC calls.