
hdfs connector not consuming kafka topic

Open heifrank opened this issue 8 years ago • 13 comments

Hi all, I ran into a problem where the HDFS connector does not consume my Kafka topic, although connect-file-sink does. After I started the job, I waited about 3 minutes but no commit log ever showed up, so I killed the job with Ctrl+C. I then noticed an error in the log. The details are shown below.

The runtime log is as follows:

hdfs.url = hdfs://10.103.18.9:9000
hdfs.authentication.kerberos = false
hive.metastore.uris =
partition.field.name = date
kerberos.ticket.renew.period.ms = 3600000
shutdown.timeout.ms = 3000
partitioner.class = io.confluent.connect.hdfs.partitioner.FieldPartitioner
storage.class = io.confluent.connect.hdfs.storage.HdfsStorage
path.format =
 (io.confluent.connect.hdfs.HdfsSinkConnectorConfig:135)
[2017-01-10 12:37:10,018] INFO Hadoop configuration directory (io.confluent.connect.hdfs.DataWriter:94)
[2017-01-10 12:37:10,295] WARN Unable to load native-hadoop library for your platform... using builtin-java classes where applicable (org.apache.hadoop.util.NativeCodeLoader:62)
[2017-01-10 12:37:10,901] INFO Started recovery for topic partition test_kafka_docs2-2 (io.confluent.connect.hdfs.TopicPartitionWriter:193)
[2017-01-10 12:37:10,912] INFO Finished recovery for topic partition test_kafka_docs2-2 (io.confluent.connect.hdfs.TopicPartitionWriter:208)
[2017-01-10 12:37:10,912] INFO Started recovery for topic partition test_kafka_docs2-1 (io.confluent.connect.hdfs.TopicPartitionWriter:193)
[2017-01-10 12:37:10,916] INFO Finished recovery for topic partition test_kafka_docs2-1 (io.confluent.connect.hdfs.TopicPartitionWriter:208)
[2017-01-10 12:37:10,917] INFO Started recovery for topic partition test_kafka_docs2-0 (io.confluent.connect.hdfs.TopicPartitionWriter:193)
[2017-01-10 12:37:10,921] INFO Finished recovery for topic partition test_kafka_docs2-0 (io.confluent.connect.hdfs.TopicPartitionWriter:208)
[2017-01-10 12:37:10,921] INFO Started recovery for topic partition test_kafka_docs2-5 (io.confluent.connect.hdfs.TopicPartitionWriter:193)
[2017-01-10 12:37:10,925] INFO Finished recovery for topic partition test_kafka_docs2-5 (io.confluent.connect.hdfs.TopicPartitionWriter:208)
[2017-01-10 12:37:10,925] INFO Started recovery for topic partition test_kafka_docs2-4 (io.confluent.connect.hdfs.TopicPartitionWriter:193)
[2017-01-10 12:37:10,929] INFO Finished recovery for topic partition test_kafka_docs2-4 (io.confluent.connect.hdfs.TopicPartitionWriter:208)
[2017-01-10 12:37:10,929] INFO Started recovery for topic partition test_kafka_docs2-3 (io.confluent.connect.hdfs.TopicPartitionWriter:193)
[2017-01-10 12:37:10,932] INFO Finished recovery for topic partition test_kafka_docs2-3 (io.confluent.connect.hdfs.TopicPartitionWriter:208)
[2017-01-10 12:37:10,932] INFO Sink task org.apache.kafka.connect.runtime.WorkerSinkTask@47c548fc finished initialization and start (org.apache.kafka.connect.runtime.WorkerSinkTask:155)
^C[2017-01-10 12:40:13,518] INFO Kafka Connect stopping (org.apache.kafka.connect.runtime.Connect:68)
[2017-01-10 12:40:13,527] INFO Stopped ServerConnector@3382f8ae{HTTP/1.1}{0.0.0.0:8083} (org.eclipse.jetty.server.ServerConnector:306)
[2017-01-10 12:40:13,539] INFO Stopped o.e.j.s.ServletContextHandler@2974f221{/,null,UNAVAILABLE} (org.eclipse.jetty.server.handler.ContextHandler:865)
[2017-01-10 12:40:13,541] INFO Herder stopping (org.apache.kafka.connect.runtime.standalone.StandaloneHerder:62)
[2017-01-10 12:40:13,541] INFO Stopping task hdfs-sink3-0 (org.apache.kafka.connect.runtime.Worker:305)
[2017-01-10 12:40:13,543] INFO Starting graceful shutdown of thread WorkerSinkTask-hdfs-sink3-0 (org.apache.kafka.connect.util.ShutdownableThread:119)
[2017-01-10 12:40:18,544] INFO Forcing shutdown of thread WorkerSinkTask-hdfs-sink3-0 (org.apache.kafka.connect.util.ShutdownableThread:141)
[2017-01-10 12:40:18,546] ERROR Graceful stop of task org.apache.kafka.connect.runtime.WorkerSinkTask@47c548fc failed. (org.apache.kafka.connect.runtime.Worker:312)
[2017-01-10 12:40:18,564] INFO Stopping connector hdfs-sink3 (org.apache.kafka.connect.runtime.Worker:226)
[2017-01-10 12:40:18,565] INFO Stopped connector hdfs-sink3 (org.apache.kafka.connect.runtime.Worker:240)
[2017-01-10 12:40:18,565] INFO Herder stopped (org.apache.kafka.connect.runtime.standalone.StandaloneHerder:77)
[2017-01-10 12:40:18,565] INFO Worker stopping (org.apache.kafka.connect.runtime.Worker:115)
[2017-01-10 12:40:18,566] INFO Stopped FileOffsetBackingStore (org.apache.kafka.connect.storage.FileOffsetBackingStore:61)
[2017-01-10 12:40:18,566] INFO Worker stopped (org.apache.kafka.connect.runtime.Worker:155)
[2017-01-10 12:40:18,566] INFO Kafka Connect stopped (org.apache.kafka.connect.runtime.Connect:74)

The command I use to start the job is: ./bin/connect-standalone etc/kafka/connect-standalone.properties etc/kafka-connect-hdfs/quickstart-hdfs.properties

The connect-standalone.properties is as follows:

bootstrap.servers=10.103.17.106:9092
key.converter=org.apache.kafka.connect.json.JsonConverter
value.converter=org.apache.kafka.connect.json.JsonConverter
key.converter.schemas.enable=false
value.converter.schemas.enable=false

internal.key.converter=org.apache.kafka.connect.json.JsonConverter
internal.value.converter=org.apache.kafka.connect.json.JsonConverter
internal.key.converter.schemas.enable=false
internal.value.converter.schemas.enable=false

offset.storage.file.filename=/tmp/connect.offsets
offset.flush.interval.ms=10000

The quickstart-hdfs.properties is as follows:

name=hdfs-sink3
connector.class=io.confluent.connect.hdfs.HdfsSinkConnector
tasks.max=1
topics=test_kafka_docs2
#topics=test_hdfs_kafka
hdfs.url=hdfs://10.103.18.9:9000
flush.size=1000

Could anyone give some suggestions? Thanks.

heifrank avatar Jan 10 '17 04:01 heifrank

@heifrank I see two things I would change here to troubleshoot:

  1. flush.size is set to 1000. I would set this to something small like 3 and manually produce a few records to the topic to see if you can get it to trigger a file roll (see the sketch after this list).
  2. This is a bit weird but maybe a copy/paste issue. I would make sure these are on separate lines: offset.storage.file.filename=/tmp/connect.offsets offset.flush.interval.ms=10000
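Something like this, for example (a minimal sketch using the topic, broker, and file names quoted in this thread; adjust to your environment):

# etc/kafka-connect-hdfs/quickstart-hdfs.properties
name=hdfs-sink3
connector.class=io.confluent.connect.hdfs.HdfsSinkConnector
tasks.max=1
topics=test_kafka_docs2
hdfs.url=hdfs://10.103.18.9:9000
flush.size=3

# then push a handful of test records so the small flush.size can trigger a commit
./bin/kafka-console-producer --broker-list 10.103.17.106:9092 --topic test_kafka_docs2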

cotedm avatar Jan 10 '17 15:01 cotedm

@cotedm Thanks for your reply.

  1. My Kafka topic contains 160,000 records, so I set flush.size to 1000. I also tried setting it to 3, but that didn't work either.
  2. Yes, that's a copy/paste issue; they are on separate lines.

It's weird that my offset.flush.interval.ms is set to 10000, yet no commit message shows up within that interval. I waited 3 minutes and checked Kafka Manager, but didn't find any consumer consuming my topic. However, using the connect-file-sink properties works fine. It behaves the same in both version 2.0.0 and 3.0.0.

heifrank avatar Jan 11 '17 02:01 heifrank

@heifrank do you see any directories created in HDFS? If the connection to HDFS is working properly, you should see a /topics and a /logs directory created in HDFS by the connector. If you don't see those directories on HDFS, that could be your problem. If those directories are created, you should see something being written to /logs; if not, please verify the permissions there.
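For example, from a shell with the Hadoop client configured against the same namenode, something like

hdfs dfs -ls hdfs://10.103.18.9:9000/topics
hdfs dfs -ls hdfs://10.103.18.9:9000/logs

should show whatever the connector has managed to create so far (the namenode URL here is the one from your hdfs.url).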

cotedm avatar Jan 11 '17 19:01 cotedm

@cotedm The /topics and /logs directories are created, but /logs is empty. /topics only has a +tmp directory, which is empty too. So it can't be a permissions issue. Two things I notice:

  1. If I use the Schema Registry properties (etc/schema-registry/connect-avro-standalone.properties) and consume another topic whose data is in Avro format, it works fine.
  2. If I use etc/kafka/connect-standalone.properties and consume a topic whose data is JSON, it doesn't consume any messages at all; but if I change hdfs-sink to file-sink, it works fine.

heifrank avatar Jan 12 '17 03:01 heifrank

@heifrank I think the problem then is that we don't currently have a good way to write JSON data built into the connector. It's planned future work (see #74), but there are a lot of decisions to be made for it to really work for everyone. In the meantime, you can dump the JSON data you have directly into HDFS via the connector's pluggable format. There is a connector built off of this one that has a SourceFormat that could work for you. Note that Hive integration won't work if you do this, but you can get your JSON data over this way.

If you are wondering, the easiest way to plug this in is to build that connector's jar file and add the jar to the classpath. Then update the format.class config to point to the class. Please give that a go and let us know how it works out for future reference.
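Roughly, and only as a sketch (the jar path and the format class name below are placeholders, not the real coordinates of that other connector):

# make the format implementation visible to the Connect worker
export CLASSPATH=/path/to/json-format.jar

# then point the HDFS sink at it in quickstart-hdfs.properties
format.class=com.example.connect.hdfs.json.JsonFormat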

cotedm avatar Jan 17 '17 15:01 cotedm

I have the same problem with JSON-format records. As heifrank said, it works fine using Avro format and Schema Registry. But once I changed to JSON, data could be written to the topic but couldn't be consumed by the kafka-hdfs-sink connector... see error logs below.

[2017-04-04 13:49:49,835] ERROR Failed to convert config data to Kafka Connect format: (org.apache.kafka.connect.storage.KafkaConfigBackingStore:440)
org.apache.kafka.connect.errors.DataException: Converting byte[] to Kafka Connect data failed due to serialization error:
	at org.apache.kafka.connect.json.JsonConverter.toConnectData(JsonConverter.java:305)
	at org.apache.kafka.connect.storage.KafkaConfigBackingStore$ConsumeCallback.onCompletion(KafkaConfigBackingStore.java:438)
	at org.apache.kafka.connect.storage.KafkaConfigBackingStore$ConsumeCallback.onCompletion(KafkaConfigBackingStore.java:427)
	at org.apache.kafka.connect.util.KafkaBasedLog.poll(KafkaBasedLog.java:251)
	at org.apache.kafka.connect.util.KafkaBasedLog.readToLogEnd(KafkaBasedLog.java:274)
	at org.apache.kafka.connect.util.KafkaBasedLog.start(KafkaBasedLog.java:141)
	at org.apache.kafka.connect.storage.KafkaConfigBackingStore.start(KafkaConfigBackingStore.java:246)
	at org.apache.kafka.connect.runtime.AbstractHerder.startServices(AbstractHerder.java:95)
	at org.apache.kafka.connect.runtime.distributed.DistributedHerder.run(DistributedHerder.java:195)
	at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
	at java.util.concurrent.FutureTask.run(FutureTask.java:266)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
	at java.lang.Thread.run(Thread.java:745)
Caused by: org.apache.kafka.common.errors.SerializationException: com.fasterxml.jackson.core.JsonParseException: Unrecognized token 'Struct': was expecting ('true', 'false' or 'null') at [Source: [B@64136ffe; line: 1, column: 8]
Caused by: com.fasterxml.jackson.core.JsonParseException: Unrecognized token 'Struct': was expecting ('true', 'false' or 'null') at [Source: [B@64136ffe; line: 1, column: 8]
	at com.fasterxml.jackson.core.JsonParser._constructError(JsonParser.java:1487)
	at com.fasterxml.jackson.core.base.ParserMinimalBase._reportError(ParserMinimalBase.java:518)
	at com.fasterxml.jackson.core.json.UTF8StreamJsonParser._reportInvalidToken(UTF8StreamJsonParser.java:3323)
	at com.fasterxml.jackson.core.json.UTF8StreamJsonParser._handleUnexpectedValue(UTF8StreamJsonParser.java:2482)
	at com.fasterxml.jackson.core.json.UTF8StreamJsonParser._nextTokenNotInObject(UTF8StreamJsonParser.java:801)
	at com.fasterxml.jackson.core.json.UTF8StreamJsonParser.nextToken(UTF8StreamJsonParser.java:697)
	at com.fasterxml.jackson.databind.ObjectMapper._initForReading(ObjectMapper.java:3604)
	at com.fasterxml.jackson.databind.ObjectMapper._readMapAndClose(ObjectMapper.java:3549)
	at com.fasterxml.jackson.databind.ObjectMapper.readTree(ObjectMapper.java:2161)
	at org.apache.kafka.connect.json.JsonDeserializer.deserialize(JsonDeserializer.java:50)
	at org.apache.kafka.connect.json.JsonConverter.toConnectData(JsonConverter.java:303)
	at org.apache.kafka.connect.storage.KafkaConfigBackingStore$ConsumeCallback.onCompletion(KafkaConfigBackingStore.java:438)
	at org.apache.kafka.connect.storage.KafkaConfigBackingStore$ConsumeCallback.onCompletion(KafkaConfigBackingStore.java:427)
	at org.apache.kafka.connect.util.KafkaBasedLog.poll(KafkaBasedLog.java:251)
	at org.apache.kafka.connect.util.KafkaBasedLog.readToLogEnd(KafkaBasedLog.java:274)
	at org.apache.kafka.connect.util.KafkaBasedLog.start(KafkaBasedLog.java:141)
	at org.apache.kafka.connect.storage.KafkaConfigBackingStore.start(KafkaConfigBackingStore.java:246)
	at org.apache.kafka.connect.runtime.AbstractHerder.startServices(AbstractHerder.java:95)
	at org.apache.kafka.connect.runtime.distributed.DistributedHerder.run(DistributedHerder.java:195)
	at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
	at java.util.concurrent.FutureTask.run(FutureTask.java:266)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
	at java.lang.Thread.run(Thread.java:745)
[2017-04-04 13:49:49,842] ERROR Failed to convert config data to Kafka Connect format: (org.apache.kafka.connect.storage.KafkaConfigBackingStore:440)
org.apache.kafka.connect.errors.DataException: Converting byte[] to Kafka Connect data failed due to serialization error:
	at org.apache.kafka.connect.json.JsonConverter.toConnectData(JsonConverter.java:305)
	at org.apache.kafka.connect.storage.KafkaConfigBackingStore$ConsumeCallback.onCompletion(KafkaConfigBackingStore.java:438)
	at org.apache.kafka.connect.storage.KafkaConfigBackingStore$ConsumeCallback.onCompletion(KafkaConfigBackingStore.java:427)
	at org.apache.kafka.connect.util.KafkaBasedLog.poll(KafkaBasedLog.java:251)
	at org.apache.kafka.connect.util.KafkaBasedLog.readToLogEnd(KafkaBasedLog.java:274)
	at org.apache.kafka.connect.util.KafkaBasedLog.start(KafkaBasedLog.java:141)
	at org.apache.kafka.connect.storage.KafkaConfigBackingStore.start(KafkaConfigBackingStore.java:246)
	at org.apache.kafka.connect.runtime.AbstractHerder.startServices(AbstractHerder.java:95)
	at org.apache.kafka.connect.runtime.distributed.DistributedHerder.run(DistributedHerder.java:195)
	at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
	at java.util.concurrent.FutureTask.run(FutureTask.java:266)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
	at java.lang.Thread.run(Thread.java:745)
Caused by: org.apache.kafka.common.errors.SerializationException: com.fasterxml.jackson.core.JsonParseException: Unrecognized token 'Struct': was expecting ('true', 'false' or 'null') at [Source: [B@6e628bd1; line: 1, column: 8]
Caused by: com.fasterxml.jackson.core.JsonParseException: Unrecognized token 'Struct': was expecting ('true', 'false' or 'null') at [Source: [B@6e628bd1; line: 1, column: 8]
	at com.fasterxml.jackson.core.JsonParser._constructError(JsonParser.java:1487)
	at com.fasterxml.jackson.core.base.ParserMinimalBase._reportError(ParserMinimalBase.java:518)
	at com.fasterxml.jackson.core.json.UTF8StreamJsonParser._reportInvalidToken(UTF8StreamJsonParser.java:3323)
	at com.fasterxml.jackson.core.json.UTF8StreamJsonParser._handleUnexpectedValue(UTF8StreamJsonParser.java:2482)
	at com.fasterxml.jackson.core.json.UTF8StreamJsonParser._nextTokenNotInObject(UTF8StreamJsonParser.java:801)
	at com.fasterxml.jackson.core.json.UTF8StreamJsonParser.nextToken(UTF8StreamJsonParser.java:697)
	at com.fasterxml.jackson.databind.ObjectMapper._initForReading(ObjectMapper.java:3604)
	at com.fasterxml.jackson.databind.ObjectMapper._readMapAndClose(ObjectMapper.java:3549)
	at com.fasterxml.jackson.databind.ObjectMapper.readTree(ObjectMapper.java:2161)
	at org.apache.kafka.connect.json.JsonDeserializer.deserialize(JsonDeserializer.java:50)
	at org.apache.kafka.connect.json.JsonConverter.toConnectData(JsonConverter.java:303)
	at org.apache.kafka.connect.storage.KafkaConfigBackingStore$ConsumeCallback.onCompletion(KafkaConfigBackingStore.java:438)
	at org.apache.kafka.connect.storage.KafkaConfigBackingStore$ConsumeCallback.onCompletion(KafkaConfigBackingStore.java:427)
	at org.apache.kafka.connect.util.KafkaBasedLog.poll(KafkaBasedLog.java:251)
	at org.apache.kafka.connect.util.KafkaBasedLog.readToLogEnd(KafkaBasedLog.java:274)
	at org.apache.kafka.connect.util.KafkaBasedLog.start(KafkaBasedLog.java:141)
	at org.apache.kafka.connect.storage.KafkaConfigBackingStore.start(KafkaConfigBackingStore.java:246)
	at org.apache.kafka.connect.runtime.AbstractHerder.startServices(AbstractHerder.java:95)
	at org.apache.kafka.connect.runtime.distributed.DistributedHerder.run(DistributedHerder.java:195)
	at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
	at java.util.concurrent.FutureTask.run(FutureTask.java:266)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
	at java.lang.Thread.run(Thread.java:745)
[2017-04-04 13:49:49,843] INFO Removed connector json-elasticsearch-sink due to null configuration. This is usually intentional and does not indicate an issue. (org.apache.kafka.connect.storage.KafkaConfigBackingStore:495)

xiabai84 avatar Apr 04 '17 20:04 xiabai84

@xiabai84 Your problem seems unrelated. It looks like you have data in the Kafka Connect config topic that is in a different format than expected. This could happen if you had multiple Kafka Connect clusters trying to write to the same topic (with different internal converter configs), or a misconfigured worker. I'd suggest opening a new issue as this is unrelated to the original issue in this thread.
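(For reference, the internal converter settings that every worker sharing the same internal topics needs to agree on are the ones shown earlier in this thread:

internal.key.converter=org.apache.kafka.connect.json.JsonConverter
internal.value.converter=org.apache.kafka.connect.json.JsonConverter
internal.key.converter.schemas.enable=false
internal.value.converter.schemas.enable=false

If one worker ever wrote to the config topic with different settings, you could see exactly this kind of deserialization failure.)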

ewencp avatar Apr 06 '17 03:04 ewencp

@heifrank Note that flush.size is per topic partition, not per topic. That matters here: with 160k messages in the topic, if they were spread over more than 160 partitions, no single partition would ever reach a flush.size of 1000 and you would never commit files. If you were hitting the point of committing files, we would expect to see messages like

Starting commit and rotation for topic partition ...

in the log. The lack of these messages suggests you're not hitting any of the conditions that trigger rotating files.

If you can't get any of these to trigger, you could also try the rotate.interval.ms setting, which will force rotation of files after a certain interval of wall-clock time. This gives you an easy way to at least ensure data is committed every N seconds, and should be useful for diagnosing whether the tasks are stuck or just not satisfying the other conditions for rotating files.
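For example, something along these lines in the sink properties (a sketch only; 60000 is just an illustrative value):

# rotate files after this interval so data gets committed even if flush.size is never reached
rotate.interval.ms=60000
flush.size=3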

ewencp avatar Apr 06 '17 04:04 ewencp

I have the same problem as @heifrank. I am using the latest confluentinc/cp-kafka-connect Docker container. The connector job seems to be stuck after [2017-05-25 10:53:50,533] INFO Reflections took 37787 ms to scan 562 urls, producing 13659 keys and 89522 values (org.reflections.Reflections:229).

My configs:

name=kafka-hdfs
connector.class=io.confluent.connect.hdfs.HdfsSinkConnector
task.max=1
topics=topic4
hdfs.url=hdfs://localhost:9000
#format.class=io.confluent.connect.hdfs.parquet.ParquetFormat
flush.size=50
rotate.interval.ms=10000

bootstrap.servers=localhost2:9092
key.converter=org.apache.kafka.connect.json.JsonConverter
value.converter=org.apache.kafka.connect.json.JsonConverter
key.converter.schemas.enable=false
value.converter.schemas.enable=false
internal.key.converter=org.apache.kafka.connect.json.JsonConverter
internal.value.converter=org.apache.kafka.connect.json.JsonConverter
internal.key.converter.schemas.enable=false
internal.value.converter.schemas.enable=false
offset.storage.file.filename=/tmp/connect.offsets
offset.flush.interval.ms=10000

As you can see, I also tried ParquetFormat and the result is the same. @ewencp I also set rotate.interval.ms but nothing happens. The directories in HDFS were created, but they are empty.

baluchicken avatar May 25 '17 11:05 baluchicken

Still facing the same problem as @heifrank and @baluchicken. @ewencp, any workaround or suggestion would be appreciated.

hdfs config

name=hdfs-sink
connector.class=io.confluent.connect.hdfs.HdfsSinkConnector
tasks.max=1
topics=KAFKA_SPARK_FANOUT_SRP_TEST_JSON
hdfs.url=hdfs://localhost:9000
topics.dir=/home/hadoop/kafka/data
logs.dir=/home/hadoop/kafka/wal
format.class=io.confluent.connect.hdfs.parquet.ParquetFormat
partitioner.class=io.confluent.connect.hdfs.partitioner.FieldPartitioner
partition.field.name=PRT_ID
flush.size=100
rotate.interval.ms=10000

connector config

bootstrap.servers=localhost:9092

key.converter=org.apache.kafka.connect.storage.StringConverter
key.converter.schema.registry.url=http://localhost:8081
value.converter=org.apache.kafka.connect.json.JsonConverter
value.converter.schema.registry.url=http://localhost:8081

key.converter.schemas.enable=false
value.converter.schemas.enable=false

internal.key.converter=org.apache.kafka.connect.json.JsonConverter
internal.value.converter=org.apache.kafka.connect.json.JsonConverter
internal.key.converter.schemas.enable=false
internal.value.converter.schemas.enable=false

schemas.enable=false

My schema with payload

{
	"schema":{
		  "type": "struct",
		  "version":1,
		  "name" : "KAFKA_SPARK_FANOUT_SRP_TEST_JSON",
		  "fields" : [
		    {"type":"int32","optional":true,"field":"PRT_ID"},
		    {"type":"string","optional":true,"field":"CRTD_BY"},
		    {"type":"int64","optional":true,"field":"CRTD_DT","name": "org.apache.kafka.connect.data.Timestamp", "version": 1},
		    {"type":"bytes","optional":true,"field":"RESIDUAL","name": "org.apache.kafka.connect.data.Decimal", "version": 1}
		    ]
		},
	"payload":{
		"PRT_ID":-999,
		"LINE_ID":2568731,
		"CRTD_BY":"SUDHIR",
		"CRTD_DT":1498039957000,
		"RESIDUAL": -1234.567
		}
}

Problem statement

1. If I enable schemas (value.converter.schemas.enable=true), the task throws a NullPointerException:

[2017-10-26 12:44:10,521] ERROR Task hdfs-sink-0 threw an uncaught and unrecoverable exception (org.apache.kafka.connect.runtime.WorkerTask:148)
java.lang.NullPointerException
	at org.apache.kafka.connect.json.JsonConverter$13.convert(JsonConverter.java:191)
	at org.apache.kafka.connect.json.JsonConverter.convertToConnect(JsonConverter.java:716)
	at org.apache.kafka.connect.json.JsonConverter.access$000(JsonConverter.java:52)
	at org.apache.kafka.connect.json.JsonConverter$12.convert(JsonConverter.java:176)
	at org.apache.kafka.connect.json.JsonConverter.convertToConnect(JsonConverter.java:712)
	at org.apache.kafka.connect.json.JsonConverter.jsonToConnect(JsonConverter.java:331)
	at org.apache.kafka.connect.json.JsonConverter.toConnectData(JsonConverter.java:320)
	at org.apache.kafka.connect.runtime.WorkerSinkTask.convertMessages(WorkerSinkTask.java:407)
	at org.apache.kafka.connect.runtime.WorkerSinkTask.poll(WorkerSinkTask.java:250)
	at org.apache.kafka.connect.runtime.WorkerSinkTask.iteration(WorkerSinkTask.java:180)
	at org.apache.kafka.connect.runtime.WorkerSinkTask.execute(WorkerSinkTask.java:148)
	at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:146)
	at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:190)
	at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
	at java.util.concurrent.FutureTask.run(FutureTask.java:266)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
	at java.lang.Thread.run(Thread.java:745)

2. If I disable schemas (value.converter.schemas.enable=false), it creates a +tmp directory, as shown below:

(screenshot: the +tmp directory created in HDFS)

But no record is being consumed, and the task gets hung/stuck with the following log:

[2017-10-26 13:00:17,133] INFO Started ServerConnector@144beac5{HTTP/1.1}{0.0.0.0:8084} (org.eclipse.jetty.server.ServerConnector:266)
[2017-10-26 13:00:17,135] INFO Started @22564ms (org.eclipse.jetty.server.Server:379)
[2017-10-26 13:00:17,137] INFO REST server listening at http://192.168.20.236:8084/, advertising URL http://192.168.20.236:8084/ (org.apache.kafka.connect.runtime.rest.RestServer:150)
[2017-10-26 13:00:17,138] INFO Kafka Connect started (org.apache.kafka.connect.runtime.Connect:55)
[2017-10-26 13:00:17,174] INFO ConnectorConfig values:
	connector.class = io.confluent.connect.hdfs.HdfsSinkConnector
	key.converter = null
	name = hdfs-sink
	tasks.max = 1
	transforms = null
	value.converter = null
 (org.apache.kafka.connect.runtime.ConnectorConfig:223)
[2017-10-26 13:00:17,175] INFO EnrichedConnectorConfig values:
	connector.class = io.confluent.connect.hdfs.HdfsSinkConnector
	key.converter = null
	name = hdfs-sink
	tasks.max = 1
	transforms = null
	value.converter = null
 (org.apache.kafka.connect.runtime.ConnectorConfig$EnrichedConnectorConfig:223)
[2017-10-26 13:00:17,175] INFO Creating connector hdfs-sink of type io.confluent.connect.hdfs.HdfsSinkConnector (org.apache.kafka.connect.runtime.Worker:204)
[2017-10-26 13:00:17,177] INFO Instantiated connector hdfs-sink with version 3.3.0 of type class io.confluent.connect.hdfs.HdfsSinkConnector (org.apache.kafka.connect.runtime.Worker:207)
[2017-10-26 13:00:17,178] INFO HdfsSinkConnectorConfig values:
	connect.hdfs.keytab =
	connect.hdfs.principal =
	filename.offset.zero.pad.width = 10
	flush.size = 100
	format.class = io.confluent.connect.hdfs.parquet.ParquetFormat
	hadoop.conf.dir =
	hadoop.home =
	hdfs.authentication.kerberos = false
	hdfs.namenode.principal =
	hdfs.url = hdfs://localhost:9000
	hive.conf.dir =
	hive.database = default
	hive.home =
	hive.integration = false
	hive.metastore.uris =
	kerberos.ticket.renew.period.ms = 3600000
	locale =
	logs.dir = /home/hadoop/kafka/wal
	partition.duration.ms = -1
	partition.field.name = PRT_ID
	partitioner.class = io.confluent.connect.hdfs.partitioner.FieldPartitioner
	path.format =
	retry.backoff.ms = 5000
	rotate.interval.ms = 10000
	rotate.schedule.interval.ms = -1
	schema.cache.size = 1000
	schema.compatibility = NONE
	shutdown.timeout.ms = 3000
	storage.class = io.confluent.connect.hdfs.storage.HdfsStorage
	timezone =
	topics.dir = /home/hadoop/kafka/data
 (io.confluent.connect.hdfs.HdfsSinkConnectorConfig:223)
[2017-10-26 13:00:17,184] INFO Finished creating connector hdfs-sink (org.apache.kafka.connect.runtime.Worker:225)
[2017-10-26 13:00:17,189] INFO SinkConnectorConfig values:
	connector.class = io.confluent.connect.hdfs.HdfsSinkConnector
	key.converter = null
	name = hdfs-sink
	tasks.max = 1
	topics = [KAFKA_SPARK_FANOUT_SRP_TEST_JSON]
	transforms = null
	value.converter = null
 (org.apache.kafka.connect.runtime.SinkConnectorConfig:223)
[2017-10-26 13:00:17,189] INFO EnrichedConnectorConfig values:
	connector.class = io.confluent.connect.hdfs.HdfsSinkConnector
	key.converter = null
	name = hdfs-sink
	tasks.max = 1
	topics = [KAFKA_SPARK_FANOUT_SRP_TEST_JSON]
	transforms = null
	value.converter = null
 (org.apache.kafka.connect.runtime.ConnectorConfig$EnrichedConnectorConfig:223)
[2017-10-26 13:00:17,192] INFO Creating task hdfs-sink-0 (org.apache.kafka.connect.runtime.Worker:358)
[2017-10-26 13:00:17,192] INFO ConnectorConfig values:
	connector.class = io.confluent.connect.hdfs.HdfsSinkConnector
	key.converter = null
	name = hdfs-sink
	tasks.max = 1
	transforms = null
	value.converter = null
 (org.apache.kafka.connect.runtime.ConnectorConfig:223)
[2017-10-26 13:00:17,193] INFO EnrichedConnectorConfig values:
	connector.class = io.confluent.connect.hdfs.HdfsSinkConnector
	key.converter = null
	name = hdfs-sink
	tasks.max = 1
	transforms = null
	value.converter = null
 (org.apache.kafka.connect.runtime.ConnectorConfig$EnrichedConnectorConfig:223)
[2017-10-26 13:00:17,194] INFO TaskConfig values:
	task.class = class io.confluent.connect.hdfs.HdfsSinkTask
 (org.apache.kafka.connect.runtime.TaskConfig:223)
[2017-10-26 13:00:17,194] INFO Instantiated task hdfs-sink-0 with version 3.3.0 of type io.confluent.connect.hdfs.HdfsSinkTask (org.apache.kafka.connect.runtime.Worker:373)
[2017-10-26 13:00:17,214] INFO ConsumerConfig values:
	auto.commit.interval.ms = 5000
	auto.offset.reset = earliest
	bootstrap.servers = [localhost:9092]
	check.crcs = true
	client.id =
	connections.max.idle.ms = 540000
	enable.auto.commit = false
	exclude.internal.topics = true
	fetch.max.bytes = 52428800
	fetch.max.wait.ms = 500
	fetch.min.bytes = 1
	group.id = connect-hdfs-sink
	heartbeat.interval.ms = 3000
	interceptor.classes = null
	internal.leave.group.on.close = true
	isolation.level = read_uncommitted
	key.deserializer = class org.apache.kafka.common.serialization.ByteArrayDeserializer
	max.partition.fetch.bytes = 1048576
	max.poll.interval.ms = 300000
	max.poll.records = 500
	metadata.max.age.ms = 300000
	metric.reporters = []
	metrics.num.samples = 2
	metrics.recording.level = INFO
	metrics.sample.window.ms = 30000
	partition.assignment.strategy = [class org.apache.kafka.clients.consumer.RangeAssignor]
	receive.buffer.bytes = 65536
	reconnect.backoff.max.ms = 1000
	reconnect.backoff.ms = 50
	request.timeout.ms = 305000
	retry.backoff.ms = 100
	sasl.jaas.config = null
	sasl.kerberos.kinit.cmd = /usr/bin/kinit
	sasl.kerberos.min.time.before.relogin = 60000
	sasl.kerberos.service.name = null
	sasl.kerberos.ticket.renew.jitter = 0.05
	sasl.kerberos.ticket.renew.window.factor = 0.8
	sasl.mechanism = GSSAPI
	security.protocol = PLAINTEXT
	send.buffer.bytes = 131072
	session.timeout.ms = 10000
	ssl.cipher.suites = null
	ssl.enabled.protocols = [TLSv1.2, TLSv1.1, TLSv1]
	ssl.endpoint.identification.algorithm = null
	ssl.key.password = null
	ssl.keymanager.algorithm = SunX509
	ssl.keystore.location = null
	ssl.keystore.password = null
	ssl.keystore.type = JKS
	ssl.protocol = TLS
	ssl.provider = null
	ssl.secure.random.implementation = null
	ssl.trustmanager.algorithm = PKIX
	ssl.truststore.location = null
	ssl.truststore.password = null
	ssl.truststore.type = JKS
	value.deserializer = class org.apache.kafka.common.serialization.ByteArrayDeserializer
 (org.apache.kafka.clients.consumer.ConsumerConfig:223)
[2017-10-26 13:00:17,341] INFO Kafka version : 0.11.0.0-cp1 (org.apache.kafka.common.utils.AppInfoParser:83)
[2017-10-26 13:00:17,341] INFO Kafka commitId : 5cadaa94d0a69e0d (org.apache.kafka.common.utils.AppInfoParser:84)
[2017-10-26 13:00:17,351] INFO HdfsSinkConnectorConfig values:
	connect.hdfs.keytab =
	connect.hdfs.principal =
	filename.offset.zero.pad.width = 10
	flush.size = 100
	format.class = io.confluent.connect.hdfs.parquet.ParquetFormat
	hadoop.conf.dir =
	hadoop.home =
	hdfs.authentication.kerberos = false
	hdfs.namenode.principal =
	hdfs.url = hdfs://localhost:9000
	hive.conf.dir =
	hive.database = default
	hive.home =
	hive.integration = false
	hive.metastore.uris =
	kerberos.ticket.renew.period.ms = 3600000
	locale =
	logs.dir = /home/hadoop/kafka/wal
	partition.duration.ms = -1
	partition.field.name = PRT_ID
	partitioner.class = io.confluent.connect.hdfs.partitioner.FieldPartitioner
	path.format =
	retry.backoff.ms = 5000
	rotate.interval.ms = 10000
	rotate.schedule.interval.ms = -1
	schema.cache.size = 1000
	schema.compatibility = NONE
	shutdown.timeout.ms = 3000
	storage.class = io.confluent.connect.hdfs.storage.HdfsStorage
	timezone =
	topics.dir = /home/hadoop/kafka/data
 (io.confluent.connect.hdfs.HdfsSinkConnectorConfig:223)
[2017-10-26 13:00:17,371] INFO Created connector hdfs-sink (org.apache.kafka.connect.cli.ConnectStandalone:91)
[2017-10-26 13:00:17,702] INFO AvroDataConfig values:
	schemas.cache.config = 1000
	enhanced.avro.schema.support = false
	connect.meta.data = true
 (io.confluent.connect.avro.AvroDataConfig:170)
[2017-10-26 13:00:17,707] INFO Hadoop configuration directory  (io.confluent.connect.hdfs.DataWriter:93)
[2017-10-26 13:00:18,512] WARN Unable to load native-hadoop library for your platform... using builtin-java classes where applicable (org.apache.hadoop.util.NativeCodeLoader:62)
[2017-10-26 13:00:19,685] INFO Sink task WorkerSinkTask{id=hdfs-sink-0} finished initialization and start (org.apache.kafka.connect.runtime.WorkerSinkTask:233)
[2017-10-26 13:00:19,857] INFO Discovered coordinator localhost:9092 (id: 2147483647 rack: null) for group connect-hdfs-sink. (org.apache.kafka.clients.consumer.internals.AbstractCoordinator:597)
[2017-10-26 13:00:19,861] INFO Revoking previously assigned partitions [] for group connect-hdfs-sink (org.apache.kafka.clients.consumer.internals.ConsumerCoordinator:419)
[2017-10-26 13:00:19,861] INFO (Re-)joining group connect-hdfs-sink (org.apache.kafka.clients.consumer.internals.AbstractCoordinator:432)
[2017-10-26 13:00:19,886] INFO Successfully joined group connect-hdfs-sink with generation 3 (org.apache.kafka.clients.consumer.internals.AbstractCoordinator:399)
[2017-10-26 13:00:19,892] INFO Setting newly assigned partitions [KAFKA_SPARK_FANOUT_SRP_TEST_JSON-0] for group connect-hdfs-sink (org.apache.kafka.clients.consumer.internals.ConsumerCoordinator:262)
[2017-10-26 13:00:19,958] INFO Started recovery for topic partition KAFKA_SPARK_FANOUT_SRP_TEST_JSON-0 (io.confluent.connect.hdfs.TopicPartitionWriter:208)
[2017-10-26 13:00:19,970] INFO Finished recovery for topic partition KAFKA_SPARK_FANOUT_SRP_TEST_JSON-0 (io.confluent.connect.hdfs.TopicPartitionWriter:223)

sudhirsrepo avatar Oct 27 '17 00:10 sudhirsrepo

Same problem exists for me. Log error:

org.apache.kafka.connect.runtime.WorkerSinkTask.iteration(WorkerSinkTask.java:205)
	at org.apache.kafka.connect.runtime.WorkerSinkTask.execute(WorkerSinkTask.java:173)
	at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:170)
	at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:214)
	at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
	at java.util.concurrent.FutureTask.run(FutureTask.java:266)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
	at java.lang.Thread.run(Thread.java:748)
Caused by: java.lang.StackOverflowError
	at org.apache.parquet.avro.AvroSchemaConverter.convertUnion(AvroSchemaConverter.java:214)
	at org.apache.parquet.avro.AvroSchemaConverter.convertField(AvroSchemaConverter.java:171)
	at org.apache.parquet.avro.AvroSchemaConverter.convertField(AvroSchemaConverter.java:130)
	at org.apache.parquet.avro.AvroSchemaConverter.convertField(AvroSchemaConverter.java:227)
	at org.apache.parquet.avro.AvroSchemaConverter.convertFields(AvroSchemaConverter.java:124)
	at org.apache.parquet.avro.AvroSchemaConverter.convertField(AvroSchemaConverter.java:152)
	at org.apache.parquet.avro.AvroSchemaConverter.convertField(AvroSchemaConverter.java:157)
	at org.apache.parquet.avro.AvroSchemaConverter.convertUnion(AvroSchemaConverter.java:214)
	at org.apache.parquet.avro.AvroSchemaConverter.convertField(AvroSchemaConverter.java:171)
	at org.apache.parquet.avro.AvroSchemaConverter.convertField(AvroSchemaConverter.java:130)
	at org.apache.parquet.avro.AvroSchemaConverter.convertField(AvroSchemaConverter.java:227)

madhumaran avatar Sep 22 '18 22:09 madhumaran

@gwenshap, @ewencp I am also having the same issue. I'm unable to figure out the exact error and fix it. Kindly help in resolving it. My hdfs-sink configuration:

{
  "name": "hdfs-sink",
  "config": {
    "connector.class": "io.confluent.connect.hdfs.HdfsSinkConnector",
    "tasks.max": "1",
    "topics": "mysql-prod-registrations",
    "hadoop.conf.dir": "/usr/hdp/current/hadoop-client/conf",
    "hadoop.home": "/usr/hdp/current/hadoop-client/",
    "hdfs.url": "hdfs://HACluster:8020",
    "topics.dir": "/topics",
    "logs.dir": "/logs",
    "flush.size": "100",
    "rotate.interval.ms": "60000",
    "format.class": "io.confluent.connect.hdfs.avro.AvroFormat",
    "value.converter.schemas.enable": "false",
    "partitioner.class": "io.confluent.connect.hdfs.partitioner.TimeBasedPartitioner",
    "partition.duration.ms": "1800000",
    "path.format": "'year'=YYYY/'month'=MM/'day'=dd/'hour'=HH/",
    "locale": "en",
    "timezone": "Asia/Kolkata"
  }
}

ERROR: { "name": "hdfs-sink", "connector": { "state": "RUNNING", "worker_id": "xxxxxxx:8083" }, "tasks": [ { "state": "FAILED", "trace": "java.lang.NullPointerException\n\tat io.confluent.connect.hdfs.HdfsSinkTask.open(HdfsSinkTask.java:133)\n\tat org.apache.kafka.connect.runtime.WorkerSinkTask.openPartitions(WorkerSinkTask.java:612)\n\tat org.apache.kafka.connect.runtime.WorkerSinkTask.access$1100(WorkerSinkTask.java:69)\n\tat org.apache.kafka.connect.runtime.WorkerSinkTask$HandleRebalance.onPartitionsAssigned(WorkerSinkTask.java:672)\n\tat org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.onJoinComplete(ConsumerCoordinator.java:283)\n\tat org.apache.kafka.clients.consumer.internals.AbstractCoordinator.joinGroupIfNeeded(AbstractCoordinator.java:422)\n\tat org.apache.kafka.clients.consumer.internals.AbstractCoordinator.ensureActiveGroup(AbstractCoordinator.java:352)\n\tat org.apache.kafka.clients.consumer.internals.AbstractCoordinator.ensureActiveGroup(AbstractCoordinator.java:337)\n\tat org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.poll(ConsumerCoordinator.java:343)\n\tat org.apache.kafka.clients.consumer.KafkaConsumer.updateAssignmentMetadataIfNeeded(KafkaConsumer.java:1218)\n\tat org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1181)\n\tat org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1115)\n\tat org.apache.kafka.connect.runtime.WorkerSinkTask.pollConsumer(WorkerSinkTask.java:444)\n\tat org.apache.kafka.connect.runtime.WorkerSinkTask.poll(WorkerSinkTask.java:317)\n\tat org.apache.kafka.connect.runtime.WorkerSinkTask.iteration(WorkerSinkTask.java:225)\n\tat org.apache.kafka.connect.runtime.WorkerSinkTask.execute(WorkerSinkTask.java:193)\n\tat org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:175)\n\tat org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:219)\n\tat java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)\n\tat java.util.concurrent.FutureTask.run(FutureTask.java:266)\n\tat java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)\n\tat java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)\n\tat java.lang.Thread.run(Thread.java:745)\n", "id": 0, "worker_id": "xxxxxxx:8083" } ], "type": "sink" }

bkotesh avatar Dec 20 '18 06:12 bkotesh

Same problem here: all schemas are disabled in the Kafka HDFS connector, but I'm receiving this error message:

JsonConverter with schemas.enable requires "schema" and "payload" fields and may not contain additional fields. If you are trying to deserialize plain JSON data, set schemas.enable=false in your converter configuration.
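For reference, the converter-level properties that message refers to are the prefixed ones used in the worker configs earlier in this thread (set wherever your key.converter/value.converter are defined):

key.converter=org.apache.kafka.connect.json.JsonConverter
key.converter.schemas.enable=false
value.converter=org.apache.kafka.connect.json.JsonConverter
value.converter.schemas.enable=false

A bare schemas.enable=false in the connector config is not picked up by the JsonConverter; only the prefixed form is.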

mcapitanio avatar Jan 29 '19 11:01 mcapitanio