docker-kafka-connect
Auto topic creation issues
Summary
With the default configuration, the 1ambda/kafka-connect stack fails on the first Source send: auto topic creation does not do all of the work needed to establish the topic, so the topic's leader is not yet available when the first record is produced.
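For context, whether and how the broker auto-creates topics is controlled by a handful of broker settings. The values below are an illustrative sketch of typical defaults, not taken from the 1ambda image's actual server.properties:

# server.properties (illustrative; verify against the image)
auto.create.topics.enable=true    # broker creates the topic on the first metadata request for it
num.partitions=1                  # partition count for auto-created topics
default.replication.factor=1      # replication factor for auto-created topics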
Source
https://github.com/mcandre/hello-kafka-connect
Trace
$ gradle clean shadowJar
$ docker-compose build --no-cache
$ docker-compose rm -f && docker-compose up --force-recreate
...
$ curl -XPOST $(docker-machine ip default):8083/connectors \
  -H "Content-Type: application/json" \
  -d "{
    \"name\": \"name-source\",
    \"config\": {
      \"connector.class\": \"us.yellosoft.hellokafkaconnect.NameSource\",
      \"tasks.max\": \"1\",
      \"topics\": \"names\",
      \"kafka_partitions\": \"1\",
      \"redis_address\": \"redis://$(docker-machine ip default):6379\",
      \"name_list_key\": \"names\"
    }
  }" | jq .
{
  "name": "name-source",
  "config": {
    "connector.class": "us.yellosoft.hellokafkaconnect.NameSource",
    "tasks.max": "1",
    "topics": "names",
    "kafka_partitions": "1",
    "redis_address": "redis://192.168.99.101:6379",
    "name_list_key": "names",
    "name": "name-source"
  },
  "tasks": []
}
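Immediately after creation the response reports an empty "tasks" array. If the stack's Connect version exposes the standard REST status endpoints (an assumption here, they are not shown in the trace), the connector and its task can be polled before pushing any data:

$ curl $(docker-machine ip default):8083/connectors/name-source/status | jq .
$ curl $(docker-machine ip default):8083/connectors/name-source/tasks | jq .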
$ curl $(docker-machine ip default):8083/connectors | jq .
[
"name-source"
]
$ redis-cli -h $(docker-machine ip default) lpush names 'Alice'
(integer) 1
$ redis-cli -h $(docker-machine ip default) llen names
(integer) 1
$ redis-cli -h $(docker-machine ip default) llen names
(integer) 0
$ kafka-topics --zookeeper $(docker-machine ip default):2181/broker-0 --list
__consumer_offsets
connect-configs
connect-offsets
connect-status
names
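The topic now exists and the Redis list was drained, but that alone does not prove the record reached Kafka. A quick check (assuming the old-consumer CLI shipped with this Kafka version accepts the same ZooKeeper chroot) is to read the topic back:

$ kafka-console-consumer --zookeeper $(docker-machine ip default):2181/broker-0 --topic names --from-beginning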
(back in the docker-compose logs)
...
zk_1 | 2016-07-26 16:44:46,747 [myid:] - INFO [ProcessThread(sid:0 cport:2181)::PrepRequestProcessor@651] - Got user-level KeeperException when processing sessionid:0x1562815410b0005 type:setData cxid:0x1b6 zxid:0xd6 txntype:-1 reqpath:n/a Error Path:/broker-0/config/topics/names Error:KeeperErrorCode = NoNode for /broker-0/config/topics/names
zk_1 | 2016-07-26 16:44:46,749 [myid:] - INFO [ProcessThread(sid:0 cport:2181)::PrepRequestProcessor@651] - Got user-level KeeperException when processing sessionid:0x1562815410b0005 type:create cxid:0x1b7 zxid:0xd7 txntype:-1 reqpath:n/a Error Path:/broker-0/config/topics Error:KeeperErrorCode = NodeExists for /broker-0/config/topics
kafka_1 | [2016-07-26 16:44:46,753] INFO Topic creation {"version":1,"partitions":{"0":[0]}} (kafka.admin.AdminUtils$)
zk_1 | 2016-07-26 16:44:46,761 [myid:] - INFO [ProcessThread(sid:0 cport:2181)::PrepRequestProcessor@651] - Got user-level KeeperException when processing sessionid:0x1562815410b0005 type:create cxid:0x1bf zxid:0xda txntype:-1 reqpath:n/a Error Path:/broker-0/brokers/topics/names/partitions/0 Error:KeeperErrorCode = NoNode for /broker-0/brokers/topics/names/partitions/0
kafka_1 | [2016-07-26 16:44:46,762] INFO [KafkaApi-0] Auto creation of topic names with 1 partitions and replication factor 1 is successful (kafka.server.KafkaApis)
zk_1 | 2016-07-26 16:44:46,762 [myid:] - INFO [ProcessThread(sid:0 cport:2181)::PrepRequestProcessor@651] - Got user-level KeeperException when processing sessionid:0x1562815410b0005 type:create cxid:0x1c0 zxid:0xdb txntype:-1 reqpath:n/a Error Path:/broker-0/brokers/topics/names/partitions Error:KeeperErrorCode = NoNode for /broker-0/brokers/topics/names/partitions
connect_1 | [2016-07-26 16:44:46,770] WARN Error while fetching metadata with correlation id 0 : {names=LEADER_NOT_AVAILABLE} (org.apache.kafka.clients.NetworkClient:600)
kafka_1 | [2016-07-26 16:44:46,790] INFO [ReplicaFetcherManager on broker 0] Removed fetcher for partitions [names,0] (kafka.server.ReplicaFetcherManager)
kafka_1 | [2016-07-26 16:44:46,809] INFO Completed load of log names-0 with log end offset 0 (kafka.log.Log)
kafka_1 | [2016-07-26 16:44:46,810] INFO Created log for partition [names,0] in /data with properties {compression.type -> producer, message.format.version -> 0.10.0-IV1, file.delete.delay.ms -> 60000, max.message.bytes -> 1000012, message.timestamp.type -> CreateTime, min.insync.replicas -> 1, segment.jitter.ms -> 0, preallocate -> false, min.cleanable.dirty.ratio -> 0.5, index.interval.bytes -> 4096, unclean.leader.election.enable -> true, retention.bytes -> -1, delete.retention.ms -> 86400000, cleanup.policy -> delete, flush.ms -> 9223372036854775807, segment.ms -> 604800000, segment.bytes -> 1073741824, retention.ms -> 604800000, message.timestamp.difference.max.ms -> 9223372036854775807, segment.index.bytes -> 10485760, flush.messages -> 9223372036854775807}. (kafka.log.LogManager)
kafka_1 | [2016-07-26 16:44:46,814] INFO Partition [names,0] on broker 0: No checkpointed highwatermark is found for partition [names,0] (kafka.cluster.Partition)
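The LEADER_NOT_AVAILABLE warning shows the Connect worker's producer requesting metadata for names before leader election for the auto-created partition has finished. One possible way to ride out that race (a sketch only; I have not verified that the 0.10.0 worker applies these overrides to source-task producers) is to give the producer more retries via producer.-prefixed worker properties:

# connect worker properties (sketch; producer.* prefix pass-through assumed)
producer.retries=10
producer.retry.backoff.ms=500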
Mitigation
As a workaround, I manually create the topics after the Kafka cluster has started, but before any connectors are submitted:
$ kafka-topics --zookeeper $(docker-machine ip default):2181/broker-0 --list
__consumer_offsets
connect-configs
connect-offsets
connect-status
$ kafka-topics --zookeeper $(docker-machine ip default):2181/broker-0 --create --partitions 1 --replication-factor 1 --topic names
Created topic "names".
$ kafka-topics --zookeeper $(docker-machine ip default):2181/broker-0 --list
__consumer_offsets
connect-configs
connect-offsets
connect-status
names
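To avoid repeating these steps by hand on every docker-compose up, the same workaround can be scripted. The loop below is a sketch: the ZooKeeper address and chroot match the trace above, while the retry budget and sleep interval are arbitrary choices:

ZK=$(docker-machine ip default):2181/broker-0
# wait until the broker has registered in ZooKeeper and kafka-topics answers
for i in $(seq 1 30); do
  kafka-topics --zookeeper "$ZK" --list >/dev/null 2>&1 && break
  sleep 2
done
# create the topic only if it is not already listed
kafka-topics --zookeeper "$ZK" --list | grep -qx names || \
  kafka-topics --zookeeper "$ZK" --create --partitions 1 --replication-factor 1 --topic names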
This does not alleviate the other Kafka issues detailed in #3 ("OffsetStorageWriter is already flushing"), but it at least works around this sub-issue on the way to a functioning hello-kafka-connect example.