kafka-connect-hdfs
HDFS connector not consuming Kafka topic
Hi all, I ran into a problem where the HDFS connector does not consume my Kafka topic, although connect-file-sink does. After starting the job I waited about 3 minutes, but no commit log ever showed up, so I killed the job with Ctrl+C and noticed an error in the log. The details are shown below.
The runtime log is as follows:
hdfs.url = hdfs://10.103.18.9:9000
hdfs.authentication.kerberos = false
hive.metastore.uris =
partition.field.name = date
kerberos.ticket.renew.period.ms = 3600000
shutdown.timeout.ms = 3000
partitioner.class = io.confluent.connect.hdfs.partitioner.FieldPartitioner
storage.class = io.confluent.connect.hdfs.storage.HdfsStorage
path.format =
(io.confluent.connect.hdfs.HdfsSinkConnectorConfig:135)
[2017-01-10 12:37:10,018] INFO Hadoop configuration directory (io.confluent.connect.hdfs.DataWriter:94)
[2017-01-10 12:37:10,295] WARN Unable to load native-hadoop library for your platform... using builtin-java classes where applicable (org.apache.hadoop.util.NativeCodeLoader:62)
[2017-01-10 12:37:10,901] INFO Started recovery for topic partition test_kafka_docs2-2 (io.confluent.connect.hdfs.TopicPartitionWriter:193)
[2017-01-10 12:37:10,912] INFO Finished recovery for topic partition test_kafka_docs2-2 (io.confluent.connect.hdfs.TopicPartitionWriter:208)
[2017-01-10 12:37:10,912] INFO Started recovery for topic partition test_kafka_docs2-1 (io.confluent.connect.hdfs.TopicPartitionWriter:193)
[2017-01-10 12:37:10,916] INFO Finished recovery for topic partition test_kafka_docs2-1 (io.confluent.connect.hdfs.TopicPartitionWriter:208)
[2017-01-10 12:37:10,917] INFO Started recovery for topic partition test_kafka_docs2-0 (io.confluent.connect.hdfs.TopicPartitionWriter:193)
[2017-01-10 12:37:10,921] INFO Finished recovery for topic partition test_kafka_docs2-0 (io.confluent.connect.hdfs.TopicPartitionWriter:208)
[2017-01-10 12:37:10,921] INFO Started recovery for topic partition test_kafka_docs2-5 (io.confluent.connect.hdfs.TopicPartitionWriter:193)
[2017-01-10 12:37:10,925] INFO Finished recovery for topic partition test_kafka_docs2-5 (io.confluent.connect.hdfs.TopicPartitionWriter:208)
[2017-01-10 12:37:10,925] INFO Started recovery for topic partition test_kafka_docs2-4 (io.confluent.connect.hdfs.TopicPartitionWriter:193)
[2017-01-10 12:37:10,929] INFO Finished recovery for topic partition test_kafka_docs2-4 (io.confluent.connect.hdfs.TopicPartitionWriter:208)
[2017-01-10 12:37:10,929] INFO Started recovery for topic partition test_kafka_docs2-3 (io.confluent.connect.hdfs.TopicPartitionWriter:193)
[2017-01-10 12:37:10,932] INFO Finished recovery for topic partition test_kafka_docs2-3 (io.confluent.connect.hdfs.TopicPartitionWriter:208)
[2017-01-10 12:37:10,932] INFO Sink task org.apache.kafka.connect.runtime.WorkerSinkTask@47c548fc finished initialization and start (org.apache.kafka.connect.runtime.WorkerSinkTask:155)
^C[2017-01-10 12:40:13,518] INFO Kafka Connect stopping (org.apache.kafka.connect.runtime.Connect:68)
[2017-01-10 12:40:13,527] INFO Stopped ServerConnector@3382f8ae{HTTP/1.1}{0.0.0.0:8083} (org.eclipse.jetty.server.ServerConnector:306)
[2017-01-10 12:40:13,539] INFO Stopped o.e.j.s.ServletContextHandler@2974f221{/,null,UNAVAILABLE} (org.eclipse.jetty.server.handler.ContextHandler:865)
[2017-01-10 12:40:13,541] INFO Herder stopping (org.apache.kafka.connect.runtime.standalone.StandaloneHerder:62)
[2017-01-10 12:40:13,541] INFO Stopping task hdfs-sink3-0 (org.apache.kafka.connect.runtime.Worker:305)
[2017-01-10 12:40:13,543] INFO Starting graceful shutdown of thread WorkerSinkTask-hdfs-sink3-0 (org.apache.kafka.connect.util.ShutdownableThread:119)
[2017-01-10 12:40:18,544] INFO Forcing shutdown of thread WorkerSinkTask-hdfs-sink3-0 (org.apache.kafka.connect.util.ShutdownableThread:141)
[2017-01-10 12:40:18,546] ERROR Graceful stop of task org.apache.kafka.connect.runtime.WorkerSinkTask@47c548fc failed. (org.apache.kafka.connect.runtime.Worker:312)
[2017-01-10 12:40:18,564] INFO Stopping connector hdfs-sink3 (org.apache.kafka.connect.runtime.Worker:226)
[2017-01-10 12:40:18,565] INFO Stopped connector hdfs-sink3 (org.apache.kafka.connect.runtime.Worker:240)
[2017-01-10 12:40:18,565] INFO Herder stopped (org.apache.kafka.connect.runtime.standalone.StandaloneHerder:77)
[2017-01-10 12:40:18,565] INFO Worker stopping (org.apache.kafka.connect.runtime.Worker:115)
[2017-01-10 12:40:18,566] INFO Stopped FileOffsetBackingStore (org.apache.kafka.connect.storage.FileOffsetBackingStore:61)
[2017-01-10 12:40:18,566] INFO Worker stopped (org.apache.kafka.connect.runtime.Worker:155)
[2017-01-10 12:40:18,566] INFO Kafka Connect stopped (org.apache.kafka.connect.runtime.Connect:74)
The command I use to start the job is: ./bin/connect-standalone etc/kafka/connect-standalone.properties etc/kafka-connect-hdfs/quickstart-hdfs.properties
The connect-standalone.properties is as follows:
bootstrap.servers=10.103.17.106:9092
key.converter=org.apache.kafka.connect.json.JsonConverter
value.converter=org.apache.kafka.connect.json.JsonConverter
key.converter.schemas.enable=false
value.converter.schemas.enable=false
internal.key.converter=org.apache.kafka.connect.json.JsonConverter
internal.value.converter=org.apache.kafka.connect.json.JsonConverter
internal.key.converter.schemas.enable=false
internal.value.converter.schemas.enable=false
offset.storage.file.filename=/tmp/connect.offsets
offset.flush.interval.ms=10000
The quickstart-hdfs.properties is as follows:
name=hdfs-sink3
connector.class=io.confluent.connect.hdfs.HdfsSinkConnector
tasks.max=1
topics=test_kafka_docs2
#topics=test_hdfs_kafka
hdfs.url=hdfs://10.103.18.9:9000
flush.size=1000
Could anyone give some suggestions? Thanks.
@heifrank I see two things I would change here to troubleshoot:
- flush.size is set to 1000. I would set this to something small like 3 and manually produce records to the topic to see if you can get it to trigger a file roll (a sketch follows after this list).
- This is a bit weird but maybe a copy/paste issue. I would make sure these are on separate lines: offset.storage.file.filename=/tmp/connect.offsets offset.flush.interval.ms=10000
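For reference, here is a minimal sketch of the sink properties I'd test with, reusing the values already in this thread and just lowering flush.size (anything beyond that is an assumption on my part):
name=hdfs-sink3
connector.class=io.confluent.connect.hdfs.HdfsSinkConnector
tasks.max=1
topics=test_kafka_docs2
hdfs.url=hdfs://10.103.18.9:9000
# small enough that a handful of manually produced records should roll a file
flush.size=3
Then produce a few records to the topic and watch the Connect log for a commit/rotation message.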
@cotedm Thanks for your reply.
- My Kafka topic contains 160,000 records, which is why I set flush.size to 1000. I've also tried setting it to 3, but that didn't work either.
- Yes, that's a copy/paste issue; they are on separate lines.
It's weird that offset.flush.interval.ms is set to 10000, yet no commit message shows up within that interval. I waited 3 minutes and checked Kafka Manager, but didn't find any consumer consuming my topic. However, the connect-file-sink properties work fine. The behavior is the same in both version 2.0.0 and 3.0.0.
@heifrank do you see any directories created in HDFS? If the connection to HDFS is working properly, you should see /topics and /logs directories created in HDFS by the connector. If you don't see those directories, that could be your problem. If they are created, you should see something being written to /logs; if not, please verify the permissions there.
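For example, you can check quickly from the command line (assuming the default topics.dir and logs.dir; adjust the paths if you've overridden them):
# list what the connector has created so far
hdfs dfs -ls /topics
hdfs dfs -ls /logs
# if /logs stays empty, check ownership and permissions on both directories
hdfs dfs -ls -d /topics /logs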
@cotedm the /topics and /logs directories are created, but /logs is empty, and /topics only has a +tmp directory which is empty too. So it can't be a permission issue. Two things I notice:
- If I use the Schema Registry properties (etc/schema-registry/connect-avro-standalone.properties) and consume another topic whose data is in Avro format, it works fine.
- If I use etc/kafka/connect-standalone.properties and consume a topic whose data is JSON, it doesn't consume any messages at all, but if I change hdfs-sink to file-sink it works fine.
@heifrank I think the problem, then, is that the connector doesn't currently have a good built-in way to write JSON data. That's future work (see #74), but there are a lot of decisions to be made for it to really work for everyone. In the meantime, you can dump your JSON data directly into HDFS via the connector's pluggable format. There is a connector built off of this one that has a SourceFormat that could work for you. Note that Hive integration won't work if you do this, but you can get your JSON data over this way.
If you are wondering, the easiest way to plug this in is to build that connector's jar file and add the jar to the classpath. Then update the format.class config to point to the class. Please give that a go and let us know how it works out for future reference.
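Roughly, that means something like the following in your sink properties, after putting the jar on the Connect worker's classpath (the class name below is just a placeholder; use whatever format class the jar you build actually provides):
# quickstart-hdfs.properties
# the jar containing the format class must be on the worker's classpath,
# e.g. exported via the CLASSPATH environment variable before starting connect-standalone
format.class=com.example.connect.format.JsonFormat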
I have the same problem with JSON-format records. As heifrank said, it works fine using Avro format and Schema Registry, but once I changed to JSON, data could be written to the topic but couldn't be consumed by the kafka-hdfs-sink connector. See the error logs below.
[2017-04-04 13:49:49,835] ERROR Failed to convert config data to Kafka Connect format: (org.apache.kafka.connect.storage.KafkaConfigBackingStore:440)
org.apache.kafka.connect.errors.DataException: Converting byte[] to Kafka Connect data failed due to serialization error:
    at org.apache.kafka.connect.json.JsonConverter.toConnectData(JsonConverter.java:305)
    at org.apache.kafka.connect.storage.KafkaConfigBackingStore$ConsumeCallback.onCompletion(KafkaConfigBackingStore.java:438)
    at org.apache.kafka.connect.storage.KafkaConfigBackingStore$ConsumeCallback.onCompletion(KafkaConfigBackingStore.java:427)
    at org.apache.kafka.connect.util.KafkaBasedLog.poll(KafkaBasedLog.java:251)
    at org.apache.kafka.connect.util.KafkaBasedLog.readToLogEnd(KafkaBasedLog.java:274)
    at org.apache.kafka.connect.util.KafkaBasedLog.start(KafkaBasedLog.java:141)
    at org.apache.kafka.connect.storage.KafkaConfigBackingStore.start(KafkaConfigBackingStore.java:246)
    at org.apache.kafka.connect.runtime.AbstractHerder.startServices(AbstractHerder.java:95)
    at org.apache.kafka.connect.runtime.distributed.DistributedHerder.run(DistributedHerder.java:195)
    at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
    at java.util.concurrent.FutureTask.run(FutureTask.java:266)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
    at java.lang.Thread.run(Thread.java:745)
Caused by: org.apache.kafka.common.errors.SerializationException: com.fasterxml.jackson.core.JsonParseException: Unrecognized token 'Struct': was expecting ('true', 'false' or 'null') at [Source: [B@64136ffe; line: 1, column: 8]
Caused by: com.fasterxml.jackson.core.JsonParseException: Unrecognized token 'Struct': was expecting ('true', 'false' or 'null') at [Source: [B@64136ffe; line: 1, column: 8]
    at com.fasterxml.jackson.core.JsonParser._constructError(JsonParser.java:1487)
    at com.fasterxml.jackson.core.base.ParserMinimalBase._reportError(ParserMinimalBase.java:518)
    at com.fasterxml.jackson.core.json.UTF8StreamJsonParser._reportInvalidToken(UTF8StreamJsonParser.java:3323)
    at com.fasterxml.jackson.core.json.UTF8StreamJsonParser._handleUnexpectedValue(UTF8StreamJsonParser.java:2482)
    at com.fasterxml.jackson.core.json.UTF8StreamJsonParser._nextTokenNotInObject(UTF8StreamJsonParser.java:801)
    at com.fasterxml.jackson.core.json.UTF8StreamJsonParser.nextToken(UTF8StreamJsonParser.java:697)
    at com.fasterxml.jackson.databind.ObjectMapper._initForReading(ObjectMapper.java:3604)
    at com.fasterxml.jackson.databind.ObjectMapper._readMapAndClose(ObjectMapper.java:3549)
    at com.fasterxml.jackson.databind.ObjectMapper.readTree(ObjectMapper.java:2161)
    at org.apache.kafka.connect.json.JsonDeserializer.deserialize(JsonDeserializer.java:50)
    at org.apache.kafka.connect.json.JsonConverter.toConnectData(JsonConverter.java:303)
    at org.apache.kafka.connect.storage.KafkaConfigBackingStore$ConsumeCallback.onCompletion(KafkaConfigBackingStore.java:438)
    at org.apache.kafka.connect.storage.KafkaConfigBackingStore$ConsumeCallback.onCompletion(KafkaConfigBackingStore.java:427)
    at org.apache.kafka.connect.util.KafkaBasedLog.poll(KafkaBasedLog.java:251)
    at org.apache.kafka.connect.util.KafkaBasedLog.readToLogEnd(KafkaBasedLog.java:274)
    at org.apache.kafka.connect.util.KafkaBasedLog.start(KafkaBasedLog.java:141)
    at org.apache.kafka.connect.storage.KafkaConfigBackingStore.start(KafkaConfigBackingStore.java:246)
    at org.apache.kafka.connect.runtime.AbstractHerder.startServices(AbstractHerder.java:95)
    at org.apache.kafka.connect.runtime.distributed.DistributedHerder.run(DistributedHerder.java:195)
    at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
    at java.util.concurrent.FutureTask.run(FutureTask.java:266)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
    at java.lang.Thread.run(Thread.java:745)
[2017-04-04 13:49:49,842] ERROR Failed to convert config data to Kafka Connect format: (org.apache.kafka.connect.storage.KafkaConfigBackingStore:440)
org.apache.kafka.connect.errors.DataException: Converting byte[] to Kafka Connect data failed due to serialization error:
    at org.apache.kafka.connect.json.JsonConverter.toConnectData(JsonConverter.java:305)
    at org.apache.kafka.connect.storage.KafkaConfigBackingStore$ConsumeCallback.onCompletion(KafkaConfigBackingStore.java:438)
    at org.apache.kafka.connect.storage.KafkaConfigBackingStore$ConsumeCallback.onCompletion(KafkaConfigBackingStore.java:427)
    at org.apache.kafka.connect.util.KafkaBasedLog.poll(KafkaBasedLog.java:251)
    at org.apache.kafka.connect.util.KafkaBasedLog.readToLogEnd(KafkaBasedLog.java:274)
    at org.apache.kafka.connect.util.KafkaBasedLog.start(KafkaBasedLog.java:141)
    at org.apache.kafka.connect.storage.KafkaConfigBackingStore.start(KafkaConfigBackingStore.java:246)
    at org.apache.kafka.connect.runtime.AbstractHerder.startServices(AbstractHerder.java:95)
    at org.apache.kafka.connect.runtime.distributed.DistributedHerder.run(DistributedHerder.java:195)
    at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
    at java.util.concurrent.FutureTask.run(FutureTask.java:266)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
    at java.lang.Thread.run(Thread.java:745)
Caused by: org.apache.kafka.common.errors.SerializationException: com.fasterxml.jackson.core.JsonParseException: Unrecognized token 'Struct': was expecting ('true', 'false' or 'null') at [Source: [B@6e628bd1; line: 1, column: 8]
Caused by: com.fasterxml.jackson.core.JsonParseException: Unrecognized token 'Struct': was expecting ('true', 'false' or 'null') at [Source: [B@6e628bd1; line: 1, column: 8]
    at com.fasterxml.jackson.core.JsonParser._constructError(JsonParser.java:1487)
    at com.fasterxml.jackson.core.base.ParserMinimalBase._reportError(ParserMinimalBase.java:518)
    at com.fasterxml.jackson.core.json.UTF8StreamJsonParser._reportInvalidToken(UTF8StreamJsonParser.java:3323)
    at com.fasterxml.jackson.core.json.UTF8StreamJsonParser._handleUnexpectedValue(UTF8StreamJsonParser.java:2482)
    at com.fasterxml.jackson.core.json.UTF8StreamJsonParser._nextTokenNotInObject(UTF8StreamJsonParser.java:801)
    at com.fasterxml.jackson.core.json.UTF8StreamJsonParser.nextToken(UTF8StreamJsonParser.java:697)
    at com.fasterxml.jackson.databind.ObjectMapper._initForReading(ObjectMapper.java:3604)
    at com.fasterxml.jackson.databind.ObjectMapper._readMapAndClose(ObjectMapper.java:3549)
    at com.fasterxml.jackson.databind.ObjectMapper.readTree(ObjectMapper.java:2161)
    at org.apache.kafka.connect.json.JsonDeserializer.deserialize(JsonDeserializer.java:50)
    at org.apache.kafka.connect.json.JsonConverter.toConnectData(JsonConverter.java:303)
    at org.apache.kafka.connect.storage.KafkaConfigBackingStore$ConsumeCallback.onCompletion(KafkaConfigBackingStore.java:438)
    at org.apache.kafka.connect.storage.KafkaConfigBackingStore$ConsumeCallback.onCompletion(KafkaConfigBackingStore.java:427)
    at org.apache.kafka.connect.util.KafkaBasedLog.poll(KafkaBasedLog.java:251)
    at org.apache.kafka.connect.util.KafkaBasedLog.readToLogEnd(KafkaBasedLog.java:274)
    at org.apache.kafka.connect.util.KafkaBasedLog.start(KafkaBasedLog.java:141)
    at org.apache.kafka.connect.storage.KafkaConfigBackingStore.start(KafkaConfigBackingStore.java:246)
    at org.apache.kafka.connect.runtime.AbstractHerder.startServices(AbstractHerder.java:95)
    at org.apache.kafka.connect.runtime.distributed.DistributedHerder.run(DistributedHerder.java:195)
    at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
    at java.util.concurrent.FutureTask.run(FutureTask.java:266)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
    at java.lang.Thread.run(Thread.java:745)
[2017-04-04 13:49:49,843] INFO Removed connector json-elasticsearch-sink due to null configuration. This is usually intentional and does not indicate an issue. (org.apache.kafka.connect.storage.KafkaConfigBackingStore:495)
@xiabai84 Your problem seems unrelated. It looks like you have data in the Kafka Connect config topic that is in a different format than expected. This could happen if you had multiple Kafka Connect clusters trying to write to the same topic (with different internal converter configs), or a misconfigured worker. I'd suggest opening a new issue as this is unrelated to the original issue in this thread.
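For what it's worth, here is a sketch of the worker settings that every worker sharing the same internal topics needs to agree on (the topic names below are just examples):
# distributed worker config
internal.key.converter=org.apache.kafka.connect.json.JsonConverter
internal.value.converter=org.apache.kafka.connect.json.JsonConverter
internal.key.converter.schemas.enable=false
internal.value.converter.schemas.enable=false
# only one Connect cluster should own each set of internal topics
config.storage.topic=connect-configs
offset.storage.topic=connect-offsets
status.storage.topic=connect-status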
@heifrank Note that flush.size is per topic partition, not per topic. This can matter if you had 16k messages in your topic but more than 16 partitions, since in that case you may never commit files. If you were hitting the point of committing files, we would expect to see messages like
Starting commit and rotation for topic partition ...
in the log. The lack of these messages suggests you're not hitting any of the conditions that trigger rotating files.
If you can't get any of these to trigger, you could also try the rotate.interval.ms setting, which will force rotation of files after a certain interval of wall-clock time. This gives you an easy way to at least ensure data is committed every N seconds, and should be useful for diagnosing whether the tasks are stuck or just not satisfying the other conditions for rotating files.
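Concretely, in the sink properties that might look something like this (the values are only examples for testing):
# commit a file once this many records have been written for a topic partition
flush.size=3
# or force a rotation after this much time has passed, per the explanation above
rotate.interval.ms=60000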
I have the same problem as @heifrank. I am using the latest confluentinc/cp-kafka-connect Docker container. The connector job seems to be stuck after [2017-05-25 10:53:50,533] INFO Reflections took 37787 ms to scan 562 urls, producing 13659 keys and 89522 values (org.reflections.Reflections:229).
My Configs:
name=kafka-hdfs
connector.class=io.confluent.connect.hdfs.HdfsSinkConnector
task.max=1
topics=topic4
hdfs.url=hdfs://localhost:9000
#format.class=io.confluent.connect.hdfs.parquet.ParquetFormat
flush.size=50
rotate.interval.ms=10000
bootstrap.servers=localhost2:9092
key.converter=org.apache.kafka.connect.json.JsonConverter
value.converter=org.apache.kafka.connect.json.JsonConverter
key.converter.schemas.enable=false
value.converter.schemas.enable=false
internal.key.converter=org.apache.kafka.connect.json.JsonConverter
internal.value.converter=org.apache.kafka.connect.json.JsonConverter
internal.key.converter.schemas.enable=false
internal.value.converter.schemas.enable=false
offset.storage.file.filename=/tmp/connect.offsets
offset.flush.interval.ms=10000
As you can see, I also tried ParquetFormat; the result is the same.
@ewencp I also set rotate.interval.ms but nothing happens. The directories in HDFS were created, but they are empty.
Still facing the same problem as @heifrank and @baluchicken. @ewencp, any workaround or suggestion would be appreciated.
hdfs config
name=hdfs-sink
connector.class=io.confluent.connect.hdfs.HdfsSinkConnector
tasks.max=1
topics=KAFKA_SPARK_FANOUT_SRP_TEST_JSON
hdfs.url=hdfs://localhost:9000
topics.dir=/home/hadoop/kafka/data
logs.dir=/home/hadoop/kafka/wal
format.class=io.confluent.connect.hdfs.parquet.ParquetFormat
partitioner.class=io.confluent.connect.hdfs.partitioner.FieldPartitioner
partition.field.name=PRT_ID
flush.size=100
rotate.interval.ms=10000
connector config
bootstrap.servers=localhost:9092
key.converter=org.apache.kafka.connect.storage.StringConverter
key.converter.schema.registry.url=http://localhost:8081
value.converter=org.apache.kafka.connect.json.JsonConverter
value.converter.schema.registry.url=http://localhost:8081
key.converter.schemas.enable=false
value.converter.schemas.enable=false
internal.key.converter=org.apache.kafka.connect.json.JsonConverter
internal.value.converter=org.apache.kafka.connect.json.JsonConverter
internal.key.converter.schemas.enable=false
internal.value.converter.schemas.enable=false
schemas.enable=false
My schema with payload
{
"schema":{
"type": "struct",
"version":1,
"name" : "KAFKA_SPARK_FANOUT_SRP_TEST_JSON",
"fields" : [
{"type":"int32","optional":true,"field":"PRT_ID"},
{"type":"string","optional":true,"field":"CRTD_BY"},
{"type":"int64","optional":true,"field":"CRTD_DT","name": "org.apache.kafka.connect.data.Timestamp", "version": 1},
{"type":"bytes","optional":true,"field":"RESIDUAL","name": "org.apache.kafka.connect.data.Decimal", "version": 1}
]
},
"payload":{
"PRT_ID":-999,
"LINE_ID":2568731,
"CRTD_BY":"SUDHIR",
"CRTD_DT":1498039957000,
"RESIDUAL": -1234.567
}
}
Problem statement
1. If I enable schemas (value.converter.schemas.enable=true), it throws a NullPointerException:
[2017-10-26 12:44:10,521] ERROR Task hdfs-sink-0 threw an uncaught and unrecoverable exception (org.apache.kafka.connect.runtime.WorkerTask:148)
java.lang.NullPointerException
at org.apache.kafka.connect.json.JsonConverter$13.convert(JsonConverter.java:191)
at org.apache.kafka.connect.json.JsonConverter.convertToConnect(JsonConverter.java:716)
at org.apache.kafka.connect.json.JsonConverter.access$000(JsonConverter.java:52)
at org.apache.kafka.connect.json.JsonConverter$12.convert(JsonConverter.java:176)
at org.apache.kafka.connect.json.JsonConverter.convertToConnect(JsonConverter.java:712)
at org.apache.kafka.connect.json.JsonConverter.jsonToConnect(JsonConverter.java:331)
at org.apache.kafka.connect.json.JsonConverter.toConnectData(JsonConverter.java:320)
at org.apache.kafka.connect.runtime.WorkerSinkTask.convertMessages(WorkerSinkTask.java:407)
at org.apache.kafka.connect.runtime.WorkerSinkTask.poll(WorkerSinkTask.java:250)
at org.apache.kafka.connect.runtime.WorkerSinkTask.iteration(WorkerSinkTask.java:180)
at org.apache.kafka.connect.runtime.WorkerSinkTask.execute(WorkerSinkTask.java:148)
at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:146)
at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:190)
at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
at java.util.concurrent.FutureTask.run(FutureTask.java:266)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
at java.lang.Thread.run(Thread.java:745)
2. If I disable schemas (value.converter.schemas.enable=false), it creates a +tmp directory, but no record is consumed and the task gets stuck with the following log:
[2017-10-26 13:00:17,133] INFO Started ServerConnector@144beac5{HTTP/1.1}{0.0.0.0:8084} (org.eclipse.jetty.server.ServerConnector:266)
[2017-10-26 13:00:17,135] INFO Started @22564ms (org.eclipse.jetty.server.Server:379)
[2017-10-26 13:00:17,137] INFO REST server listening at http://192.168.20.236:8084/, advertising URL http://192.168.20.236:8084/ (org.apache.kafka.connect.runtime.rest.RestServer:150)
[2017-10-26 13:00:17,138] INFO Kafka Connect started (org.apache.kafka.connect.runtime.Connect:55)
[2017-10-26 13:00:17,174] INFO ConnectorConfig values:
connector.class = io.confluent.connect.hdfs.HdfsSinkConnector
key.converter = null
name = hdfs-sink
tasks.max = 1
transforms = null
value.converter = null
(org.apache.kafka.connect.runtime.ConnectorConfig:223)
[2017-10-26 13:00:17,175] INFO EnrichedConnectorConfig values:
connector.class = io.confluent.connect.hdfs.HdfsSinkConnector
key.converter = null
name = hdfs-sink
tasks.max = 1
transforms = null
value.converter = null
(org.apache.kafka.connect.runtime.ConnectorConfig$EnrichedConnectorConfig:223)
[2017-10-26 13:00:17,175] INFO Creating connector hdfs-sink of type io.confluent.connect.hdfs.HdfsSinkConnector (org.apache.kafka.connect.runtime.Worker:204)
[2017-10-26 13:00:17,177] INFO Instantiated connector hdfs-sink with version 3.3.0 of type class io.confluent.connect.hdfs.HdfsSinkConnector (org.apache.kafka.connect.runtime.Worker:207)
[2017-10-26 13:00:17,178] INFO HdfsSinkConnectorConfig values:
connect.hdfs.keytab =
connect.hdfs.principal =
filename.offset.zero.pad.width = 10
flush.size = 100
format.class = io.confluent.connect.hdfs.parquet.ParquetFormat
hadoop.conf.dir =
hadoop.home =
hdfs.authentication.kerberos = false
hdfs.namenode.principal =
hdfs.url = hdfs://localhost:9000
hive.conf.dir =
hive.database = default
hive.home =
hive.integration = false
hive.metastore.uris =
kerberos.ticket.renew.period.ms = 3600000
locale =
logs.dir = /home/hadoop/kafka/wal
partition.duration.ms = -1
partition.field.name = PRT_ID
partitioner.class = io.confluent.connect.hdfs.partitioner.FieldPartitioner
path.format =
retry.backoff.ms = 5000
rotate.interval.ms = 10000
rotate.schedule.interval.ms = -1
schema.cache.size = 1000
schema.compatibility = NONE
shutdown.timeout.ms = 3000
storage.class = io.confluent.connect.hdfs.storage.HdfsStorage
timezone =
topics.dir = /home/hadoop/kafka/data
(io.confluent.connect.hdfs.HdfsSinkConnectorConfig:223)
[2017-10-26 13:00:17,184] INFO Finished creating connector hdfs-sink (org.apache.kafka.connect.runtime.Worker:225)
[2017-10-26 13:00:17,189] INFO SinkConnectorConfig values:
connector.class = io.confluent.connect.hdfs.HdfsSinkConnector
key.converter = null
name = hdfs-sink
tasks.max = 1
topics = [KAFKA_SPARK_FANOUT_SRP_TEST_JSON]
transforms = null
value.converter = null
(org.apache.kafka.connect.runtime.SinkConnectorConfig:223)
[2017-10-26 13:00:17,189] INFO EnrichedConnectorConfig values:
connector.class = io.confluent.connect.hdfs.HdfsSinkConnector
key.converter = null
name = hdfs-sink
tasks.max = 1
topics = [KAFKA_SPARK_FANOUT_SRP_TEST_JSON]
transforms = null
value.converter = null
(org.apache.kafka.connect.runtime.ConnectorConfig$EnrichedConnectorConfig:223)
[2017-10-26 13:00:17,192] INFO Creating task hdfs-sink-0 (org.apache.kafka.connect.runtime.Worker:358)
[2017-10-26 13:00:17,192] INFO ConnectorConfig values:
connector.class = io.confluent.connect.hdfs.HdfsSinkConnector
key.converter = null
name = hdfs-sink
tasks.max = 1
transforms = null
value.converter = null
(org.apache.kafka.connect.runtime.ConnectorConfig:223)
[2017-10-26 13:00:17,193] INFO EnrichedConnectorConfig values:
connector.class = io.confluent.connect.hdfs.HdfsSinkConnector
key.converter = null
name = hdfs-sink
tasks.max = 1
transforms = null
value.converter = null
(org.apache.kafka.connect.runtime.ConnectorConfig$EnrichedConnectorConfig:223)
[2017-10-26 13:00:17,194] INFO TaskConfig values:
task.class = class io.confluent.connect.hdfs.HdfsSinkTask
(org.apache.kafka.connect.runtime.TaskConfig:223)
[2017-10-26 13:00:17,194] INFO Instantiated task hdfs-sink-0 with version 3.3.0 of type io.confluent.connect.hdfs.HdfsSinkTask (org.apache.kafka.connect.runtime.Worker:373)
[2017-10-26 13:00:17,214] INFO ConsumerConfig values:
auto.commit.interval.ms = 5000
auto.offset.reset = earliest
bootstrap.servers = [localhost:9092]
check.crcs = true
client.id =
connections.max.idle.ms = 540000
enable.auto.commit = false
exclude.internal.topics = true
fetch.max.bytes = 52428800
fetch.max.wait.ms = 500
fetch.min.bytes = 1
group.id = connect-hdfs-sink
heartbeat.interval.ms = 3000
interceptor.classes = null
internal.leave.group.on.close = true
isolation.level = read_uncommitted
key.deserializer = class org.apache.kafka.common.serialization.ByteArrayDeserializer
max.partition.fetch.bytes = 1048576
max.poll.interval.ms = 300000
max.poll.records = 500
metadata.max.age.ms = 300000
metric.reporters = []
metrics.num.samples = 2
metrics.recording.level = INFO
metrics.sample.window.ms = 30000
partition.assignment.strategy = [class org.apache.kafka.clients.consumer.RangeAssignor]
receive.buffer.bytes = 65536
reconnect.backoff.max.ms = 1000
reconnect.backoff.ms = 50
request.timeout.ms = 305000
retry.backoff.ms = 100
sasl.jaas.config = null
sasl.kerberos.kinit.cmd = /usr/bin/kinit
sasl.kerberos.min.time.before.relogin = 60000
sasl.kerberos.service.name = null
sasl.kerberos.ticket.renew.jitter = 0.05
sasl.kerberos.ticket.renew.window.factor = 0.8
sasl.mechanism = GSSAPI
security.protocol = PLAINTEXT
send.buffer.bytes = 131072
session.timeout.ms = 10000
ssl.cipher.suites = null
ssl.enabled.protocols = [TLSv1.2, TLSv1.1, TLSv1]
ssl.endpoint.identification.algorithm = null
ssl.key.password = null
ssl.keymanager.algorithm = SunX509
ssl.keystore.location = null
ssl.keystore.password = null
ssl.keystore.type = JKS
ssl.protocol = TLS
ssl.provider = null
ssl.secure.random.implementation = null
ssl.trustmanager.algorithm = PKIX
ssl.truststore.location = null
ssl.truststore.password = null
ssl.truststore.type = JKS
value.deserializer = class org.apache.kafka.common.serialization.ByteArrayDeserializer
(org.apache.kafka.clients.consumer.ConsumerConfig:223)
[2017-10-26 13:00:17,341] INFO Kafka version : 0.11.0.0-cp1 (org.apache.kafka.common.utils.AppInfoParser:83)
[2017-10-26 13:00:17,341] INFO Kafka commitId : 5cadaa94d0a69e0d (org.apache.kafka.common.utils.AppInfoParser:84)
[2017-10-26 13:00:17,351] INFO HdfsSinkConnectorConfig values:
connect.hdfs.keytab =
connect.hdfs.principal =
filename.offset.zero.pad.width = 10
flush.size = 100
format.class = io.confluent.connect.hdfs.parquet.ParquetFormat
hadoop.conf.dir =
hadoop.home =
hdfs.authentication.kerberos = false
hdfs.namenode.principal =
hdfs.url = hdfs://localhost:9000
hive.conf.dir =
hive.database = default
hive.home =
hive.integration = false
hive.metastore.uris =
kerberos.ticket.renew.period.ms = 3600000
locale =
logs.dir = /home/hadoop/kafka/wal
partition.duration.ms = -1
partition.field.name = PRT_ID
partitioner.class = io.confluent.connect.hdfs.partitioner.FieldPartitioner
path.format =
retry.backoff.ms = 5000
rotate.interval.ms = 10000
rotate.schedule.interval.ms = -1
schema.cache.size = 1000
schema.compatibility = NONE
shutdown.timeout.ms = 3000
storage.class = io.confluent.connect.hdfs.storage.HdfsStorage
timezone =
topics.dir = /home/hadoop/kafka/data
(io.confluent.connect.hdfs.HdfsSinkConnectorConfig:223)
[2017-10-26 13:00:17,371] INFO Created connector hdfs-sink (org.apache.kafka.connect.cli.ConnectStandalone:91)
[2017-10-26 13:00:17,702] INFO AvroDataConfig values:
schemas.cache.config = 1000
enhanced.avro.schema.support = false
connect.meta.data = true
(io.confluent.connect.avro.AvroDataConfig:170)
[2017-10-26 13:00:17,707] INFO Hadoop configuration directory (io.confluent.connect.hdfs.DataWriter:93)
[2017-10-26 13:00:18,512] WARN Unable to load native-hadoop library for your platform... using builtin-java classes where applicable (org.apache.hadoop.util.NativeCodeLoader:62)
[2017-10-26 13:00:19,685] INFO Sink task WorkerSinkTask{id=hdfs-sink-0} finished initialization and start (org.apache.kafka.connect.runtime.WorkerSinkTask:233)
[2017-10-26 13:00:19,857] INFO Discovered coordinator localhost:9092 (id: 2147483647 rack: null) for group connect-hdfs-sink. (org.apache.kafka.clients.consumer.internals.AbstractCoordinator:597)
[2017-10-26 13:00:19,861] INFO Revoking previously assigned partitions [] for group connect-hdfs-sink (org.apache.kafka.clients.consumer.internals.ConsumerCoordinator:419)
[2017-10-26 13:00:19,861] INFO (Re-)joining group connect-hdfs-sink (org.apache.kafka.clients.consumer.internals.AbstractCoordinator:432)
[2017-10-26 13:00:19,886] INFO Successfully joined group connect-hdfs-sink with generation 3 (org.apache.kafka.clients.consumer.internals.AbstractCoordinator:399)
[2017-10-26 13:00:19,892] INFO Setting newly assigned partitions [KAFKA_SPARK_FANOUT_SRP_TEST_JSON-0] for group connect-hdfs-sink (org.apache.kafka.clients.consumer.internals.ConsumerCoordinator:262)
[2017-10-26 13:00:19,958] INFO Started recovery for topic partition KAFKA_SPARK_FANOUT_SRP_TEST_JSON-0 (io.confluent.connect.hdfs.TopicPartitionWriter:208)
[2017-10-26 13:00:19,970] INFO Finished recovery for topic partition KAFKA_SPARK_FANOUT_SRP_TEST_JSON-0 (io.confluent.connect.hdfs.TopicPartitionWriter:223)
Same problem exists for me. Error log:
org.apache.kafka.connect.runtime.WorkerSinkTask.iteration(WorkerSinkTask.java:205)
    at org.apache.kafka.connect.runtime.WorkerSinkTask.execute(WorkerSinkTask.java:173)
    at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:170)
    at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:214)
    at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
    at java.util.concurrent.FutureTask.run(FutureTask.java:266)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
    at java.lang.Thread.run(Thread.java:748)
Caused by: java.lang.StackOverflowError
    at org.apache.parquet.avro.AvroSchemaConverter.convertUnion(AvroSchemaConverter.java:214)
    at org.apache.parquet.avro.AvroSchemaConverter.convertField(AvroSchemaConverter.java:171)
    at org.apache.parquet.avro.AvroSchemaConverter.convertField(AvroSchemaConverter.java:130)
    at org.apache.parquet.avro.AvroSchemaConverter.convertField(AvroSchemaConverter.java:227)
    at org.apache.parquet.avro.AvroSchemaConverter.convertFields(AvroSchemaConverter.java:124)
    at org.apache.parquet.avro.AvroSchemaConverter.convertField(AvroSchemaConverter.java:152)
    at org.apache.parquet.avro.AvroSchemaConverter.convertField(AvroSchemaConverter.java:157)
    at org.apache.parquet.avro.AvroSchemaConverter.convertUnion(AvroSchemaConverter.java:214)
    at org.apache.parquet.avro.AvroSchemaConverter.convertField(AvroSchemaConverter.java:171)
    at org.apache.parquet.avro.AvroSchemaConverter.convertField(AvroSchemaConverter.java:130)
    at org.apache.parquet.avro.AvroSchemaConverter.convertField(AvroSchemaConverter.java:227)
@gwenshap @ewencp I am also having the same issue. I'm unable to figure out the exact error and fix it; kindly help in resolving. My hdfs-sink configuration:
{
  "name": "hdfs-sink",
  "config": {
    "connector.class": "io.confluent.connect.hdfs.HdfsSinkConnector",
    "tasks.max": "1",
    "topics": "mysql-prod-registrations",
    "hadoop.conf.dir": "/usr/hdp/current/hadoop-client/conf",
    "hadoop.home": "/usr/hdp/current/hadoop-client/",
    "hdfs.url": "hdfs://HACluster:8020",
    "topics.dir": "/topics",
    "logs.dir": "/logs",
    "flush.size": "100",
    "rotate.interval.ms": "60000",
    "format.class": "io.confluent.connect.hdfs.avro.AvroFormat",
    "value.converter.schemas.enable": "false",
    "partitioner.class": "io.confluent.connect.hdfs.partitioner.TimeBasedPartitioner",
    "partition.duration.ms": "1800000",
    "path.format": "'year'=YYYY/'month'=MM/'day'=dd/'hour'=HH/",
    "locale": "en",
    "timezone": "Asia/Kolkata"
  }
}
ERROR: { "name": "hdfs-sink", "connector": { "state": "RUNNING", "worker_id": "xxxxxxx:8083" }, "tasks": [ { "state": "FAILED", "trace": "java.lang.NullPointerException\n\tat io.confluent.connect.hdfs.HdfsSinkTask.open(HdfsSinkTask.java:133)\n\tat org.apache.kafka.connect.runtime.WorkerSinkTask.openPartitions(WorkerSinkTask.java:612)\n\tat org.apache.kafka.connect.runtime.WorkerSinkTask.access$1100(WorkerSinkTask.java:69)\n\tat org.apache.kafka.connect.runtime.WorkerSinkTask$HandleRebalance.onPartitionsAssigned(WorkerSinkTask.java:672)\n\tat org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.onJoinComplete(ConsumerCoordinator.java:283)\n\tat org.apache.kafka.clients.consumer.internals.AbstractCoordinator.joinGroupIfNeeded(AbstractCoordinator.java:422)\n\tat org.apache.kafka.clients.consumer.internals.AbstractCoordinator.ensureActiveGroup(AbstractCoordinator.java:352)\n\tat org.apache.kafka.clients.consumer.internals.AbstractCoordinator.ensureActiveGroup(AbstractCoordinator.java:337)\n\tat org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.poll(ConsumerCoordinator.java:343)\n\tat org.apache.kafka.clients.consumer.KafkaConsumer.updateAssignmentMetadataIfNeeded(KafkaConsumer.java:1218)\n\tat org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1181)\n\tat org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1115)\n\tat org.apache.kafka.connect.runtime.WorkerSinkTask.pollConsumer(WorkerSinkTask.java:444)\n\tat org.apache.kafka.connect.runtime.WorkerSinkTask.poll(WorkerSinkTask.java:317)\n\tat org.apache.kafka.connect.runtime.WorkerSinkTask.iteration(WorkerSinkTask.java:225)\n\tat org.apache.kafka.connect.runtime.WorkerSinkTask.execute(WorkerSinkTask.java:193)\n\tat org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:175)\n\tat org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:219)\n\tat java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)\n\tat java.util.concurrent.FutureTask.run(FutureTask.java:266)\n\tat java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)\n\tat java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)\n\tat java.lang.Thread.run(Thread.java:745)\n", "id": 0, "worker_id": "xxxxxxx:8083" } ], "type": "sink" }
Same problem here, all schemas disabled in the Kafka HDFS connector, but receiving error message:
JsonConverter with schemas.enable requires "schema" and "payload" fields and may not contain additional fields. If you are trying to deserialize plain JSON data, set schemas.enable=false in your converter configuration.
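In case it helps the next person who hits this: that message comes from the JsonConverter, so the schemas.enable flags have to be set on the converters that are actually in use; it's worth double-checking the worker config, since the JsonConverter defaults to schemas.enable=true there. A sketch of the relevant worker settings, assuming plain schemaless JSON in the topic:
# worker config, e.g. connect-standalone.properties
key.converter=org.apache.kafka.connect.json.JsonConverter
value.converter=org.apache.kafka.connect.json.JsonConverter
key.converter.schemas.enable=false
value.converter.schemas.enable=false
Alternatively, keep schemas.enable=true and wrap every record in the {"schema": ..., "payload": ...} envelope shown earlier in this thread.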