docker-kafka-connect
docker-kafka-connect copied to clipboard
Failed to find leader for Set([names],0)
Summary
When the 1ambda/kafka-connect stack tries to create a kafka topic "names", no broker becomes the leader.
Source
https://github.com/mcandre/hello-kafka-connect
Trace
$ gradle clean shadowJar
$ docker-compose build --no-cache
$ docker-compose rm -f && docker-compose up --force-recreate
...
$ kafka-topics --zookeeper $(docker-machine ip default):2181/broker-0 --list
__consumer_offsets
connect-configs
connect-offsets
connect-status
$ curl -XPOST $(docker-machine ip default):8083/connectors \
-H "Content-Type: application/json" \
-d "{
\"name\": \"name-source\",
\"config\": {
\"connector.class\": \"us.yellosoft.hellokafkaconnect.NameSource\",
\"tasks.max\": \"1\",
\"topics\": \"names\",
\"kafka_partitions\": \"1\",
\"redis_address\": \"redis://$(docker-machine ip default):6379\",
\"name_list_key\": \"names\"
}
}" | jq .
{
"name": "name-source",
"config": {
"connector.class": "us.yellosoft.hellokafkaconnect.NameSource",
"tasks.max": "1",
"topics": "names",
"kafka_partitions": "1",
"redis_address": "redis://192.168.99.101:6379",
"name_list_key": "names",
"name": "name-source"
},
"tasks": []
}
$ redis-cli -h $(docker-machine ip default) lpush names 'Alice'
(integer) 1
$ kafka-topics --zookeeper $(docker-machine ip default):2181/broker-0 --list
__consumer_offsets
connect-configs
connect-offsets
connect-status
names
$ kafka-console-consumer --zookeeper $(docker-machine ip dev):2181/broker-0 --topic names
[2016-07-26 10:37:40,200] WARN Fetching topic metadata with correlation id 0 for topics [Set(names)] from broker [BrokerEndPoint(0,kafka,9092)] failed (kafka.client.ClientUtils$)
java.nio.channels.ClosedChannelException
at kafka.network.BlockingChannel.send(BlockingChannel.scala:110)
at kafka.producer.SyncProducer.liftedTree1$1(SyncProducer.scala:75)
at kafka.producer.SyncProducer.kafka$producer$SyncProducer$$doSend(SyncProducer.scala:74)
at kafka.producer.SyncProducer.send(SyncProducer.scala:119)
at kafka.client.ClientUtils$.fetchTopicMetadata(ClientUtils.scala:59)
at kafka.client.ClientUtils$.fetchTopicMetadata(ClientUtils.scala:94)
at kafka.consumer.ConsumerFetcherManager$LeaderFinderThread.doWork(ConsumerFetcherManager.scala:66)
at kafka.utils.ShutdownableThread.run(ShutdownableThread.scala:63)
[2016-07-26 10:37:40,203] WARN [console-consumer-58015_andrew-pennebaker-1469547459888-b7033e4d-leader-finder-thread], Failed to find leader for Set([names,0]) (kafka.consumer.ConsumerFetcherManager$LeaderFinderThread)
kafka.common.KafkaException: fetching topic metadata for topics [Set(names)] from broker [ArrayBuffer(BrokerEndPoint(0,kafka,9092))] failed
at kafka.client.ClientUtils$.fetchTopicMetadata(ClientUtils.scala:73)
at kafka.client.ClientUtils$.fetchTopicMetadata(ClientUtils.scala:94)
at kafka.consumer.ConsumerFetcherManager$LeaderFinderThread.doWork(ConsumerFetcherManager.scala:66)
at kafka.utils.ShutdownableThread.run(ShutdownableThread.scala:63)
Caused by: java.nio.channels.ClosedChannelException
at kafka.network.BlockingChannel.send(BlockingChannel.scala:110)
at kafka.producer.SyncProducer.liftedTree1$1(SyncProducer.scala:75)
at kafka.producer.SyncProducer.kafka$producer$SyncProducer$$doSend(SyncProducer.scala:74)
at kafka.producer.SyncProducer.send(SyncProducer.scala:119)
at kafka.client.ClientUtils$.fetchTopicMetadata(ClientUtils.scala:59)
... 3 more
I tried waiting a bit for the broker to become the leader, but it seems to never become the leader.
I also tried using the --newconsumer --bootstrap-server ...
option, but this hangs:
$ kafka-console-consumer --zookeeper $(docker-machine ip default):2181/broker-0 --new-consumer --bootstrap-server $(docker-machine ip default):9092 --topic names
(silently hangs)
This issue exhibits the same docker-compose error logs as #3 :
$ docker-compose rm -f && docker-compose up --force-recreate
...
connect_1 | [2016-07-26 15:48:50,089] ERROR Unhandled exception when committing WorkerSourceTask{id=name-source-0}: (org.apache.kafka.connect.runtime.SourceTaskOffsetCommitter:115)
connect_1 | java.lang.NullPointerException
connect_1 | at org.apache.kafka.connect.storage.KafkaOffsetBackingStore.set(KafkaOffsetBackingStore.java:122)
connect_1 | at org.apache.kafka.connect.storage.OffsetStorageWriter.doFlush(OffsetStorageWriter.java:165)
connect_1 | at org.apache.kafka.connect.runtime.WorkerSourceTask.commitOffsets(WorkerSourceTask.java:319)
connect_1 | at org.apache.kafka.connect.runtime.SourceTaskOffsetCommitter.commit(SourceTaskOffsetCommitter.java:107)
connect_1 | at org.apache.kafka.connect.runtime.SourceTaskOffsetCommitter.access$000(SourceTaskOffsetCommitter.java:44)
connect_1 | at org.apache.kafka.connect.runtime.SourceTaskOffsetCommitter$1.run(SourceTaskOffsetCommitter.java:73)
connect_1 | at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
connect_1 | at java.util.concurrent.FutureTask.run(FutureTask.java:266)
connect_1 | at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$201(ScheduledThreadPoolExecutor.java:180)
connect_1 | at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:293)
connect_1 | at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
connect_1 | at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
connect_1 | at java.lang.Thread.run(Thread.java:745)
connect_1 | [2016-07-26 15:48:52,423] INFO 192.168.99.1 - - [26/Jul/2016:15:48:52 +0000] "GET /connectors HTTP/1.1" 200 15 11 (org.apache.kafka.connect.runtime.rest.RestServer:60)
connect_1 | [2016-07-26 15:49:00,098] ERROR Invalid call to OffsetStorageWriter flush() while already flushing, the framework should not allow this (org.apache.kafka.connect.storage.OffsetStorageWriter:110)
connect_1 | [2016-07-26 15:49:00,099] ERROR Unhandled exception when committing WorkerSourceTask{id=name-source-0}: (org.apache.kafka.connect.runtime.SourceTaskOffsetCommitter:115)
connect_1 | org.apache.kafka.connect.errors.ConnectException: OffsetStorageWriter is already flushing
connect_1 | at org.apache.kafka.connect.storage.OffsetStorageWriter.beginFlush(OffsetStorageWriter.java:112)
connect_1 | at org.apache.kafka.connect.runtime.WorkerSourceTask.commitOffsets(WorkerSourceTask.java:279)
connect_1 | at org.apache.kafka.connect.runtime.SourceTaskOffsetCommitter.commit(SourceTaskOffsetCommitter.java:107)
connect_1 | at org.apache.kafka.connect.runtime.SourceTaskOffsetCommitter.access$000(SourceTaskOffsetCommitter.java:44)
connect_1 | at org.apache.kafka.connect.runtime.SourceTaskOffsetCommitter$1.run(SourceTaskOffsetCommitter.java:73)
connect_1 | at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
connect_1 | at java.util.concurrent.FutureTask.run(FutureTask.java:266)
connect_1 | at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$201(ScheduledThreadPoolExecutor.java:180)
connect_1 | at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:293)
connect_1 | at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
connect_1 | at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
connect_1 | at java.lang.Thread.run(Thread.java:745)
zk_1 | 2016-07-26 15:49:05,911 [myid:] - INFO [NIOServerCxn.Factory:0.0.0.0/0.0.0.0:2181:NIOServerCnxnFactory@192] - Accepted socket connection from /192.168.99.1:52065
zk_1 | 2016-07-26 15:49:05,916 [myid:] - INFO [NIOServerCxn.Factory:0.0.0.0/0.0.0.0:2181:ZooKeeperServer@900] - Client attempting to establish new session at /192.168.99.1:52065
zk_1 | 2016-07-26 15:49:05,918 [myid:] - INFO [SyncThread:0:ZooKeeperServer@645] - Established session 0x15627e3e9780006 with negotiated timeout 30000 for client /192.168.99.1:52065
zk_1 | 2016-07-26 15:49:05,939 [myid:] - INFO [ProcessThread(sid:0 cport:2181)::PrepRequestProcessor@489] - Processed session termination for sessionid: 0x15627e3e9780006
zk_1 | 2016-07-26 15:49:05,942 [myid:] - INFO [NIOServerCxn.Factory:0.0.0.0/0.0.0.0:2181:NIOServerCnxn@1008] - Closed socket connection for client /192.168.99.1:52065 which had sessionid 0x15627e3e9780006
connect_1 | [2016-07-26 15:49:10,102] ERROR Invalid call to OffsetStorageWriter flush() while already flushing, the framework should not allow this (org.apache.kafka.connect.storage.OffsetStorageWriter:110)
connect_1 | [2016-07-26 15:49:10,102] ERROR Unhandled exception when committing WorkerSourceTask{id=name-source-0}: (org.apache.kafka.connect.runtime.SourceTaskOffsetCommitter:115)
connect_1 | org.apache.kafka.connect.errors.ConnectException: OffsetStorageWriter is already flushing
connect_1 | at org.apache.kafka.connect.storage.OffsetStorageWriter.beginFlush(OffsetStorageWriter.java:112)
connect_1 | at org.apache.kafka.connect.runtime.WorkerSourceTask.commitOffsets(WorkerSourceTask.java:279)
connect_1 | at org.apache.kafka.connect.runtime.SourceTaskOffsetCommitter.commit(SourceTaskOffsetCommitter.java:107)
connect_1 | at org.apache.kafka.connect.runtime.SourceTaskOffsetCommitter.access$000(SourceTaskOffsetCommitter.java:44)
connect_1 | at org.apache.kafka.connect.runtime.SourceTaskOffsetCommitter$1.run(SourceTaskOffsetCommitter.java:73)
connect_1 | at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
connect_1 | at java.util.concurrent.FutureTask.run(FutureTask.java:266)
connect_1 | at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$201(ScheduledThreadPoolExecutor.java:180)
connect_1 | at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:293)
connect_1 | at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
connect_1 | at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
connect_1 | at java.lang.Thread.run(Thread.java:745)
connect_1 | [2016-07-26 15:49:20,104] ERROR Invalid call to OffsetStorageWriter flush() while already flushing, the framework should not allow this (org.apache.kafka.connect.storage.OffsetStorageWriter:110)
connect_1 | [2016-07-26 15:49:20,104] ERROR Unhandled exception when committing WorkerSourceTask{id=name-source-0}: (org.apache.kafka.connect.runtime.SourceTaskOffsetCommitter:115)
connect_1 | org.apache.kafka.connect.errors.ConnectException: OffsetStorageWriter is already flushing
connect_1 | at org.apache.kafka.connect.storage.OffsetStorageWriter.beginFlush(OffsetStorageWriter.java:112)
connect_1 | at org.apache.kafka.connect.runtime.WorkerSourceTask.commitOffsets(WorkerSourceTask.java:279)
connect_1 | at org.apache.kafka.connect.runtime.SourceTaskOffsetCommitter.commit(SourceTaskOffsetCommitter.java:107)
connect_1 | at org.apache.kafka.connect.runtime.SourceTaskOffsetCommitter.access$000(SourceTaskOffsetCommitter.java:44)
connect_1 | at org.apache.kafka.connect.runtime.SourceTaskOffsetCommitter$1.run(SourceTaskOffsetCommitter.java:73)
connect_1 | at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
connect_1 | at java.util.concurrent.FutureTask.run(FutureTask.java:266)
connect_1 | at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$201(ScheduledThreadPoolExecutor.java:180)
connect_1 | at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:293)
connect_1 | at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
connect_1 | at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
connect_1 | at java.lang.Thread.run(Thread.java:745)
connect_1 | [2016-07-26 15:49:30,106] ERROR Invalid call to OffsetStorageWriter flush() while already flushing, the framework should not allow this (org.apache.kafka.connect.storage.OffsetStorageWriter:110)
connect_1 | [2016-07-26 15:49:30,106] ERROR Unhandled exception when committing WorkerSourceTask{id=name-source-0}: (org.apache.kafka.connect.runtime.SourceTaskOffsetCommitter:115)
connect_1 | org.apache.kafka.connect.errors.ConnectException: OffsetStorageWriter is already flushing
connect_1 | at org.apache.kafka.connect.storage.OffsetStorageWriter.beginFlush(OffsetStorageWriter.java:112)
connect_1 | at org.apache.kafka.connect.runtime.WorkerSourceTask.commitOffsets(WorkerSourceTask.java:279)
connect_1 | at org.apache.kafka.connect.runtime.SourceTaskOffsetCommitter.commit(SourceTaskOffsetCommitter.java:107)
connect_1 | at org.apache.kafka.connect.runtime.SourceTaskOffsetCommitter.access$000(SourceTaskOffsetCommitter.java:44)
connect_1 | at org.apache.kafka.connect.runtime.SourceTaskOffsetCommitter$1.run(SourceTaskOffsetCommitter.java:73)
connect_1 | at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
connect_1 | at java.util.concurrent.FutureTask.run(FutureTask.java:266)
connect_1 | at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$201(ScheduledThreadPoolExecutor.java:180)
connect_1 | at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:293)
connect_1 | at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
connect_1 | at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
connect_1 | at java.lang.Thread.run(Thread.java:745)
^CGracefully stopping... (press Ctrl+C again to force)
Stopping hellokafkaconnect_connect_1 ... done
Stopping hellokafkaconnect_kafka_1 ... done
Stopping hellokafkaconnect_redis_1 ... done
Stopping hellokafkaconnect_zk_1 ... done