kafka-cheat-sheet
kafka-cheat-sheet copied to clipboard
Kafka Cheat Sheet
= Kafka Cheat Sheet :toc: :toc-placement!: :icons:
Here you have some useful commands for kafka.
Tested on Kafka 2.5
Goals
- [x] Add most useful commands for kafka
- [x] Add kafkacat commands
- [ ] Add commands output
- [x] Add commands for Kafka on Kubernetes
toc::[]
== Pre-req
First, set some kafka environment vars.
[source,bash]
For Kafka running on top of VMs/Bare Metal
KAFKA_BIN=/opt/kafka/bin ZOOKEEPER_HOST=zookeeper-host:2181 BROKER_HOST=broker-host:9092
For Kafka running on top of Kubernetes (Using strimzi)
KAFKA_NAMESPACE=kafka-demo ZOOKEEPER_HOST=localhost:2181 BROKER_HOST=localhost:9092 ZOOKEEPER_POD=$(kubectl -n $KAFKA_NAMESPACE get pods -l app.kubernetes.io/name=zookeeper -o=jsonpath='{.items[0].metadata.name}') KAFKA_BROKER_POD=$(kubectl -n $KAFKA_NAMESPACE get pods -l app.kubernetes.io/name=kafka -o=jsonpath='{.items[0].metadata.name}')
== Zookeeper Operations
You need to whitelist all the commands bellow.
.zookeeper.properties
4lw.commands.whitelist=stat,ruok,reqs,envi,dump,conf,cons,srvr,wchs,wchc,dirs,wchp,mntr,isro
- If using Zookeeper Auth (SASL)
[source,bash]
Zookeeper Auth
export KAFKA_OPTS="-Djava.security.auth.login.config=/tmp/jaas.conf"
.jass.conf
Client { org.apache.zookeeper.server.auth.DigestLoginModule required username="test" password="test"; };
- If using SSL/TLS on Zookeeper + SASL
[source,bash]
export KAFKA_OPTS="-Djava.security.auth.login.config=/tmp/jaas.conf -Dzookeeper.clientCnxnSocket=org.apache.zookeeper.ClientCnxnSocketNetty -Dzookeeper.client.secure=true -Dzookeeper.ssl.trustStore.location=/tmp/kafka.server.truststore -Dzookeeper.ssl.trustStore.password=mypass -Dzookeeper.ssl.trustStore.type=PKCS12"
NOTE: Remember to change your zookeeper port on the ZOOKEEPER_HOST
if necessary
=== Get runtime conf
[source,bash]
For VMs
echo conf | curl telnet://$ZOOKEEPER_HOST
For Kubernetes
kubectl -n $KAFKA_NAMESPACE exec -it $KAFKA_BROKER_POD -c kafka -- bash -c "echo conf | curl telnet://localhost:2181"
=== Get runtime environments
[source,bash]
For VMs
echo envi | curl telnet://$ZOOKEEPER_HOST
For Kubernetes
kubectl -n $KAFKA_NAMESPACE exec -it $KAFKA_BROKER_POD -c kafka -- bash -c "echo envi | curl telnet://localhost:2181"
=== Health Check
[source,bash]
For VMs
echo stats | curl telnet://$ZOOKEEPER_HOST echo ruok | curl telnet://$ZOOKEEPER_HOST
For Kubernetes
kubectl -n $KAFKA_NAMESPACE exec -it $KAFKA_BROKER_POD -c kafka -- bash -c "echo stats | curl telnet://localhost:2181" kubectl -n $KAFKA_NAMESPACE exec -it $KAFKA_BROKER_POD -c kafka -- bash -c "echo ruok | curl telnet://localhost:2181"
=== Connections
[source,bash]
For VMs
echo reqs | curl telnet://$ZOOKEEPER_HOST echo cons | curl telnet://$ZOOKEEPER_HOST
For Kubernetes
kubectl -n $KAFKA_NAMESPACE exec -it $KAFKA_BROKER_POD -c kafka -- bash -c "echo reqs | curl telnet://localhost:2181" kubectl -n $KAFKA_NAMESPACE exec -it $KAFKA_BROKER_POD -c kafka -- bash -c "echo cons | curl telnet://localhost:2181"
=== Details of the server
[source,bash]
For VMs
echo srvr | curl telnet://$ZOOKEEPER_HOST
For Kubernetes
kubectl -n $KAFKA_NAMESPACE exec -it $KAFKA_BROKER_POD -c kafka -- bash -c "echo srvr | curl telnet://localhost:2181"
=== Brief info about watches
[source,bash]
For VMs
echo wchs | curl telnet://$ZOOKEEPER_HOST
For Kubernetes
kubectl -n $KAFKA_NAMESPACE exec -it $KAFKA_BROKER_POD -c kafka -- bash -c "echo wchs | curl telnet://localhost:2181"
=== Details about watches
[source,bash]
For VMs
echo wchc | curl telnet://$ZOOKEEPER_HOST
For Kubernetes
kubectl -n $KAFKA_NAMESPACE exec -it $KAFKA_BROKER_POD -c kafka -- bash -c "echo wchs | curl telnet://localhost:2181"
=== Snapshots info
[source,bash]
For VMs
echo dirs | curl telnet://$ZOOKEEPER_HOST
For Kubernetes
kubectl -n $KAFKA_NAMESPACE exec -it $KAFKA_BROKER_POD -c kafka -- bash -c "echo dirs | curl telnet://localhost:2181"
=== Monitoring vars
[source,bash]
For VMs
echo mntr | curl telnet://$ZOOKEEPER_HOST
For Kubernetes
kubectl -n $KAFKA_NAMESPACE exec -it $KAFKA_BROKER_POD -c kafka -- bash -c "echo mntr | curl telnet://localhost:2181"
=== Get read-only or read-write mode
[source,bash]
For VMs
echo isro | curl telnet://$ZOOKEEPER_HOST
For Kubernetes
kubectl -n $KAFKA_NAMESPACE exec -it $KAFKA_BROKER_POD -c kafka -- bash -c "echo isro | curl telnet://localhost:2181"
=== Get Process
[source,bash]
For VMs
jps | grep QuorumPeerMain
For kubernetes
kubectl -n $KAFKA_NAMESPACE exec -it $ZOOKEEPER_POD -- bash -c "ps aux | grep QuorumPeerMain"
== Broker Operations
=== List active brokers
[source,bash]
For VMs
$KAFKA_BIN/zookeeper-shell.sh $ZOOKEEPER_HOST ls /brokers/ids
For Kubernetes
kubectl -n $KAFKA_NAMESPACE exec -it $KAFKA_BROKER_POD -c kafka -- bin/zookeeper-shell.sh localhost:2181 ls /brokers/ids
[source,bash]
kafkacat -b $BROKER_HOST -L
=== List Broker Controller
[source,bash]
For VMs
$KAFKA_BIN/zookeeper-shell.sh $ZOOKEEPER_HOST get /controller
For Kubernetes
kubectl -n $KAFKA_NAMESPACE exec -it $KAFKA_BROKER_POD -c kafka -- bin/zookeeper-shell.sh localhost:2181 get /controller
=== List broker details
[source,bash]
For VMs
$KAFKA_BIN/zookeeper-shell.sh $ZOOKEEPER_HOST ls /brokers/ids/{id}
For Kubernetes
kubectl -n $KAFKA_NAMESPACE exec -it $KAFKA_BROKER_POD -c kafka -- bin/zookeeper-shell.sh localhost:2181 ls /brokers/ids/{id}
[source,bash]
kafkacat -b $BROKER_HOST -L
=== List topics
[source,bash]
For VMs
$KAFKA_BIN/zookeeper-shell.sh $ZOOKEEPER_HOST ls /brokers/topics
For Kubernetes
kubectl -n $KAFKA_NAMESPACE exec -it $KAFKA_BROKER_POD -c kafka -- bin/zookeeper-shell.sh localhost:2181 ls /brokers/topics
[source,bash]
kafkacat -b $BROKER_HOST -L -t
=== Change Broker Config
Change log cleaner threads.
[source,bash]
For VMs
$KAFKA_BIN/kafka-configs.sh
--bootstrap-server $BROKER_HOST
--entity-type brokers
--entity-name
--alter
--add-config log.cleaner.threads=2
For Kubernetes
kubectl -n $KAFKA_NAMESPACE exec -it $KAFKA_BROKER_POD -c kafka -- bin/kafka-configs.sh
--bootstrap-server $BROKER_HOST
--entity-type brokers
--entity-name
--alter
--add-config log.cleaner.threads=2
--alter
--add-config log.cleaner.threads=2
=== Describe broker dynamic config
[source,bash]
For VMs
$KAFKA_BIN/kafka-configs.sh
--bootstrap-server $BROKER_HOST
--entity-type brokers
--entity-name
--describe
For Kubernetes
kubectl -n $KAFKA_NAMESPACE exec -it $KAFKA_BROKER_POD -c kafka -- bin/kafka-configs.sh
--bootstrap-server $BROKER_HOST
--entity-type brokers
--entity-name
--describe
--describe
=== Delete broker config
[source,bash]
For VMs
$KAFKA_BIN/kafka-configs.sh
--bootstrap-server $BROKER_HOST
--entity-type brokers
--entity-name
--alter
--delete-config log.cleaner.threads
For Kubernetes
kubectl -n $KAFKA_NAMESPACE exec -it $KAFKA_BROKER_POD -c kafka -- bin/kafka-configs.sh
--bootstrap-server $BROKER_HOST
--entity-type brokers
--entity-name
--alter
--delete-config log.cleaner.threads
--alter
--delete-config log.cleaner.threads
=== Change cluster-wide dynamic config
[source,bash]
For VMs
$KAFKA_BIN/kafka-configs.sh
--bootstrap-server $BROKER_HOST
--entity-type brokers
--entity-default
--alter
--add-config log.cleaner.threads=2
For Kubernetes
kubectl -n $KAFKA_NAMESPACE exec -it $KAFKA_BROKER_POD -c kafka -- bin/kafka-configs.sh
--bootstrap-server $BROKER_HOST
--entity-type brokers
--entity-default
--alter
--add-config log.cleaner.threads=2
=== Describe cluster-wide dynamic config
[source,bash]
For VMs
$KAFKA_BIN/kafka-configs.sh
--bootstrap-server $BROKER_HOST
--entity-type brokers
--entity-default
--describe
For Kubernetes
kubectl -n $KAFKA_NAMESPACE exec -it $KAFKA_BROKER_POD -c kafka -- bin/kafka-configs.sh
--bootstrap-server $BROKER_HOST
--entity-type brokers
--entity-default
--describe
=== Disable hostname verification
[source,bash]
For VMs
$KAFKA_BIN/kafka-configs.sh
--bootstrap-server $BROKER_HOST
--entity-type brokers
--entity-name
--alter
--add-config "listener.name.internal.ssl.endpoint.identification.algorithm="
For Kubernetes
kubectl -n $KAFKA_NAMESPACE exec -it $KAFKA_BROKER_POD -c kafka -- bin/kafka-configs.sh
--bootstrap-server $BROKER_HOST
--entity-type brokers
--entity-name
--alter
--add-config "listener.name.internal.ssl.endpoint.identification.algorithm="
--alter
--add-config "listener.name.internal.ssl.endpoint.identification.algorithm="
== Topic Operations
=== List topics using kafka-topics.sh
[source,bash]
For VMs
$KAFKA_BIN/kafka-topics.sh
--list
--zookeeper $ZOOKEEPER_HOST
For Kubernetes
kubectl -n $KAFKA_NAMESPACE exec -it $KAFKA_BROKER_POD -c kafka -- bin/kafka-topics.sh
--list
--zookeeper $ZOOKEEPER_HOST
[source,bash]
For VMs
$KAFKA_BIN/kafka-topics.sh
--bootstrap-server $BROKER_HOST
--list
For Kubernetes
kubectl -n $KAFKA_NAMESPACE exec -it $KAFKA_BROKER_POD -c kafka -- bin/kafka-topics.sh
--list
--bootstrap-server $BROKER_HOST
=== Describe topic
[source,bash]
For VMs
$KAFKA_BIN/kafka-topics.sh
--zookeeper $ZOOKEEPER_HOST
--topic <topic_name>
--describe
For Kubernetes
kubectl -n $KAFKA_NAMESPACE exec -it $KAFKA_BROKER_POD -c kafka -- bin/kafka-topics.sh
--zookeeper $ZOOKEEPER_HOST
--topic <topic_name>
--describe
[source,bash]
kafkacat -b $BROKER_HOST -L -t <topic_name>
=== Describe topic configs
[source,bash]
For VMs
$KAFKA_BIN/kafka-configs.sh
--zookeeper $ZOOKEEPER_HOST
--entity-type topics
--entity-name <topic_name>
--describe
For Kubernetes
kubectl -n $KAFKA_NAMESPACE exec -it $KAFKA_BROKER_POD -c kafka -- bin/kafka-configs.sh
--zookeeper $ZOOKEEPER_HOST
--entity-type topics
--entity-name <topic_name>
--describe
=== Delete topic config
[source,bash]
For VMs
$KAFKA_BIN/kafka-configs.sh
--zookeeper $ZOOKEEPER_HOST
--entity-type topics
--entity-name <topic_name>
--delete-config
For Kubernetes
kubectl -n $KAFKA_NAMESPACE exec -it $KAFKA_BROKER_POD -c kafka -- bin/kafka-configs.sh
--zookeeper $ZOOKEEPER_HOST
--entity-type topics
--entity-name <topic_name>
--delete-config
=== Copy topic to another topic in the same cluster
[source,bash]
kafkacat -C -b $BROKER_HOST -t <topic_name> -e | kafkacat -P -b $BROKER_HOST -t
=== Copy topic to another topic in another cluster
[source,bash]
kafkacat -C -b $BROKER_HOST -t <topic_name> -e | kafkacat -P -b $BROKER_HOST2 -t
=== Move topic to another broker
==== Create json necessary
.topics-to-move.json [source,json]
{"topics": [{"topic": "topic1"}, {"topic": "topic2"}], "version":1 }
==== Generate plan to move to brokers
.generate plan to move to broker 5 and 6 [source,bash]
For VMs
$KAFKA_BIN/kafka-reassign-partitions.sh
--zookeeper $ZOOKEEPER_HOST
--topics-to-move-json-file topics-to-move.json
--broker-list "5,6"
--generate
For Kubernetes
kubectl -n $KAFKA_NAMESPACE exec -it $KAFKA_BROKER_POD -c kafka -- bin/kafka-configs.sh
--zookeeper $ZOOKEEPER_HOST
--entity-type topics
--entity-name <topic_name>
--delete-config
NOTE: save the results from the command above to cluster-reassignment.json
==== Move to broker 5 and 6
.move to broker 5 and 6 [source,bash]
For VMs
$KAFKA_BIN/kafka-reassign-partitions.sh
--zookeeper $ZOOKEEPER_HOST
--reassignment-json-file cluster-reassignment.json
--execute
For Kubernetes
kubectl -n $KAFKA_NAMESPACE exec -it $KAFKA_BROKER_POD -c kafka -- bin/kafka-reassign-partitions.sh
--zookeeper $ZOOKEEPER_HOST
--reassignment-json-file cluster-reassignment.json
--execute
==== Verify status
.verify status [source,bash]
For VMs
$KAFKA_BIN/kafka-reassign-partitions.sh
--zookeeper $ZOOKEEPER_HOST
--reassignment-json-file cluster-reassignment.json
--verify
For Kubernetes
kubectl -n $KAFKA_NAMESPACE exec -it $KAFKA_BROKER_POD -c kafka -- bin/kafka-reassign-partitions.sh
--zookeeper $ZOOKEEPER_HOST
--reassignment-json-file cluster-reassignment.json
--verify
=== Create topic
[source,bash]
For VMs
$KAFKA_BIN/kafka-topics.sh
--create
--zookeeper $ZOOKEEPER_HOST
--replication-factor 1
--partitions 1
--topic <topic_name>
For Kubernetes
kubectl -n $KAFKA_NAMESPACE exec -it $KAFKA_BROKER_POD -c kafka -- bin/kafka-topics.sh
--create
--zookeeper $ZOOKEEPER_HOST
--replication-factor 1
--partitions 1
--topic <topic_name>
=== Create topic with config
[source,bash]
For VMs
$KAFKA_BIN/kafka-topics.sh
--bootstrap-server $BROKER_HOST
--create
--topic <topic_name>
--partitions 1
--replication-factor 1
--config max.message.bytes=64000
--config flush.messages=1
For Kubernetes
kubectl -n $KAFKA_NAMESPACE exec -it $KAFKA_BROKER_POD -c kafka -- bin/kafka-topics.sh
--bootstrap-server $BROKER_HOST
--create
--topic <topic_name>
--partitions 1
--replication-factor 1
--config max.message.bytes=64000
--config flush.messages=1
=== Increase replication factor of __consumer_offsets
==== Create replication plan
.reassignment.json [source,json]
{"version":1, "partitions":[ {"topic":"__consumer_offsets", "partition":0, "replicas":[106,101,102,105]}, {"topic":"__consumer_offsets", "partition":1, "replicas":[106,101,102,105]}, {"topic":"__consumer_offsets", "partition":2, "replicas":[106,101,102,105]}, {"topic":"__consumer_offsets", "partition":3, "replicas":[106,101,102,105]}, {"topic":"__consumer_offsets", "partition":4, "replicas":[106,101,102,105]}, {"topic":"__consumer_offsets", "partition":5, "replicas":[106,101,102,105]}, {"topic":"__consumer_offsets", "partition":6, "replicas":[106,101,102,105]}, {"topic":"__consumer_offsets", "partition":7, "replicas":[106,101,102,105]}, {"topic":"__consumer_offsets", "partition":8, "replicas":[106,101,102,105]}, {"topic":"__consumer_offsets", "partition":9, "replicas":[106,101,102,105]}, {"topic":"__consumer_offsets", "partition":10, "replicas":[101,102,103,105]}, {"topic":"__consumer_offsets", "partition":11, "replicas":[101,102,103,105]}, {"topic":"__consumer_offsets", "partition":12, "replicas":[101,102,103,105]}, {"topic":"__consumer_offsets", "partition":13, "replicas":[101,102,103,105]}, {"topic":"__consumer_offsets", "partition":14, "replicas":[101,102,103,105]}, {"topic":"__consumer_offsets", "partition":15, "replicas":[101,102,103,105]}, {"topic":"__consumer_offsets", "partition":16, "replicas":[101,102,103,105]}, {"topic":"__consumer_offsets", "partition":17, "replicas":[101,102,103,105]}, {"topic":"__consumer_offsets", "partition":18, "replicas":[101,102,103,105]}, {"topic":"__consumer_offsets", "partition":19, "replicas":[101,102,103,105]}, {"topic":"__consumer_offsets", "partition":20, "replicas":[102,103,104,105]}, {"topic":"__consumer_offsets", "partition":21, "replicas":[102,103,104,105]}, {"topic":"__consumer_offsets", "partition":22, "replicas":[102,103,104,105]}, {"topic":"__consumer_offsets", "partition":23, "replicas":[102,103,104,105]}, {"topic":"__consumer_offsets", "partition":24, "replicas":[102,103,104,105]}, {"topic":"__consumer_offsets", "partition":25, "replicas":[102,103,104,105]}, {"topic":"__consumer_offsets", "partition":26, "replicas":[102,103,104,105]}, {"topic":"__consumer_offsets", "partition":27, "replicas":[102,103,104,105]}, {"topic":"__consumer_offsets", "partition":28, "replicas":[102,103,104,105]}, {"topic":"__consumer_offsets", "partition":29, "replicas":[102,103,104,105]}, {"topic":"__consumer_offsets", "partition":30, "replicas":[103,104,106,105]}, {"topic":"__consumer_offsets", "partition":31, "replicas":[103,104,106,105]}, {"topic":"__consumer_offsets", "partition":32, "replicas":[103,104,106,105]}, {"topic":"__consumer_offsets", "partition":33, "replicas":[103,104,106,105]}, {"topic":"__consumer_offsets", "partition":34, "replicas":[103,104,106,105]}, {"topic":"__consumer_offsets", "partition":35, "replicas":[103,104,106,105]}, {"topic":"__consumer_offsets", "partition":36, "replicas":[103,104,106,105]}, {"topic":"__consumer_offsets", "partition":37, "replicas":[103,104,106,105]}, {"topic":"__consumer_offsets", "partition":38, "replicas":[103,104,106,105]}, {"topic":"__consumer_offsets", "partition":39, "replicas":[103,104,106,105]}, {"topic":"__consumer_offsets", "partition":40, "replicas":[104,106,101,105]}, {"topic":"__consumer_offsets", "partition":41, "replicas":[104,106,101,105]}, {"topic":"__consumer_offsets", "partition":42, "replicas":[104,106,101,105]}, {"topic":"__consumer_offsets", "partition":43, "replicas":[104,106,101,105]}, {"topic":"__consumer_offsets", "partition":44, "replicas":[104,106,101,105]}, {"topic":"__consumer_offsets", "partition":45, "replicas":[104,106,101,105]}, {"topic":"__consumer_offsets", "partition":46, "replicas":[104,106,101,105]}, {"topic":"__consumer_offsets", "partition":47, "replicas":[104,106,101,105]}, {"topic":"__consumer_offsets", "partition":48, "replicas":[104,106,101,105]}, {"topic":"__consumer_offsets", "partition":49, "replicas":[104,106,101,105]} ] }
==== Increase partition
[source,bash]
For VMs
$KAFKA_BIN/kafka-reassign-partitions.sh
--zookeeper $ZOOKEEPER_HOST
--reassignment-json-file reassignment.json
--execute
For Kubernetes
kubectl -n $KAFKA_NAMESPACE exec -it $KAFKA_BROKER_POD -c kafka -- bin/kafka-reassign-partitions.sh
--zookeeper $ZOOKEEPER_HOST
--reassignment-json-file reassignment.json
--execute
==== Verify reassignment
[source,bash]
For VMs
$KAFKA_BIN/kafka-reassign-partitions.sh
--zookeeper $ZOOKEEPER_HOST
--reassignment-json-file reassignment.json
--verify
For Kubernetes
kubectl -n $KAFKA_NAMESPACE exec -it $KAFKA_BROKER_POD -c kafka -- bin/kafka-reassign-partitions.sh
--zookeeper $ZOOKEEPER_HOST
--reassignment-json-file reassignment.json
--verify
=== Alter topic
==== Alter retention time
[source,bash]
For VMs
$KAFKA_BIN/kafka-topics.sh
--zookeeper $ZOOKEEPER_HOST
--alter
--topic <topic_name>
--config retention.ms=1000
For Kubernetes
kubectl -n $KAFKA_NAMESPACE exec -it $KAFKA_BROKER_POD -c kafka -- bin/kafka-topics.sh
--zookeeper $ZOOKEEPER_HOST
--alter
--topic <topic_name>
--config retention.ms=1000
==== Alter min.insync.replicas
[source,bash]
For VMs
$KAFKA_BIN/kafka-topics.sh
--zookeeper $ZOOKEEPER_HOST
--alter
--topic <topic_name>
--config min.insync.replicas=2
For Kubernetes
kubectl -n $KAFKA_NAMESPACE exec -it $KAFKA_BROKER_POD -c kafka -- bin/kafka-topics.sh
--zookeeper $ZOOKEEPER_HOST
--alter
--topic <topic_name>
--config min.insync.replicas=2
==== Alter max.message.bytes
[source,bash]
For VMs
$KAFKA_BIN/kafka-configs.sh
--zookeeper $ZOOKEEPER_HOST
--entity-type topics
--entity-name <topic_name>
--alter
--add-config max.message.bytes=128000
For Kubernetes
kubectl -n $KAFKA_NAMESPACE exec -it $KAFKA_BROKER_POD -c kafka -- bin/kafka-configs.sh
--zookeeper $ZOOKEEPER_HOST
--entity-type topics
--entity-name <topic_name>
--alter
--add-config max.message.bytes=128000
==== Delete retention time
[source,bash]
For VMs
$KAFKA_BIN/kafka-topics.sh
--zookeeper $ZOOKEEPER_HOST
--alter
--topic <topic_name>
--delete-config retention.ms
For Kubernetes
kubectl -n $KAFKA_NAMESPACE exec -it $KAFKA_BROKER_POD -c kafka -- bin/kafka-topics.sh
--zookeeper $ZOOKEEPER_HOST
--alter
--topic <topic_name>
--delete-config retention.ms
[source,bash]
For VMs
$KAFKA_BIN/kafka-configs.sh
--zookeeper $ZOOKEEPER_HOST \
--entity-type topics
--entity-name <topic_name>
--alter
--delete-config retention.ms
For Kubernetes
kubectl -n $KAFKA_NAMESPACE exec -it $KAFKA_BROKER_POD -c kafka -- bin/kafka-configs.sh
--zookeeper $ZOOKEEPER_HOST \
--entity-type topics
--entity-name <topic_name>
--alter
--delete-config retention.ms
=== List topics under-replicated
[source,bash]
For VMs
$KAFKA_BIN/kafka-topics.sh
--zookeeper $ZOOKEEPER_HOST
--describe
--under-replicated-partitions
For Kubernetes
kubectl -n $KAFKA_NAMESPACE exec -it $KAFKA_BROKER_POD -c kafka -- bin/kafka-topics.sh
--zookeeper $ZOOKEEPER_HOST
--describe
--under-replicated-partitions
=== Delete topic
[source,bash]
For VMs
$KAFKA_BIN/kafka-topics.sh
--delete
--zookeeper $ZOOKEEPER_HOST
--topic <topic_name>
For Kubernetes
kubectl -n $KAFKA_NAMESPACE exec -it $KAFKA_BROKER_POD -c kafka -- bin/kafka-topics.sh
--delete
--zookeeper $ZOOKEEPER_HOST
--topic <topic_name>
[source,bash]
For VMs
$KAFKA_BIN/kafka-topics.sh
--bootstrap-server $BROKER_HOST
--delete
--topic <topic_name>
For Kubernetes
kubectl -n $KAFKA_NAMESPACE exec -it $KAFKA_BROKER_POD -c kafka -- bin/kafka-topics.sh
--bootstrap-server $BROKER_HOST
--delete
--topic <topic_name>
=== Get earliest offset
[source,bash]
For VMs
$KAFKA_BIN/kafka-run-class.sh
kafka.tools.GetOffsetShell
--broker-list $BROKER_HOST
--topic <topic_name>
--time -2
For Kubernetes
kubectl -n $KAFKA_NAMESPACE exec -it $KAFKA_BROKER_POD -c kafka -- bin/kafka-run-class.sh
kafka.tools.GetOffsetShell
--broker-list $BROKER_HOST
--topic <topic_name>
--time -2
=== Get latest offset
[source,bash]
For VMs
$KAFKA_BIN/kafka-run-class.sh
kafka.tools.GetOffsetShell
--broker-list $BROKER_HOST
--topic <topic_name>
--time -1
For Kubernetes
kubectl -n $KAFKA_NAMESPACE exec -it $KAFKA_BROKER_POD -c kafka -- bin/kafka-run-class.sh
kafka.tools.GetOffsetShell
--broker-list $BROKER_HOST
--topic <topic_name>
--time -1
== Partition Operations
=== Increase partition number
[source,bash]
For VMs
$KAFKA_BIN/kafka-topics.sh
--alter
--topic <topic_name>
--partitions 8
For Kubernetes
kubectl -n $KAFKA_NAMESPACE exec -it $KAFKA_BROKER_POD -c kafka -- bin/kafka-topics.sh
--alter
--topic <topic_name>
--partitions 8
=== Increase replication factor
.topics.json [source,json]
{ "topics": [ { "topic": "test" } ], "version": 1 }
[source,bash]
For VMs
$KAFKA_BIN/kafka-reassign-partitions.sh
--zookeeper $ZOOKEEPER_HOST
--generate
--broker-list "401,402,601"
--topics-to-move-json-file topics.json
For Kubernetes
kubectl -n $KAFKA_NAMESPACE exec -it $KAFKA_BROKER_POD -c kafka -- bin/kafka-reassign-partitions.sh
--zookeeper $ZOOKEEPER_HOST
--generate
--broker-list "401,402,601"
--topics-to-move-json-file topics.json
.new-replication-factor.json [source,json]
{"version":1,"partitions":[{"topic":"topic1","partition":0,"replicas":[5,6,7]}]}
.execute new replication factor [source,bash]
For VMs
$KAFKA_BIN/kafka-reassign-partitions.sh
--zookeeper $ZOOKEEPER_HOST
--reassignment-json-file new-replication-factor.json
--execute
For Kubernetes
kubectl -n $KAFKA_NAMESPACE exec -it $KAFKA_BROKER_POD -c kafka -- bin/kafka-reassign-partitions.sh
--zookeeper $ZOOKEEPER_HOST
--reassignment-json-file new-replication-factor.json
--execute
.verify status of partition reassignment [source,bash]
For VMs
$KAFKA_BIN/kafka-reassign-partitions.sh
--zookeeper $ZOOKEEPER_HOST
--reassignment-json-file new-replication-factor.json
--verify
$KAFKA_BIN/kafka-topics.sh
--bootstrap-server $ZOOKEEPER_HOST
--topic <topic_name>
--describe
For Kubernetes
kubectl -n $KAFKA_NAMESPACE exec -it $KAFKA_BROKER_POD -c kafka -- bin/kafka-reassign-partitions.sh
--zookeeper $ZOOKEEPER_HOST
--reassignment-json-file new-replication-factor.json
--verify
kubectl -n $KAFKA_NAMESPACE exec -it $KAFKA_BROKER_POD -c kafka -- bin/kafka-topics.sh
--bootstrap-server $ZOOKEEPER_HOST
--topic <topic_name>
--describe
=== Reassign partitions
Create plan
.topics.json [source,json]
{ "topics": [ { "topic": "test" } ], "version": 1 }
[source,bash]
For VMs
$KAFKA_BIN/kafka-reassign-partitions.sh
--zookeeper $ZOOKEEPER_HOST
--generate
--broker-list "401,402,601"
--topics-to-move-json-file topics.json
For Kubernetes
kubectl -n $KAFKA_NAMESPACE exec -it $KAFKA_BROKER_POD -c kafka -- bin/kafka-reassign-partitions.sh
--zookeeper $ZOOKEEPER_HOST
--generate
--broker-list "401,402,601"
--topics-to-move-json-file topics.json
Save the result of the above command to a file named replicas.json
[source,bash]
For VMs
$KAFKA_BIN/kafka-reassign-partitions.sh
--zookeeper $ZOOKEEPER_HOST
--reassignment-json-file replicas.json
--execute
$KAFKA_BIN/kafka-reassign-partitions.sh
--zookeeper $ZOOKEEPER_HOST
--reassignment-json-file replicas.json
--verify
For Kubernetes
kubectl -n $KAFKA_NAMESPACE exec -it $KAFKA_BROKER_POD -c kafka -- bin/kafka-reassign-partitions.sh
--zookeeper $ZOOKEEPER_HOST
--reassignment-json-file replicas.json
--execute
kubectl -n $KAFKA_NAMESPACE exec -it $KAFKA_BROKER_POD -c kafka -- bin/kafka-reassign-partitions.sh
--zookeeper $ZOOKEEPER_HOST
--reassignment-json-file replicas.json
--verify
=== List unavailable partitions
[source,bash]
For VMs
$KAFKA_BIN/kafka-topics.sh
--zookeeper $ZOOKEEPER_HOST
--describe
--unavailable-partitions
For Kubernetes
kubectl -n $KAFKA_NAMESPACE exec -it $KAFKA_BROKER_POD -c kafka -- bin/kafka-topics.sh
--zookeeper $ZOOKEEPER_HOST
--describe
--unavailable-partitions
=== Force election on all partitions
[source,bash]
For VMs
$KAFKA_BIN/kafka-leader-election.sh
--election-type preferred
--bootstrap-server $BROKER_HOST
--all-topic-partitions
For Kubernetes
kubectl -n $KAFKA_NAMESPACE exec -it $KAFKA_BROKER_POD -c kafka -- bin/kafka-leader-election.sh
--election-type preferred
--bootstrap-server $BROKER_HOST
--all-topic-partitions
=== Force election on specific topic and partition
[source,bash]
For VMs
$KAFKA_BIN/kafka-leader-election.sh
--election-type preferred
--bootstrap-server $BROKER_HOST
--topic
--partition
For Kubernetes
kubectl -n $KAFKA_NAMESPACE exec -it $KAFKA_BROKER_POD -c kafka -- bin/kafka-leader-election.sh
--election-type preferred
--bootstrap-server $BROKER_HOST
--topic
--partition
--partition
== Consumer
=== List consumer groups
[source,bash]
For VMs
$KAFKA_BIN/kafka-consumer-groups.sh
--list
--bootstrap-server $BROKER_HOST
For Kubernetes
kubectl -n $KAFKA_NAMESPACE exec -it $KAFKA_BROKER_POD -c kafka -- bin/kafka-consumer-groups.sh
--list
--bootstrap-server $BROKER_HOST
=== Describe consumer groups
[source,bash]
For VMs
$KAFKA_BIN/kafka-consumer-groups.sh
--describe
--group <group_id>
--bootstrap-server $BROKER_HOST
For Kubernetes
kubectl -n $KAFKA_NAMESPACE exec -it $KAFKA_BROKER_POD -c kafka -- bin/kafka-consumer-groups.sh
--describe
--group <group_id>
--bootstrap-server $BROKER_HOST
=== Describe all consumer groups
[source,bash]
For VMs
$KAFKA_BIN/kafka-consumer-groups.sh
--describe
--bootstrap-server $BROKER_HOST
--all-groups
For Kubernetes
kubectl -n $KAFKA_NAMESPACE exec -it $KAFKA_BROKER_POD -c kafka -- bin/kafka-consumer-groups.sh
--describe
--bootstrap-server $BROKER_HOST
--all-groups
=== Delete consumer group
[source,bash]
For VMs
$KAFKA_BIN/kafka-consumer-groups.sh
--bootstrap-server $BROKER_HOST
--delete
--group
--group
For Kubernetes
kubectl -n $KAFKA_NAMESPACE exec -it $KAFKA_BROKER_POD -c kafka -- bin/kafka-consumer-groups.sh
--bootstrap-server $BROKER_HOST
--delete
--group
--group
--group
=== Active member in a consumer group
[source,bash]
For VMs
$KAFKA_BIN/kafka-consumer-groups.sh
--bootstrap-server $BROKER_HOST
--describe
--group
--members
For Kubernetes
kubectl -n $KAFKA_NAMESPACE exec -it $KAFKA_BROKER_POD -c kafka -- bin/kafka-consumer-groups.sh
--bootstrap-server $BROKER_HOST
--describe
--group
--members
--members
=== Partition Assigned to each member
[source,bash]
For VMs
$KAFKA_BIN/kafka-consumer-groups.sh
--bootstrap-server $BROKER_HOST
--describe
--group <group_id>
--members
--verbose
For Kubernetes
kubectl -n $KAFKA_NAMESPACE exec -it $KAFKA_BROKER_POD -c kafka -- bin/kafka-consumer-groups.sh
--bootstrap-server $BROKER_HOST
--describe
--group <group_id>
--members
--verbose
=== Consumer Group State
[source,bash]
For VMs
$KAFKA_BIN/kafka-consumer-groups.sh
--bootstrap-server $BROKER_HOST
--describe
--group
--state
For Kubernetes
kubectl -n $KAFKA_NAMESPACE exec -it $KAFKA_BROKER_POD -c kafka -- bin/kafka-consumer-groups.sh
--bootstrap-server $BROKER_HOST
--describe
--group
--state
--state
=== Consuming message
[source,bash]
For VMs
$KAFKA_BIN/kafka-console-consumer.sh
--bootstrap-server $BROKER_HOST
--topic <topic_name>
For Kubernetes
kubectl -n $KAFKA_NAMESPACE exec -it $KAFKA_BROKER_POD -c kafka -- bin/kafka-console-consumer.sh
--bootstrap-server $BROKER_HOST
--topic <topic_name>
[source,bash]
kafkacat -C -b $BROKER_HOST -t <topic_name>
=== Consuming message and formatting output
[source,bash]
kafkacat -C -b $BROKER_HOST -t <topic_name> -q -f 'Topic %t using partition %p at offset %o has key = %k and value = %S'
=== Consuming message from the beginning
[source,bash]
For VMs
$KAFKA_BIN/kafka-console-consumer.sh
--bootstrap-server $BROKER_HOST
--topic <topic_name>
--from-beginning
For Kubernetes
kubectl -n $KAFKA_NAMESPACE exec -it $KAFKA_BROKER_POD -c kafka -- bin/kafka-console-consumer.sh
--bootstrap-server $BROKER_HOST
--topic <topic_name>
--from-beginning
=== Consuming message from the end
[source,bash]
For VMs
$KAFKA_BIN/kafka-console-consumer.sh
--bootstrap-server $BROKER_HOST
--topic <topic_name>
For Kubernetes
kubectl -n $KAFKA_NAMESPACE exec -it $KAFKA_BROKER_POD -c kafka -- bin/kafka-console-consumer.sh
--bootstrap-server $BROKER_HOST
--topic <topic_name>
=== Consuming message and show output in JSON
[source,bash]
kafkacat -b $BROKER_HOST -t <topic_name> -J
=== Consuming and showing message key
[source,bash]
For VMs
$KAFKA_BIN/kafka-console-consumer.sh
--bootstrap-server $BROKER_HOST
--topic <topic_name>
--property print.key=true
--property key.separator=,
For Kubernetes
kubectl -n $KAFKA_NAMESPACE exec -it $KAFKA_BROKER_POD -c kafka -- bin/kafka-console-consumer.sh
--bootstrap-server $BROKER_HOST
--topic <topic_name>
--property print.key=true
--property key.separator=,
=== Read one message
[source,bash]
For VMs
$KAFKA_BIN/kafka-console-consumer.sh
--bootstrap-server $BROKER_HOST
--topic <topic_name>
--max-messages 1
For Kubernetes
kubectl -n $KAFKA_NAMESPACE exec -it $KAFKA_BROKER_POD -c kafka -- bin/kafka-console-consumer.sh
--bootstrap-server $BROKER_HOST
--topic <topic_name>
--max-messages 1
=== Read the last 2 messages from topic and then exit
[source,bash]
kafkacat -C -b $BROKER_HOST -t <topic_name> -o -2 -e
=== Read the last 2 messages from partition 0
[source,bash]
kafkacat -C -b $BROKER_HOST -t <topic_name> -o -2 -e -p 0
=== Read from __consumer_offsets
[source,bash]
For VMs
$KAFKA_BIN/kafka-console-consumer.sh
--bootstrap-server $BROKER_HOST
--topic __consumer_offsets
--formatter 'kafka.coordinator.group.GroupMetadataManager$OffsetsMessageFormatter'
--max-messages 1
For Kubernetes
kubectl -n $KAFKA_NAMESPACE exec -it $KAFKA_BROKER_POD -c kafka -- bin/kafka-console-consumer.sh
--bootstrap-server $BROKER_HOST
--topic __consumer_offsets
--formatter 'kafka.coordinator.group.GroupMetadataManager$OffsetsMessageFormatter'
--max-messages 1
=== Describe __consumer_offsets
[source,bash]
For VMs
$KAFKA_BIN/kafka-run-class.sh kafka.admin.ConsumerGroupCommand
--bootstrap-server $BROKER_HOST
--group
--new-consumer
--describe
For Kubernetes
kubectl -n $KAFKA_NAMESPACE exec -it $KAFKA_BROKER_POD -c kafka -- bin/kafka-run-class.sh kafka.admin.ConsumerGroupCommand
--bootstrap-server $BROKER_HOST
--group
--new-consumer
--describe
--new-consumer
--describe
=== Read from __transaction_state
[source,bash]
For VMs
$KAFKA_BIN/kafka-console-consumer.sh
--bootstrap-server $BROKER_HOST
--formatter "kafka.coordinator.transaction.TransactionLog$TransactionLogMessageFormatter"
--topic __transaction_state
--from-beginning
For Kubernetes
kubectl -n $KAFKA_NAMESPACE exec -it $KAFKA_BROKER_POD -c kafka -- bin/kafka-console-consumer.sh
--bootstrap-server $BROKER_HOST
--topic __transaction_state
--from-beginning
--formatter "kafka.coordinator.transaction.TransactionLog$TransactionLogMessageFormatter"
=== Consume using consumer group
[source,bash]
For VMs
$KAFKA_BIN/kafka-console-consumer.sh
--topic <topic_name>
--bootstrap-server $BROKER_HOST
--group
For Kubernetes
kubectl -n $KAFKA_NAMESPACE exec -it $KAFKA_BROKER_POD -c kafka -- bin/kafka-console-consumer.sh
--topic <topic_name>
--bootstrap-server $BROKER_HOST
--group
=== Topics to which group is subscribed
[source,bash]
For VMs
$KAFKA_BIN/kafka-consumer-groups.sh
--bootstrap-server $BROKER_HOST
--group <group_id>
--describe
For Kubernetes
kubectl -n $KAFKA_NAMESPACE exec -it $KAFKA_BROKER_POD -c kafka -- bin/kafka-consumer-groups.sh
--bootstrap-server $BROKER_HOST
--group <group_id>
--describe
=== Reset offset
==== Reset to the latest offset
[source,bash]
For VMs
$KAFKA_BIN/kafka-consumer-groups.sh
--bootstrap-server $BROKER_HOST
--reset-offsets
--group
--topic topic1
--to-latest
For Kubernetes
kubectl -n $KAFKA_NAMESPACE exec -it $KAFKA_BROKER_POD -c kafka -- bin/kafka-consumer-groups.sh
--bootstrap-server $BROKER_HOST
--reset-offsets
--group
--topic topic1
--to-latest
--topic topic1
--to-latest
==== Reset offset for a consumer group in a topic
[source,bash]
For VMs
There are many other resetting options
--shift-by <positive_or_negative_integer> / --to-current / --to-latest / --to-offset <offset_integer>
--to-datetime <datetime_string> --by-duration <duration_string>
$KAFKA_BIN/kafka-consumer-groups.sh
--bootstrap-server $BROKER_HOST
--group <group_id>
--topic <topic_name>
--reset-offsets
--to-earliest
--execute
For Kubernetes
kubectl -n $KAFKA_NAMESPACE exec -it $KAFKA_BROKER_POD -c kafka -- bin/kafka-consumer-groups.sh
--bootstrap-server $BROKER_HOST
--group <group_id>
--topic <topic_name>
--reset-offsets
--to-earliest
--execute
==== Reset offset from all consumer groups
[source,bash]
For VMs
$KAFKA_BIN/kafka-consumer-groups.sh
--bootstrap-server $BROKER_HOST
--all-groups
--reset-offsets
--topic <topic_name>
--to-earliest
For Kubernetes
kubectl -n $KAFKA_NAMESPACE exec -it $KAFKA_BROKER_POD -c kafka -- bin/kafka-consumer-groups.sh
--bootstrap-server $BROKER_HOST
--all-groups
--reset-offsets
--topic <topic_name>
--to-earliest
==== Forward by 2 for example
[source,bash]
For VMs
$KAFKA_BIN/kafka-consumer-groups.sh
--bootstrap-server $BROKER_HOST
--group <groud_id>
--reset-offsets
--shift-by 2
--execute
--topic <topic_name>
For Kubernetes
kubectl -n $KAFKA_NAMESPACE exec -it $KAFKA_BROKER_POD -c kafka -- bin/kafka-consumer-groups.sh
--bootstrap-server $BROKER_HOST
--group <groud_id>
--reset-offsets
--shift-by 2
--execute
--topic <topic_name>
==== Backward by 2 for example
[source,bash]
For VMs
$KAFKA_BIN/kafka-consumer-groups.sh
--bootstrap-server $BROKER_HOST
--group <groud_id>
--reset-offsets
--shift-by -2
--execute
--topic <topic_name>
For Kubernetes
kubectl -n $KAFKA_NAMESPACE exec -it $KAFKA_BROKER_POD -c kafka -- bin/kafka-consumer-groups.sh
--bootstrap-server $BROKER_HOST
--group <groud_id>
--reset-offsets
--shift-by -2
--execute
--topic <topic_name>
=== Describe consumer group
[source,bash]
For VMs
$KAFKA_BIN/kafka-consumer-groups.sh
--bootstrap-server $BROKER_HOST
--describe
--group <group_id>
For Kubernetes
kubectl -n $KAFKA_NAMESPACE exec -it $KAFKA_BROKER_POD -c kafka -- bin/kafka-consumer-groups.sh
--bootstrap-server $BROKER_HOST
--describe
--group <group_id>
=== Check offset for consumer group
[source,bash]
For VMs
$KAFKA_BIN/kafka-consumer-offset-checker.sh
--zookeeper $ZOOKEEPER_HOST
--group <group_id>
--topic <topic_name>
For Kubernetes
kubectl -n $KAFKA_NAMESPACE exec -it $KAFKA_BROKER_POD -c kafka -- bin/kafka-consumer-offset-checker.sh
--zookeeper $ZOOKEEPER_HOST
--group <group_id>
--topic <topic_name>
== Producer
=== Send message using file
[source,bash]
For VMs
$KAFKA_BIN/kafka-console-producer.sh
--broker-list $BROKER_HOST
--topic <topic_name> < messages.txt
For Kubernetes
kubectl -n $KAFKA_NAMESPACE exec -it $KAFKA_BROKER_POD -c kafka -- bin/kafka-console-producer.sh
--broker-list $BROKER_HOST
--topic <topic_name> < messages.txt
[source,bash]
kafkacat -P -l -b $BROKER_HOST -t <topic_name> messages.txt
=== Send message using standard input
[source,bash]
For VMs
$KAFKA_BIN/kafka-console-producer.sh
--broker-list $BROKER_HOST
--topic <topic_name>
For Kubernetes
kubectl -n $KAFKA_NAMESPACE exec -it $KAFKA_BROKER_POD -c kafka -- bin/kafka-console-producer.sh
--broker-list $BROKER_HOST
--topic <topic_name>
[source,bash]
kafkacat -P -b $BROKER_HOST -t <topic_name>
=== Send message using snappy compression
[source,bash]
kafkacat -P -b $BROKER_HOST -t <topic_name> -z snappy
=== Send 200 messages to a topic
[source,bash]
seq 200 | kafkacat -P -b $BROKER_HOST -t <topic_name>
=== Send message using string
[source,bash]
For VMs
echo "My Message" | $KAFKA_BIN/kafka-console-producer.sh
--broker-list $BROKER_HOST
--topic <topic_name>
For Kubernetes
kubectl -n $KAFKA_NAMESPACE exec -it $KAFKA_BROKER_POD -c kafka -- bin/kafka-console-producer.sh
--broker-list $BROKER_HOST
--topic <topic_name>
[source,bash]
echo "My Message" | kafkacat -b $BROKER_HOST -t <topic_name>
=== Send message using headers
[source,bash]
echo "My Message" | kafkacat -b $BROKER_HOST -t <topic_name> echo "My Message" | kafkacat -b $BROKER_HOST -H "header1=value1" -H "header2=value2"
=== Send message using ack=all
[source,bash]
For VMs
$KAFKA_BIN/kafka-console-producer.sh
--broker-list $BROKER_HOST
--topic <topic_name>
--producer-property acks=all
For Kubernetes
kubectl -n $KAFKA_NAMESPACE exec -it $KAFKA_BROKER_POD -c kafka -- bin/kafka-console-producer.sh
--broker-list $BROKER_HOST
--topic <topic_name>
--producer-property acks=all
=== Send message with key
[source,bash]
For VMs
$KAFKA_BIN/kafka-console-producer.sh
--broker-list $BROKER_HOST
--topic <topic_name>
--property parse.key=true
--property key.separator=,
For Kubernetes
kubectl -n $KAFKA_NAMESPACE exec -it $KAFKA_BROKER_POD -c kafka -- bin/kafka-console-producer.sh
--broker-list $BROKER_HOST
--topic <topic_name>
--property parse.key=true
--property key.separator=,
NOTE: Your message should be:
== Quotas
=== Add quota for user and client-id
[source,bash]
For VMs
$KAFKA_BIN/kafka-configs.sh
--zookeeper $ZOOKEEPER_HOST
--alter
--add-config 'producer_byte_rate=1024,consumer_byte_rate=2048,request_percentage=200'
--entity-type users
--entity-name
--entity-type clients
--entity-name
For Kubernetes
kubectl -n $KAFKA_NAMESPACE exec -it $KAFKA_BROKER_POD -c kafka -- bin/kafka-configs.sh
--zookeeper $ZOOKEEPER_HOST
--alter
--add-config 'producer_byte_rate=1024,consumer_byte_rate=2048,request_percentage=200'
--entity-type users
--entity-name
--entity-type clients
--entity-name
--entity-type clients
--entity-name
=== Add quota for user
[source,bash]
For VMs
$KAFKA_BIN/kafka-configs.sh
--zookeeper $ZOOKEEPER_HOST
--alter
--add-config 'producer_byte_rate=1024,consumer_byte_rate=2048,request_percentage=200'
--entity-type users
--entity-name
For Kubernetes
kubectl -n $KAFKA_NAMESPACE exec -it $KAFKA_BROKER_POD -c kafka -- bin/kafka-configs.sh
--zookeeper $ZOOKEEPER_HOST
--alter
--add-config 'producer_byte_rate=1024,consumer_byte_rate=2048,request_percentage=200'
--entity-type users
--entity-name
=== Add quota for client-id
[source,bash]
For VMs
$KAFKA_BIN/kafka-configs.sh
--zookeeper $ZOOKEEPER_HOST
--alter
--add-config 'producer_byte_rate=1024,consumer_byte_rate=2048,request_percentage=200'
--entity-type clients
--entity-name
For Kubernetes
kubectl -n $KAFKA_NAMESPACE exec -it $KAFKA_BROKER_POD -c kafka -- bin/kafka-configs.sh
--zookeeper $ZOOKEEPER_HOST
--alter
--add-config 'producer_byte_rate=1024,consumer_byte_rate=2048,request_percentage=200'
--entity-type clients
--entity-name
=== Add default client-id quota for user
[source,bash]
For VMs
$KAFKA_BIN/kafka-configs.sh
--zookeeper $ZOOKEEPER_HOST
--alter
--add-config 'producer_byte_rate=1024,consumer_byte_rate=2048,request_percentage=200'
--entity-type users
--entity-name
--entity-type clients
--entity-default
For Kubernetes
kubectl -n $KAFKA_NAMESPACE exec -it $KAFKA_BROKER_POD -c kafka -- bin/kafka-configs.sh
--zookeeper $ZOOKEEPER_HOST
--alter
--add-config 'producer_byte_rate=1024,consumer_byte_rate=2048,request_percentage=200'
--entity-type users
--entity-name
--entity-type clients
--entity-default
--entity-type clients
--entity-default
=== Add default quota for user
[source,bash]
For VMs
$KAFKA_BIN/kafka-configs.sh
--zookeeper $ZOOKEEPER_HOST
--alter
--add-config 'producer_byte_rate=1024,consumer_byte_rate=2048,request_percentage=200'
--entity-type users
--entity-default
For Kubernetes
kubectl -n $KAFKA_NAMESPACE exec -it $KAFKA_BROKER_POD -c kafka -- bin/kafka-configs.sh
--zookeeper $ZOOKEEPER_HOST
--alter
--add-config 'producer_byte_rate=1024,consumer_byte_rate=2048,request_percentage=200'
--entity-type users
--entity-default
=== Add default quota for client-id
[source,bash]
For VMs
$KAFKA_BIN/kafka-configs.sh
--zookeeper $ZOOKEEPER_HOST
--alter
--add-config 'producer_byte_rate=1024,consumer_byte_rate=2048,request_percentage=200'
--entity-type clients
--entity-default
For Kubernetes
kubectl -n $KAFKA_NAMESPACE exec -it $KAFKA_BROKER_POD -c kafka -- bin/kafka-configs.sh
--zookeeper $ZOOKEEPER_HOST
--alter
--add-config 'producer_byte_rate=1024,consumer_byte_rate=2048,request_percentage=200'
--entity-type clients
--entity-default
=== Describe quota for user and client-id
[source,bash]
For VMs
$KAFKA_BIN/kafka-configs.sh
--zookeeper $ZOOKEEPER_HOST
--describe
--entity-type users
--entity-name
--entity-type clients
--entity-name
For Kubernetes
kubectl -n $KAFKA_NAMESPACE exec -it $KAFKA_BROKER_POD -c kafka -- bin/kafka-configs.sh
--zookeeper $ZOOKEEPER_HOST
--describe
--entity-type users
--entity-name
--entity-type clients
--entity-name
--entity-type clients
--entity-name
=== Describe quota for a user
[source,bash]
For VMs
$KAFKA_BIN/kafka-configs.sh
--zookeeper $ZOOKEEPER_HOST
--describe
--entity-type users
--entity-name
For Kubernetes
kubectl -n $KAFKA_NAMESPACE exec -it $KAFKA_BROKER_POD -c kafka -- bin/kafka-configs.sh
--zookeeper $ZOOKEEPER_HOST
--describe
--entity-type users
--entity-name
=== Describe quota for a client
[source,bash]
For VMs
$KAFKA_BIN/kafka-configs.sh
--zookeeper $ZOOKEEPER_HOST
--describe
--entity-type clients
--entity-name
For Kubernetes
kubectl -n $KAFKA_NAMESPACE exec -it $KAFKA_BROKER_POD -c kafka -- bin/kafka-configs.sh
--zookeeper $ZOOKEEPER_HOST
--describe
--entity-type clients
--entity-name
== ACLs
=== Allow
[source,bash]
For VMs
$KAFKA_BIN/kafka-acls.sh
--authorizer-properties zookeeper.connect=$ZOOKEEPER_HOST
--add
--allow-principal User:
--allow-principal User:
--allow-host
--allow-host
--operation Read
--operation Write
--topic <topic_name>
For Kubernetes
kubectl -n $KAFKA_NAMESPACE exec -it $KAFKA_BROKER_POD -c kafka -- bin/kafka-acls.sh
--authorizer-properties zookeeper.connect=$ZOOKEEPER_HOST
--add
--allow-principal User:
--allow-principal User:
--allow-host
--allow-host
--operation Read
--operation Write
--topic <topic_name>
--allow-principal User:
--allow-host
--allow-host
--operation Read
--operation Write
--topic <topic_name>
=== Allow all read from topic but
[source,bash]
For VMs
$KAFKA_BIN/kafka-acls.sh
--authorizer-properties zookeeper.connect=$ZOOKEEPER_HOST
--add
--allow-principal User:*
--allow-host *
--deny-principal User:
--deny-host
--operation Read
--topic <topic_name>
For Kubernetes
kubectl -n $KAFKA_NAMESPACE exec -it $KAFKA_BROKER_POD -c kafka -- bin/kafka-acls.sh
--authorizer-properties zookeeper.connect=$ZOOKEEPER_HOST
--add
--allow-principal User:*
--allow-host *
--deny-principal User:
--deny-host
--operation Read
--topic <topic_name>
--deny-host
--operation Read
--topic <topic_name>
=== Allow
[source,bash]
For VMs
$KAFKA_BIN/kafka-acls.sh
--authorizer-properties zookeeper.connect=$ZOOKEEPER_HOST
--add
--allow-principal User:
--allow-host
--producer --topic *
For Kubernetes
kubectl -n $KAFKA_NAMESPACE exec -it $KAFKA_BROKER_POD -c kafka -- bin/kafka-acls.sh
--authorizer-properties zookeeper.connect=$ZOOKEEPER_HOST
--add
--allow-principal User:
--allow-host
--producer --topic *
--allow-host
--producer --topic *
=== Allow
[source,bash]
For VMs
$KAFKA_BIN/kafka-acls.sh
--authorizer-properties zookeeper.connect=$ZOOKEEPER_HOST
--add
--allow-principal User:
--allow-host
--consume --topic *
For Kubernetes
kubectl -n $KAFKA_NAMESPACE exec -it $KAFKA_BROKER_POD -c kafka -- bin/kafka-acls.sh
--authorizer-properties zookeeper.connect=$ZOOKEEPER_HOST
--add
--allow-principal User:
--allow-host
--consume --topic *
--allow-host
--consume --topic *
=== Remove ACL for
[source,bash]
For VMs
$KAFKA_BIN/kafka-acls.sh
--authorizer-properties zookeeper.connect=$ZOOKEEPER_HOST
--remove
--allow-principal User:
--allow-principal User:
--allow-host
--allow-host
--operation Read
--operation Write
--topic <topic_name>
For Kubernetes
kubectl -n $KAFKA_NAMESPACE exec -it $KAFKA_BROKER_POD -c kafka -- bin/kafka-acls.sh
--authorizer-properties zookeeper.connect=$ZOOKEEPER_HOST
--remove
--allow-principal User:
--allow-principal User:
--allow-host
--allow-host
--operation Read
--operation Write
--topic <topic_name>
--allow-principal User:
--allow-host
--allow-host
--operation Read
--operation Write
--topic <topic_name>
=== List ACLs on specific topic
[source,bash]
For VMs
$KAFKA_BIN/kafka-acls.sh
--authorizer-properties zookeeper.connect=$ZOOKEEPER_HOST
--list
--topic <topic_name>
For Kubernetes
kubectl -n $KAFKA_NAMESPACE exec -it $KAFKA_BROKER_POD -c kafka -- bin/kafka-acls.sh
--authorizer-properties zookeeper.connect=$ZOOKEEPER_HOST
--list
--topic <topic_name>
=== List ACLs on all topics
[source,bash]
For VMs
$KAFKA_BIN/kafka-acls.sh
--authorizer-properties zookeeper.connect=$ZOOKEEPER_HOST
--list
--topic *
For Kubernetes
kubectl -n $KAFKA_NAMESPACE exec -it $KAFKA_BROKER_POD -c kafka -- bin/kafka-acls.sh
--authorizer-properties zookeeper.connect=$ZOOKEEPER_HOST
--list
--topic *
== Mirror Maker
=== Mirror topic
[source,bash]
For VMs
$KAFKA_BIN/kafka-mirror-maker.sh
--consumer.config consumer.properties
--producer.config producer.properties
--whitelist <topic_name>
For Kubernetes
kubectl -n $KAFKA_NAMESPACE exec -it $KAFKA_BROKER_POD -c kafka -- bin/kafka-mirror-maker.sh
--consumer.config consumer.properties
--producer.config producer.properties
--whitelist <topic_name>
== Delegation Token
=== Create token
[source,bash]
For VMs
$KAFKA_BIN/kafka-delegation-tokens.sh
--bootstrap-server $BROKER_HOST
--create
--max-life-time-period -1
--command-config client.properties
--renewer-principal User:
For Kubernetes
kubectl -n $KAFKA_NAMESPACE exec -it $KAFKA_BROKER_POD -c kafka -- bin/kafka-delegation-tokens.sh
--bootstrap-server $BROKER_HOST
--create
--max-life-time-period -1
--command-config client.properties
--renewer-principal User:
=== Renew token
[source,bash]
For VMs
$KAFKA_BIN/kafka-delegation-tokens.sh
--bootstrap-server $BROKER_HOST
--renew
--renew-time-period -1
--command-config client.properties
--hmac ABCDEFGHIJK
For Kubernetes
kubectl -n $KAFKA_NAMESPACE exec -it $KAFKA_BROKER_POD -c kafka -- bin/kafka-delegation-tokens.sh
--bootstrap-server $BROKER_HOST
--renew
--renew-time-period -1
--command-config client.properties
--hmac ABCDEFGHIJK
=== Expire token
[source,bash]
For VMs
$KAFKA_BIN/kafka-delegation-tokens.sh
--bootstrap-server $BROKER_HOST
--expire
--expiry-time-period -1
--command-config client.properties
--hmac ABCDEFGHIJK
For Kubernetes
kubectl -n $KAFKA_NAMESPACE exec -it $KAFKA_BROKER_POD -c kafka -- bin/kafka-delegation-tokens.sh
--bootstrap-server $BROKER_HOST
--expire
--expiry-time-period -1
--command-config client.properties
--hmac ABCDEFGHIJK
=== Describe token
[source,bash]
For VMs
$KAFKA_BIN/kafka-delegation-tokens.sh
--bootstrap-server $BROKER_HOST
--describe
--command-config client.properties
--owner-principal User:
For Kubernetes
kubectl -n $KAFKA_NAMESPACE exec -it $KAFKA_BROKER_POD -c kafka -- bin/kafka-delegation-tokens.sh
--bootstrap-server $BROKER_HOST
--describe
--command-config client.properties
--owner-principal User:
== Performance Test
=== Producer
[source,bash]
For VMs
$KAFKA_BIN/kafka-producer-perf-test.sh
--topic teste
--num-records 50000000
--record-size 100
--throughput -1
--producer-props acks=all bootstrap.servers=$BROKER_HOST buffer.memory=67108864 batch.size=8196
For Kubernetes
kubectl -n $KAFKA_NAMESPACE exec -it $KAFKA_BROKER_POD -c kafka -- bin/kafka-producer-perf-test.sh
--topic teste
--num-records 50000000
--record-size 100
--throughput -1
--producer-props acks=all bootstrap.servers=$BROKER_HOST buffer.memory=67108864 batch.size=8196
=== Consumer
[source,bash]
For VMs
$KAFKA_BIN/kafka-consumer-perf-test.sh
--group grupo
--print-metrics
--show-detailed-stats
--topic teste
--messages 600000
--broker-list $BROKER_HOST
--timeout 1000000