docker-kafka-connect icon indicating copy to clipboard operation
docker-kafka-connect copied to clipboard

Failed to find leader for Set([names],0)

Open mcandre opened this issue 8 years ago • 0 comments

Summary

When the 1ambda/kafka-connect stack tries to create a kafka topic "names", no broker becomes the leader.

Source

https://github.com/mcandre/hello-kafka-connect

Trace

$ gradle clean shadowJar
$ docker-compose build --no-cache
$ docker-compose rm -f && docker-compose up --force-recreate
...

$ kafka-topics --zookeeper $(docker-machine ip default):2181/broker-0 --list
__consumer_offsets
connect-configs
connect-offsets
connect-status

$ curl -XPOST $(docker-machine ip default):8083/connectors \
       -H "Content-Type: application/json" \
       -d "{
             \"name\": \"name-source\",
             \"config\": {
               \"connector.class\": \"us.yellosoft.hellokafkaconnect.NameSource\",
               \"tasks.max\": \"1\",
               \"topics\": \"names\",
               \"kafka_partitions\": \"1\",
               \"redis_address\": \"redis://$(docker-machine ip default):6379\",
               \"name_list_key\": \"names\"
              }
           }" | jq .

{
  "name": "name-source",
  "config": {
    "connector.class": "us.yellosoft.hellokafkaconnect.NameSource",
    "tasks.max": "1",
    "topics": "names",
    "kafka_partitions": "1",
    "redis_address": "redis://192.168.99.101:6379",
    "name_list_key": "names",
    "name": "name-source"
  },
  "tasks": []
}

$ redis-cli -h $(docker-machine ip default) lpush names 'Alice'
(integer) 1

$ kafka-topics --zookeeper $(docker-machine ip default):2181/broker-0 --list
__consumer_offsets
connect-configs
connect-offsets
connect-status
names

$ kafka-console-consumer --zookeeper $(docker-machine ip dev):2181/broker-0 --topic names
[2016-07-26 10:37:40,200] WARN Fetching topic metadata with correlation id 0 for topics [Set(names)] from broker [BrokerEndPoint(0,kafka,9092)] failed (kafka.client.ClientUtils$)
java.nio.channels.ClosedChannelException
    at kafka.network.BlockingChannel.send(BlockingChannel.scala:110)
    at kafka.producer.SyncProducer.liftedTree1$1(SyncProducer.scala:75)
    at kafka.producer.SyncProducer.kafka$producer$SyncProducer$$doSend(SyncProducer.scala:74)
    at kafka.producer.SyncProducer.send(SyncProducer.scala:119)
    at kafka.client.ClientUtils$.fetchTopicMetadata(ClientUtils.scala:59)
    at kafka.client.ClientUtils$.fetchTopicMetadata(ClientUtils.scala:94)
    at kafka.consumer.ConsumerFetcherManager$LeaderFinderThread.doWork(ConsumerFetcherManager.scala:66)
    at kafka.utils.ShutdownableThread.run(ShutdownableThread.scala:63)
[2016-07-26 10:37:40,203] WARN [console-consumer-58015_andrew-pennebaker-1469547459888-b7033e4d-leader-finder-thread], Failed to find leader for Set([names,0]) (kafka.consumer.ConsumerFetcherManager$LeaderFinderThread)
kafka.common.KafkaException: fetching topic metadata for topics [Set(names)] from broker [ArrayBuffer(BrokerEndPoint(0,kafka,9092))] failed
    at kafka.client.ClientUtils$.fetchTopicMetadata(ClientUtils.scala:73)
    at kafka.client.ClientUtils$.fetchTopicMetadata(ClientUtils.scala:94)
    at kafka.consumer.ConsumerFetcherManager$LeaderFinderThread.doWork(ConsumerFetcherManager.scala:66)
    at kafka.utils.ShutdownableThread.run(ShutdownableThread.scala:63)
Caused by: java.nio.channels.ClosedChannelException
    at kafka.network.BlockingChannel.send(BlockingChannel.scala:110)
    at kafka.producer.SyncProducer.liftedTree1$1(SyncProducer.scala:75)
    at kafka.producer.SyncProducer.kafka$producer$SyncProducer$$doSend(SyncProducer.scala:74)
    at kafka.producer.SyncProducer.send(SyncProducer.scala:119)
    at kafka.client.ClientUtils$.fetchTopicMetadata(ClientUtils.scala:59)
    ... 3 more

I tried waiting a bit for the broker to become the leader, but it seems to never become the leader.

I also tried using the --newconsumer --bootstrap-server ... option, but this hangs:

$ kafka-console-consumer --zookeeper $(docker-machine ip default):2181/broker-0 --new-consumer --bootstrap-server $(docker-machine ip default):9092 --topic names
(silently hangs)

This issue exhibits the same docker-compose error logs as #3 :

$ docker-compose rm -f && docker-compose up --force-recreate
...
connect_1  | [2016-07-26 15:48:50,089] ERROR Unhandled exception when committing WorkerSourceTask{id=name-source-0}:  (org.apache.kafka.connect.runtime.SourceTaskOffsetCommitter:115)
connect_1  | java.lang.NullPointerException
connect_1  |    at org.apache.kafka.connect.storage.KafkaOffsetBackingStore.set(KafkaOffsetBackingStore.java:122)
connect_1  |    at org.apache.kafka.connect.storage.OffsetStorageWriter.doFlush(OffsetStorageWriter.java:165)
connect_1  |    at org.apache.kafka.connect.runtime.WorkerSourceTask.commitOffsets(WorkerSourceTask.java:319)
connect_1  |    at org.apache.kafka.connect.runtime.SourceTaskOffsetCommitter.commit(SourceTaskOffsetCommitter.java:107)
connect_1  |    at org.apache.kafka.connect.runtime.SourceTaskOffsetCommitter.access$000(SourceTaskOffsetCommitter.java:44)
connect_1  |    at org.apache.kafka.connect.runtime.SourceTaskOffsetCommitter$1.run(SourceTaskOffsetCommitter.java:73)
connect_1  |    at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
connect_1  |    at java.util.concurrent.FutureTask.run(FutureTask.java:266)
connect_1  |    at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$201(ScheduledThreadPoolExecutor.java:180)
connect_1  |    at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:293)
connect_1  |    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
connect_1  |    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
connect_1  |    at java.lang.Thread.run(Thread.java:745)
connect_1  | [2016-07-26 15:48:52,423] INFO 192.168.99.1 - - [26/Jul/2016:15:48:52 +0000] "GET /connectors HTTP/1.1" 200 15  11 (org.apache.kafka.connect.runtime.rest.RestServer:60)
connect_1  | [2016-07-26 15:49:00,098] ERROR Invalid call to OffsetStorageWriter flush() while already flushing, the framework should not allow this (org.apache.kafka.connect.storage.OffsetStorageWriter:110)
connect_1  | [2016-07-26 15:49:00,099] ERROR Unhandled exception when committing WorkerSourceTask{id=name-source-0}:  (org.apache.kafka.connect.runtime.SourceTaskOffsetCommitter:115)
connect_1  | org.apache.kafka.connect.errors.ConnectException: OffsetStorageWriter is already flushing
connect_1  |    at org.apache.kafka.connect.storage.OffsetStorageWriter.beginFlush(OffsetStorageWriter.java:112)
connect_1  |    at org.apache.kafka.connect.runtime.WorkerSourceTask.commitOffsets(WorkerSourceTask.java:279)
connect_1  |    at org.apache.kafka.connect.runtime.SourceTaskOffsetCommitter.commit(SourceTaskOffsetCommitter.java:107)
connect_1  |    at org.apache.kafka.connect.runtime.SourceTaskOffsetCommitter.access$000(SourceTaskOffsetCommitter.java:44)
connect_1  |    at org.apache.kafka.connect.runtime.SourceTaskOffsetCommitter$1.run(SourceTaskOffsetCommitter.java:73)
connect_1  |    at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
connect_1  |    at java.util.concurrent.FutureTask.run(FutureTask.java:266)
connect_1  |    at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$201(ScheduledThreadPoolExecutor.java:180)
connect_1  |    at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:293)
connect_1  |    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
connect_1  |    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
connect_1  |    at java.lang.Thread.run(Thread.java:745)
zk_1       | 2016-07-26 15:49:05,911 [myid:] - INFO  [NIOServerCxn.Factory:0.0.0.0/0.0.0.0:2181:NIOServerCnxnFactory@192] - Accepted socket connection from /192.168.99.1:52065
zk_1       | 2016-07-26 15:49:05,916 [myid:] - INFO  [NIOServerCxn.Factory:0.0.0.0/0.0.0.0:2181:ZooKeeperServer@900] - Client attempting to establish new session at /192.168.99.1:52065
zk_1       | 2016-07-26 15:49:05,918 [myid:] - INFO  [SyncThread:0:ZooKeeperServer@645] - Established session 0x15627e3e9780006 with negotiated timeout 30000 for client /192.168.99.1:52065
zk_1       | 2016-07-26 15:49:05,939 [myid:] - INFO  [ProcessThread(sid:0 cport:2181)::PrepRequestProcessor@489] - Processed session termination for sessionid: 0x15627e3e9780006
zk_1       | 2016-07-26 15:49:05,942 [myid:] - INFO  [NIOServerCxn.Factory:0.0.0.0/0.0.0.0:2181:NIOServerCnxn@1008] - Closed socket connection for client /192.168.99.1:52065 which had sessionid 0x15627e3e9780006
connect_1  | [2016-07-26 15:49:10,102] ERROR Invalid call to OffsetStorageWriter flush() while already flushing, the framework should not allow this (org.apache.kafka.connect.storage.OffsetStorageWriter:110)
connect_1  | [2016-07-26 15:49:10,102] ERROR Unhandled exception when committing WorkerSourceTask{id=name-source-0}:  (org.apache.kafka.connect.runtime.SourceTaskOffsetCommitter:115)
connect_1  | org.apache.kafka.connect.errors.ConnectException: OffsetStorageWriter is already flushing
connect_1  |    at org.apache.kafka.connect.storage.OffsetStorageWriter.beginFlush(OffsetStorageWriter.java:112)
connect_1  |    at org.apache.kafka.connect.runtime.WorkerSourceTask.commitOffsets(WorkerSourceTask.java:279)
connect_1  |    at org.apache.kafka.connect.runtime.SourceTaskOffsetCommitter.commit(SourceTaskOffsetCommitter.java:107)
connect_1  |    at org.apache.kafka.connect.runtime.SourceTaskOffsetCommitter.access$000(SourceTaskOffsetCommitter.java:44)
connect_1  |    at org.apache.kafka.connect.runtime.SourceTaskOffsetCommitter$1.run(SourceTaskOffsetCommitter.java:73)
connect_1  |    at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
connect_1  |    at java.util.concurrent.FutureTask.run(FutureTask.java:266)
connect_1  |    at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$201(ScheduledThreadPoolExecutor.java:180)
connect_1  |    at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:293)
connect_1  |    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
connect_1  |    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
connect_1  |    at java.lang.Thread.run(Thread.java:745)
connect_1  | [2016-07-26 15:49:20,104] ERROR Invalid call to OffsetStorageWriter flush() while already flushing, the framework should not allow this (org.apache.kafka.connect.storage.OffsetStorageWriter:110)
connect_1  | [2016-07-26 15:49:20,104] ERROR Unhandled exception when committing WorkerSourceTask{id=name-source-0}:  (org.apache.kafka.connect.runtime.SourceTaskOffsetCommitter:115)
connect_1  | org.apache.kafka.connect.errors.ConnectException: OffsetStorageWriter is already flushing
connect_1  |    at org.apache.kafka.connect.storage.OffsetStorageWriter.beginFlush(OffsetStorageWriter.java:112)
connect_1  |    at org.apache.kafka.connect.runtime.WorkerSourceTask.commitOffsets(WorkerSourceTask.java:279)
connect_1  |    at org.apache.kafka.connect.runtime.SourceTaskOffsetCommitter.commit(SourceTaskOffsetCommitter.java:107)
connect_1  |    at org.apache.kafka.connect.runtime.SourceTaskOffsetCommitter.access$000(SourceTaskOffsetCommitter.java:44)
connect_1  |    at org.apache.kafka.connect.runtime.SourceTaskOffsetCommitter$1.run(SourceTaskOffsetCommitter.java:73)
connect_1  |    at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
connect_1  |    at java.util.concurrent.FutureTask.run(FutureTask.java:266)
connect_1  |    at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$201(ScheduledThreadPoolExecutor.java:180)
connect_1  |    at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:293)
connect_1  |    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
connect_1  |    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
connect_1  |    at java.lang.Thread.run(Thread.java:745)
connect_1  | [2016-07-26 15:49:30,106] ERROR Invalid call to OffsetStorageWriter flush() while already flushing, the framework should not allow this (org.apache.kafka.connect.storage.OffsetStorageWriter:110)
connect_1  | [2016-07-26 15:49:30,106] ERROR Unhandled exception when committing WorkerSourceTask{id=name-source-0}:  (org.apache.kafka.connect.runtime.SourceTaskOffsetCommitter:115)
connect_1  | org.apache.kafka.connect.errors.ConnectException: OffsetStorageWriter is already flushing
connect_1  |    at org.apache.kafka.connect.storage.OffsetStorageWriter.beginFlush(OffsetStorageWriter.java:112)
connect_1  |    at org.apache.kafka.connect.runtime.WorkerSourceTask.commitOffsets(WorkerSourceTask.java:279)
connect_1  |    at org.apache.kafka.connect.runtime.SourceTaskOffsetCommitter.commit(SourceTaskOffsetCommitter.java:107)
connect_1  |    at org.apache.kafka.connect.runtime.SourceTaskOffsetCommitter.access$000(SourceTaskOffsetCommitter.java:44)
connect_1  |    at org.apache.kafka.connect.runtime.SourceTaskOffsetCommitter$1.run(SourceTaskOffsetCommitter.java:73)
connect_1  |    at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
connect_1  |    at java.util.concurrent.FutureTask.run(FutureTask.java:266)
connect_1  |    at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$201(ScheduledThreadPoolExecutor.java:180)
connect_1  |    at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:293)
connect_1  |    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
connect_1  |    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
connect_1  |    at java.lang.Thread.run(Thread.java:745)
^CGracefully stopping... (press Ctrl+C again to force)
Stopping hellokafkaconnect_connect_1 ... done
Stopping hellokafkaconnect_kafka_1 ... done
Stopping hellokafkaconnect_redis_1 ... done
Stopping hellokafkaconnect_zk_1 ... done

mcandre avatar Jul 26 '16 15:07 mcandre