
Auto topic creation issues

Open • mcandre opened this issue 8 years ago • 0 comments

Summary

With the default configuration, the 1ambda/kafka-connect stack fails on the first Source send, because auto topic creation does not complete all the work needed to establish the topic.
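
For reference: the LEADER_NOT_AVAILABLE warning in the trace below suggests the first send arrives before the auto-created topic has an elected leader. One quick check (assuming the same kafka-topics CLI and ZooKeeper chroot used throughout this issue) is to describe the topic right after the failed send; a missing leader there would confirm the race:

$ kafka-topics --zookeeper $(docker-machine ip default):2181/broker-0 \
       --describe --topic names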

Source

https://github.com/mcandre/hello-kafka-connect

Trace

$ gradle clean shadowJar
$ docker-compose build --no-cache
$ docker-compose rm -f && docker-compose up --force-recreate
...
$ curl -XPOST $(docker-machine ip default):8083/connectors \
       -H "Content-Type: application/json" \
       -d "{
             \"name\": \"name-source\",
             \"config\": {
               \"connector.class\": \"us.yellosoft.hellokafkaconnect.NameSource\",
               \"tasks.max\": \"1\",
               \"topics\": \"names\",
               \"kafka_partitions\": \"1\",
               \"redis_address\": \"redis://$(docker-machine ip default):6379\",
               \"name_list_key\": \"names\"
              }
           }" | jq .

{
  "name": "name-source",
  "config": {
    "connector.class": "us.yellosoft.hellokafkaconnect.NameSource",
    "tasks.max": "1",
    "topics": "names",
    "kafka_partitions": "1",
    "redis_address": "redis://192.168.99.101:6379",
    "name_list_key": "names",
    "name": "name-source"
  },
  "tasks": []
}

$ curl $(docker-machine ip default):8083/connectors | jq .
[
  "name-source"
]

$ redis-cli -h $(docker-machine ip default) lpush names 'Alice'
(integer) 1

$ redis-cli -h $(docker-machine ip default) llen names
(integer) 1

$ redis-cli -h $(docker-machine ip default) llen names
(integer) 0

$ kafka-topics --zookeeper $(docker-machine ip default):2181/broker-0 --list
__consumer_offsets
connect-configs
connect-offsets
connect-status
names

(back in the docker-compose logs)
...
zk_1       | 2016-07-26 16:44:46,747 [myid:] - INFO  [ProcessThread(sid:0 cport:2181)::PrepRequestProcessor@651] - Got user-level KeeperException when processing sessionid:0x1562815410b0005 type:setData cxid:0x1b6 zxid:0xd6 txntype:-1 reqpath:n/a Error Path:/broker-0/config/topics/names Error:KeeperErrorCode = NoNode for /broker-0/config/topics/names
zk_1       | 2016-07-26 16:44:46,749 [myid:] - INFO  [ProcessThread(sid:0 cport:2181)::PrepRequestProcessor@651] - Got user-level KeeperException when processing sessionid:0x1562815410b0005 type:create cxid:0x1b7 zxid:0xd7 txntype:-1 reqpath:n/a Error Path:/broker-0/config/topics Error:KeeperErrorCode = NodeExists for /broker-0/config/topics
kafka_1    | [2016-07-26 16:44:46,753] INFO Topic creation {"version":1,"partitions":{"0":[0]}} (kafka.admin.AdminUtils$)
zk_1       | 2016-07-26 16:44:46,761 [myid:] - INFO  [ProcessThread(sid:0 cport:2181)::PrepRequestProcessor@651] - Got user-level KeeperException when processing sessionid:0x1562815410b0005 type:create cxid:0x1bf zxid:0xda txntype:-1 reqpath:n/a Error Path:/broker-0/brokers/topics/names/partitions/0 Error:KeeperErrorCode = NoNode for /broker-0/brokers/topics/names/partitions/0
kafka_1    | [2016-07-26 16:44:46,762] INFO [KafkaApi-0] Auto creation of topic names with 1 partitions and replication factor 1 is successful (kafka.server.KafkaApis)
zk_1       | 2016-07-26 16:44:46,762 [myid:] - INFO  [ProcessThread(sid:0 cport:2181)::PrepRequestProcessor@651] - Got user-level KeeperException when processing sessionid:0x1562815410b0005 type:create cxid:0x1c0 zxid:0xdb txntype:-1 reqpath:n/a Error Path:/broker-0/brokers/topics/names/partitions Error:KeeperErrorCode = NoNode for /broker-0/brokers/topics/names/partitions
connect_1  | [2016-07-26 16:44:46,770] WARN Error while fetching metadata with correlation id 0 : {names=LEADER_NOT_AVAILABLE} (org.apache.kafka.clients.NetworkClient:600)
kafka_1    | [2016-07-26 16:44:46,790] INFO [ReplicaFetcherManager on broker 0] Removed fetcher for partitions [names,0] (kafka.server.ReplicaFetcherManager)
kafka_1    | [2016-07-26 16:44:46,809] INFO Completed load of log names-0 with log end offset 0 (kafka.log.Log)
kafka_1    | [2016-07-26 16:44:46,810] INFO Created log for partition [names,0] in /data with properties {compression.type -> producer, message.format.version -> 0.10.0-IV1, file.delete.delay.ms -> 60000, max.message.bytes -> 1000012, message.timestamp.type -> CreateTime, min.insync.replicas -> 1, segment.jitter.ms -> 0, preallocate -> false, min.cleanable.dirty.ratio -> 0.5, index.interval.bytes -> 4096, unclean.leader.election.enable -> true, retention.bytes -> -1, delete.retention.ms -> 86400000, cleanup.policy -> delete, flush.ms -> 9223372036854775807, segment.ms -> 604800000, segment.bytes -> 1073741824, retention.ms -> 604800000, message.timestamp.difference.max.ms -> 9223372036854775807, segment.index.bytes -> 10485760, flush.messages -> 9223372036854775807}. (kafka.log.LogManager)
kafka_1    | [2016-07-26 16:44:46,814] INFO Partition [names,0] on broker 0: No checkpointed highwatermark is found for partition [names,0] (kafka.cluster.Partition)

Mitigation

As a workaround, I'm manually creating my topics after the Kafka cluster has started, but before any connectors are submitted (see the scripted sketch after the listing below), with:

$ kafka-topics --zookeeper $(docker-machine ip default):2181/broker-0 --list
__consumer_offsets
connect-configs
connect-offsets
connect-status

$ kafka-topics --zookeeper $(docker-machine ip default):2181/broker-0 --create --partitions 1 --replication-factor 1 --topic names
Created topic "names".

$ kafka-topics --zookeeper $(docker-machine ip default):2181/broker-0 --list
__consumer_offsets
connect-configs
connect-offsets
connect-status
names
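
For docker-compose runs, the same workaround can be scripted so the topic is created automatically once the broker registers in ZooKeeper, before any connectors are submitted. A rough sketch (the script name and the ZK_CONNECT variable are placeholders for this example; it reuses the kafka-topics CLI and chroot from above):

#!/bin/bash
# create-topics.sh (sketch): pre-create Connect source topics before submitting connectors.
ZK_CONNECT="$(docker-machine ip default):2181/broker-0"

# Wait until the broker registry is reachable through ZooKeeper.
until kafka-topics --zookeeper "$ZK_CONNECT" --list > /dev/null 2>&1; do
  sleep 1
done

# Create the topic; tolerate "already exists" errors so re-runs are harmless.
kafka-topics --zookeeper "$ZK_CONNECT" \
  --create --partitions 1 --replication-factor 1 --topic names || true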

This does not alleviate the other Kafka issues detailed in #3 ("OffsetStorageWriter is already flushing"), but it at least works around this sub-issue on the way to a functioning hello-kafka-connect example.

mcandre • Jul 26 '16 17:07