
= Kafka Cheat Sheet
:toc:
:toc-placement!:
:icons:

Here are some useful commands for Kafka.

Tested on Kafka 2.5

.Goals
* [x] Add most useful commands for Kafka
* [x] Add kafkacat commands
* [ ] Add commands output
* [x] Add commands for Kafka on Kubernetes

toc::[]

== Pre-req

First, set some Kafka environment variables.

[source,bash]
----
# For Kafka running on top of VMs/Bare Metal
KAFKA_BIN=/opt/kafka/bin
ZOOKEEPER_HOST=zookeeper-host:2181
BROKER_HOST=broker-host:9092

# For Kafka running on top of Kubernetes (using Strimzi)
KAFKA_NAMESPACE=kafka-demo
ZOOKEEPER_HOST=localhost:2181
BROKER_HOST=localhost:9092
ZOOKEEPER_POD=$(kubectl -n $KAFKA_NAMESPACE get pods -l app.kubernetes.io/name=zookeeper -o=jsonpath='{.items[0].metadata.name}')
KAFKA_BROKER_POD=$(kubectl -n $KAFKA_NAMESPACE get pods -l app.kubernetes.io/name=kafka -o=jsonpath='{.items[0].metadata.name}')
----
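If any of these variables is unset, most commands below fail with confusing errors. A minimal sanity check can be run first (an illustrative helper, not part of the Kafka distribution; assumes bash):

```bash
# Report which of the required environment variables are set.
check_kafka_env() {
  local var
  for var in KAFKA_BIN ZOOKEEPER_HOST BROKER_HOST; do
    if [ -z "${!var}" ]; then
      echo "MISSING: $var"
    else
      echo "OK: $var"
    fi
  done
}
check_kafka_env
```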

== Zookeeper Operations

You need to whitelist all the commands below in `zookeeper.properties`.

.zookeeper.properties
[source,properties]
----
4lw.commands.whitelist=stat,ruok,reqs,envi,dump,conf,cons,srvr,wchs,wchc,dirs,wchp,mntr,isro
----

* If using Zookeeper Auth (SASL)

[source,bash]
----
# Zookeeper Auth
export KAFKA_OPTS="-Djava.security.auth.login.config=/tmp/jaas.conf"
----

.jaas.conf
----
Client {
  org.apache.zookeeper.server.auth.DigestLoginModule required
  username="test"
  password="test";
};
----

* If using SSL/TLS on Zookeeper + SASL

[source,bash]
----
export KAFKA_OPTS="-Djava.security.auth.login.config=/tmp/jaas.conf -Dzookeeper.clientCnxnSocket=org.apache.zookeeper.ClientCnxnSocketNetty -Dzookeeper.client.secure=true -Dzookeeper.ssl.trustStore.location=/tmp/kafka.server.truststore -Dzookeeper.ssl.trustStore.password=mypass -Dzookeeper.ssl.trustStore.type=PKCS12"
----

NOTE: Remember to change the Zookeeper port in ZOOKEEPER_HOST if necessary.

=== Get runtime conf

[source,bash]
----
# For VMs
echo conf | curl telnet://$ZOOKEEPER_HOST

# For Kubernetes
kubectl -n $KAFKA_NAMESPACE exec -it $KAFKA_BROKER_POD -c kafka -- bash -c "echo conf | curl telnet://localhost:2181"
----

=== Get runtime environments

[source,bash]
----
# For VMs
echo envi | curl telnet://$ZOOKEEPER_HOST

# For Kubernetes
kubectl -n $KAFKA_NAMESPACE exec -it $KAFKA_BROKER_POD -c kafka -- bash -c "echo envi | curl telnet://localhost:2181"
----

=== Health Check

[source,bash]
----
# For VMs
echo stat | curl telnet://$ZOOKEEPER_HOST
echo ruok | curl telnet://$ZOOKEEPER_HOST

# For Kubernetes
kubectl -n $KAFKA_NAMESPACE exec -it $KAFKA_BROKER_POD -c kafka -- bash -c "echo stat | curl telnet://localhost:2181"
kubectl -n $KAFKA_NAMESPACE exec -it $KAFKA_BROKER_POD -c kafka -- bash -c "echo ruok | curl telnet://localhost:2181"
----

=== Connections

[source,bash]
----
# For VMs
echo reqs | curl telnet://$ZOOKEEPER_HOST
echo cons | curl telnet://$ZOOKEEPER_HOST

# For Kubernetes
kubectl -n $KAFKA_NAMESPACE exec -it $KAFKA_BROKER_POD -c kafka -- bash -c "echo reqs | curl telnet://localhost:2181"
kubectl -n $KAFKA_NAMESPACE exec -it $KAFKA_BROKER_POD -c kafka -- bash -c "echo cons | curl telnet://localhost:2181"
----

=== Details of the server

[source,bash]
----
# For VMs
echo srvr | curl telnet://$ZOOKEEPER_HOST

# For Kubernetes
kubectl -n $KAFKA_NAMESPACE exec -it $KAFKA_BROKER_POD -c kafka -- bash -c "echo srvr | curl telnet://localhost:2181"
----

=== Brief info about watches

[source,bash]
----
# For VMs
echo wchs | curl telnet://$ZOOKEEPER_HOST

# For Kubernetes
kubectl -n $KAFKA_NAMESPACE exec -it $KAFKA_BROKER_POD -c kafka -- bash -c "echo wchs | curl telnet://localhost:2181"
----

=== Details about watches

[source,bash]
----
# For VMs
echo wchc | curl telnet://$ZOOKEEPER_HOST

# For Kubernetes
kubectl -n $KAFKA_NAMESPACE exec -it $KAFKA_BROKER_POD -c kafka -- bash -c "echo wchc | curl telnet://localhost:2181"
----

=== Snapshots info

[source,bash]
----
# For VMs
echo dirs | curl telnet://$ZOOKEEPER_HOST

# For Kubernetes
kubectl -n $KAFKA_NAMESPACE exec -it $KAFKA_BROKER_POD -c kafka -- bash -c "echo dirs | curl telnet://localhost:2181"
----

=== Monitoring vars

[source,bash]
----
# For VMs
echo mntr | curl telnet://$ZOOKEEPER_HOST

# For Kubernetes
kubectl -n $KAFKA_NAMESPACE exec -it $KAFKA_BROKER_POD -c kafka -- bash -c "echo mntr | curl telnet://localhost:2181"
----

=== Get read-only or read-write mode

[source,bash]
----
# For VMs
echo isro | curl telnet://$ZOOKEEPER_HOST

# For Kubernetes
kubectl -n $KAFKA_NAMESPACE exec -it $KAFKA_BROKER_POD -c kafka -- bash -c "echo isro | curl telnet://localhost:2181"
----

=== Get Process

[source,bash]
----
# For VMs
jps | grep QuorumPeerMain

# For Kubernetes
kubectl -n $KAFKA_NAMESPACE exec -it $ZOOKEEPER_POD -- bash -c "ps aux | grep QuorumPeerMain"
----

== Broker Operations

=== List active brokers

[source,bash]
----
# For VMs
$KAFKA_BIN/zookeeper-shell.sh $ZOOKEEPER_HOST ls /brokers/ids

# For Kubernetes
kubectl -n $KAFKA_NAMESPACE exec -it $KAFKA_BROKER_POD -c kafka -- bin/zookeeper-shell.sh localhost:2181 ls /brokers/ids
----

[source,bash]
----
kafkacat -b $BROKER_HOST -L
----

=== List Broker Controller

[source,bash]
----
# For VMs
$KAFKA_BIN/zookeeper-shell.sh $ZOOKEEPER_HOST get /controller

# For Kubernetes
kubectl -n $KAFKA_NAMESPACE exec -it $KAFKA_BROKER_POD -c kafka -- bin/zookeeper-shell.sh localhost:2181 get /controller
----

=== List broker details

[source,bash]
----
# For VMs
$KAFKA_BIN/zookeeper-shell.sh $ZOOKEEPER_HOST get /brokers/ids/<broker_id>

# For Kubernetes
kubectl -n $KAFKA_NAMESPACE exec -it $KAFKA_BROKER_POD -c kafka -- bin/zookeeper-shell.sh localhost:2181 get /brokers/ids/<broker_id>
----

[source,bash]
----
kafkacat -b $BROKER_HOST -L
----

=== List topics

[source,bash]
----
# For VMs
$KAFKA_BIN/zookeeper-shell.sh $ZOOKEEPER_HOST ls /brokers/topics

# For Kubernetes
kubectl -n $KAFKA_NAMESPACE exec -it $KAFKA_BROKER_POD -c kafka -- bin/zookeeper-shell.sh localhost:2181 ls /brokers/topics
----

[source,bash]
----
kafkacat -b $BROKER_HOST -L -t <topic_name>
----

=== Change Broker Config

Change log cleaner threads.

[source,bash]
----
# For VMs
$KAFKA_BIN/kafka-configs.sh \
  --bootstrap-server $BROKER_HOST \
  --entity-type brokers \
  --entity-name <broker_id> \
  --alter \
  --add-config log.cleaner.threads=2

# For Kubernetes
kubectl -n $KAFKA_NAMESPACE exec -it $KAFKA_BROKER_POD -c kafka -- bin/kafka-configs.sh \
  --bootstrap-server $BROKER_HOST \
  --entity-type brokers \
  --entity-name <broker_id> \
  --alter \
  --add-config log.cleaner.threads=2
----

=== Describe broker dynamic config

[source,bash]
----
# For VMs
$KAFKA_BIN/kafka-configs.sh \
  --bootstrap-server $BROKER_HOST \
  --entity-type brokers \
  --entity-name <broker_id> \
  --describe

# For Kubernetes
kubectl -n $KAFKA_NAMESPACE exec -it $KAFKA_BROKER_POD -c kafka -- bin/kafka-configs.sh \
  --bootstrap-server $BROKER_HOST \
  --entity-type brokers \
  --entity-name <broker_id> \
  --describe
----

=== Delete broker config

[source,bash]
----
# For VMs
$KAFKA_BIN/kafka-configs.sh \
  --bootstrap-server $BROKER_HOST \
  --entity-type brokers \
  --entity-name <broker_id> \
  --alter \
  --delete-config log.cleaner.threads

# For Kubernetes
kubectl -n $KAFKA_NAMESPACE exec -it $KAFKA_BROKER_POD -c kafka -- bin/kafka-configs.sh \
  --bootstrap-server $BROKER_HOST \
  --entity-type brokers \
  --entity-name <broker_id> \
  --alter \
  --delete-config log.cleaner.threads
----

=== Change cluster-wide dynamic config

[source,bash]
----
# For VMs
$KAFKA_BIN/kafka-configs.sh \
  --bootstrap-server $BROKER_HOST \
  --entity-type brokers \
  --entity-default \
  --alter \
  --add-config log.cleaner.threads=2

# For Kubernetes
kubectl -n $KAFKA_NAMESPACE exec -it $KAFKA_BROKER_POD -c kafka -- bin/kafka-configs.sh \
  --bootstrap-server $BROKER_HOST \
  --entity-type brokers \
  --entity-default \
  --alter \
  --add-config log.cleaner.threads=2
----

=== Describe cluster-wide dynamic config

[source,bash]
----
# For VMs
$KAFKA_BIN/kafka-configs.sh \
  --bootstrap-server $BROKER_HOST \
  --entity-type brokers \
  --entity-default \
  --describe

# For Kubernetes
kubectl -n $KAFKA_NAMESPACE exec -it $KAFKA_BROKER_POD -c kafka -- bin/kafka-configs.sh \
  --bootstrap-server $BROKER_HOST \
  --entity-type brokers \
  --entity-default \
  --describe
----

=== Disable hostname verification

[source,bash]
----
# For VMs
$KAFKA_BIN/kafka-configs.sh \
  --bootstrap-server $BROKER_HOST \
  --entity-type brokers \
  --entity-name <broker_id> \
  --alter \
  --add-config "listener.name.internal.ssl.endpoint.identification.algorithm="

# For Kubernetes
kubectl -n $KAFKA_NAMESPACE exec -it $KAFKA_BROKER_POD -c kafka -- bin/kafka-configs.sh \
  --bootstrap-server $BROKER_HOST \
  --entity-type brokers \
  --entity-name <broker_id> \
  --alter \
  --add-config "listener.name.internal.ssl.endpoint.identification.algorithm="
----

== Topic Operations

=== List topics using kafka-topics.sh

[source,bash]
----
# For VMs
$KAFKA_BIN/kafka-topics.sh \
  --list \
  --zookeeper $ZOOKEEPER_HOST

# For Kubernetes
kubectl -n $KAFKA_NAMESPACE exec -it $KAFKA_BROKER_POD -c kafka -- bin/kafka-topics.sh \
  --list \
  --zookeeper $ZOOKEEPER_HOST
----

[source,bash]
----
# For VMs
$KAFKA_BIN/kafka-topics.sh \
  --bootstrap-server $BROKER_HOST \
  --list

# For Kubernetes
kubectl -n $KAFKA_NAMESPACE exec -it $KAFKA_BROKER_POD -c kafka -- bin/kafka-topics.sh \
  --bootstrap-server $BROKER_HOST \
  --list
----

=== Describe topic

[source,bash]
----
# For VMs
$KAFKA_BIN/kafka-topics.sh \
  --zookeeper $ZOOKEEPER_HOST \
  --topic <topic_name> \
  --describe

# For Kubernetes
kubectl -n $KAFKA_NAMESPACE exec -it $KAFKA_BROKER_POD -c kafka -- bin/kafka-topics.sh \
  --zookeeper $ZOOKEEPER_HOST \
  --topic <topic_name> \
  --describe
----

[source,bash]
----
kafkacat -b $BROKER_HOST -L -t <topic_name>
----

=== Describe topic configs

[source,bash]
----
# For VMs
$KAFKA_BIN/kafka-configs.sh \
  --zookeeper $ZOOKEEPER_HOST \
  --entity-type topics \
  --entity-name <topic_name> \
  --describe

# For Kubernetes
kubectl -n $KAFKA_NAMESPACE exec -it $KAFKA_BROKER_POD -c kafka -- bin/kafka-configs.sh \
  --zookeeper $ZOOKEEPER_HOST \
  --entity-type topics \
  --entity-name <topic_name> \
  --describe
----

=== Delete topic config

[source,bash]
----
# For VMs
$KAFKA_BIN/kafka-configs.sh \
  --zookeeper $ZOOKEEPER_HOST \
  --entity-type topics \
  --entity-name <topic_name> \
  --alter \
  --delete-config <config_name>

# For Kubernetes
kubectl -n $KAFKA_NAMESPACE exec -it $KAFKA_BROKER_POD -c kafka -- bin/kafka-configs.sh \
  --zookeeper $ZOOKEEPER_HOST \
  --entity-type topics \
  --entity-name <topic_name> \
  --alter \
  --delete-config <config_name>
----

=== Copy topic to another topic in the same cluster

[source,bash]
----
kafkacat -C -b $BROKER_HOST -t <topic_name> -e | kafkacat -P -b $BROKER_HOST -t <new_topic_name>
----

=== Copy topic to another topic in another cluster

[source,bash]
----
kafkacat -C -b $BROKER_HOST -t <topic_name> -e | kafkacat -P -b $BROKER_HOST2 -t <topic_name>
----

=== Move topic to another broker

==== Create json necessary

.topics-to-move.json
[source,json]
----
{"topics": [{"topic": "topic1"}, {"topic": "topic2"}], "version":1 }
----
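Writing this JSON by hand gets tedious with many topics. A small shell sketch (a hypothetical helper, producing the same format as above) can generate it from a list of topic names:

```bash
# Build a topics-to-move JSON document from topic names given as arguments.
topics_json() {
  local first=1 entries="" t
  for t in "$@"; do
    # Separate entries with ", " after the first one.
    [ $first -eq 1 ] && first=0 || entries="$entries, "
    entries="$entries{\"topic\": \"$t\"}"
  done
  printf '{"topics": [%s], "version": 1}\n' "$entries"
}

topics_json topic1 topic2 > topics-to-move.json
```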

==== Generate plan to move to brokers

.Generate plan to move to brokers 5 and 6
[source,bash]
----
# For VMs
$KAFKA_BIN/kafka-reassign-partitions.sh \
  --zookeeper $ZOOKEEPER_HOST \
  --topics-to-move-json-file topics-to-move.json \
  --broker-list "5,6" \
  --generate

# For Kubernetes
kubectl -n $KAFKA_NAMESPACE exec -it $KAFKA_BROKER_POD -c kafka -- bin/kafka-reassign-partitions.sh \
  --zookeeper $ZOOKEEPER_HOST \
  --topics-to-move-json-file topics-to-move.json \
  --broker-list "5,6" \
  --generate
----

NOTE: Save the output of the command above to cluster-reassignment.json.

==== Move to broker 5 and 6

.Move to brokers 5 and 6
[source,bash]
----
# For VMs
$KAFKA_BIN/kafka-reassign-partitions.sh \
  --zookeeper $ZOOKEEPER_HOST \
  --reassignment-json-file cluster-reassignment.json \
  --execute

# For Kubernetes
kubectl -n $KAFKA_NAMESPACE exec -it $KAFKA_BROKER_POD -c kafka -- bin/kafka-reassign-partitions.sh \
  --zookeeper $ZOOKEEPER_HOST \
  --reassignment-json-file cluster-reassignment.json \
  --execute
----

==== Verify status

.Verify status
[source,bash]
----
# For VMs
$KAFKA_BIN/kafka-reassign-partitions.sh \
  --zookeeper $ZOOKEEPER_HOST \
  --reassignment-json-file cluster-reassignment.json \
  --verify

# For Kubernetes
kubectl -n $KAFKA_NAMESPACE exec -it $KAFKA_BROKER_POD -c kafka -- bin/kafka-reassign-partitions.sh \
  --zookeeper $ZOOKEEPER_HOST \
  --reassignment-json-file cluster-reassignment.json \
  --verify
----

=== Create topic

[source,bash]
----
# For VMs
$KAFKA_BIN/kafka-topics.sh \
  --create \
  --zookeeper $ZOOKEEPER_HOST \
  --replication-factor 1 \
  --partitions 1 \
  --topic <topic_name>

# For Kubernetes
kubectl -n $KAFKA_NAMESPACE exec -it $KAFKA_BROKER_POD -c kafka -- bin/kafka-topics.sh \
  --create \
  --zookeeper $ZOOKEEPER_HOST \
  --replication-factor 1 \
  --partitions 1 \
  --topic <topic_name>
----

=== Create topic with config

[source,bash]
----
# For VMs
$KAFKA_BIN/kafka-topics.sh \
  --bootstrap-server $BROKER_HOST \
  --create \
  --topic <topic_name> \
  --partitions 1 \
  --replication-factor 1 \
  --config max.message.bytes=64000 \
  --config flush.messages=1

# For Kubernetes
kubectl -n $KAFKA_NAMESPACE exec -it $KAFKA_BROKER_POD -c kafka -- bin/kafka-topics.sh \
  --bootstrap-server $BROKER_HOST \
  --create \
  --topic <topic_name> \
  --partitions 1 \
  --replication-factor 1 \
  --config max.message.bytes=64000 \
  --config flush.messages=1
----

=== Increase replication factor of __consumer_offsets

==== Create replication plan

.reassignment.json
[source,json]
----
{"version":1, "partitions":[ {"topic":"__consumer_offsets", "partition":0, "replicas":[106,101,102,105]}, {"topic":"__consumer_offsets", "partition":1, "replicas":[106,101,102,105]}, {"topic":"__consumer_offsets", "partition":2, "replicas":[106,101,102,105]}, {"topic":"__consumer_offsets", "partition":3, "replicas":[106,101,102,105]}, {"topic":"__consumer_offsets", "partition":4, "replicas":[106,101,102,105]}, {"topic":"__consumer_offsets", "partition":5, "replicas":[106,101,102,105]}, {"topic":"__consumer_offsets", "partition":6, "replicas":[106,101,102,105]}, {"topic":"__consumer_offsets", "partition":7, "replicas":[106,101,102,105]}, {"topic":"__consumer_offsets", "partition":8, "replicas":[106,101,102,105]}, {"topic":"__consumer_offsets", "partition":9, "replicas":[106,101,102,105]}, {"topic":"__consumer_offsets", "partition":10, "replicas":[101,102,103,105]}, {"topic":"__consumer_offsets", "partition":11, "replicas":[101,102,103,105]}, {"topic":"__consumer_offsets", "partition":12, "replicas":[101,102,103,105]}, {"topic":"__consumer_offsets", "partition":13, "replicas":[101,102,103,105]}, {"topic":"__consumer_offsets", "partition":14, "replicas":[101,102,103,105]}, {"topic":"__consumer_offsets", "partition":15, "replicas":[101,102,103,105]}, {"topic":"__consumer_offsets", "partition":16, "replicas":[101,102,103,105]}, {"topic":"__consumer_offsets", "partition":17, "replicas":[101,102,103,105]}, {"topic":"__consumer_offsets", "partition":18, "replicas":[101,102,103,105]}, {"topic":"__consumer_offsets", "partition":19, "replicas":[101,102,103,105]}, {"topic":"__consumer_offsets", "partition":20, "replicas":[102,103,104,105]}, {"topic":"__consumer_offsets", "partition":21, "replicas":[102,103,104,105]}, {"topic":"__consumer_offsets", "partition":22, "replicas":[102,103,104,105]}, {"topic":"__consumer_offsets", "partition":23, "replicas":[102,103,104,105]}, {"topic":"__consumer_offsets", "partition":24, "replicas":[102,103,104,105]}, {"topic":"__consumer_offsets", "partition":25, "replicas":[102,103,104,105]}, {"topic":"__consumer_offsets", "partition":26, "replicas":[102,103,104,105]}, {"topic":"__consumer_offsets", "partition":27, "replicas":[102,103,104,105]}, {"topic":"__consumer_offsets", "partition":28, "replicas":[102,103,104,105]}, {"topic":"__consumer_offsets", "partition":29, "replicas":[102,103,104,105]}, {"topic":"__consumer_offsets", "partition":30, "replicas":[103,104,106,105]}, {"topic":"__consumer_offsets", "partition":31, "replicas":[103,104,106,105]}, {"topic":"__consumer_offsets", "partition":32, "replicas":[103,104,106,105]}, {"topic":"__consumer_offsets", "partition":33, "replicas":[103,104,106,105]}, {"topic":"__consumer_offsets", "partition":34, "replicas":[103,104,106,105]}, {"topic":"__consumer_offsets", "partition":35, "replicas":[103,104,106,105]}, {"topic":"__consumer_offsets", "partition":36, "replicas":[103,104,106,105]}, {"topic":"__consumer_offsets", "partition":37, "replicas":[103,104,106,105]}, {"topic":"__consumer_offsets", "partition":38, "replicas":[103,104,106,105]}, {"topic":"__consumer_offsets", "partition":39, "replicas":[103,104,106,105]}, {"topic":"__consumer_offsets", "partition":40, "replicas":[104,106,101,105]}, {"topic":"__consumer_offsets", "partition":41, "replicas":[104,106,101,105]}, {"topic":"__consumer_offsets", "partition":42, "replicas":[104,106,101,105]}, {"topic":"__consumer_offsets", "partition":43, "replicas":[104,106,101,105]}, {"topic":"__consumer_offsets", "partition":44, "replicas":[104,106,101,105]}, {"topic":"__consumer_offsets", "partition":45, "replicas":[104,106,101,105]}, {"topic":"__consumer_offsets", "partition":46, "replicas":[104,106,101,105]}, {"topic":"__consumer_offsets", "partition":47, "replicas":[104,106,101,105]}, {"topic":"__consumer_offsets", "partition":48, "replicas":[104,106,101,105]}, {"topic":"__consumer_offsets", "partition":49, "replicas":[104,106,101,105]} ] }
----
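Rather than hand-writing 50 nearly identical entries, the plan can be generated. This sketch (a hypothetical helper) assumes one replica list for all partitions; the plan above varies replicas per partition range, which would need a small extension:

```bash
# Emit a reassignment JSON document assigning the same replica list to
# every partition of a topic.
gen_reassignment() {
  local topic=$1 partitions=$2 replicas=$3 out="" p
  for ((p = 0; p < partitions; p++)); do
    [ -n "$out" ] && out="$out, "
    out="$out{\"topic\":\"$topic\", \"partition\":$p, \"replicas\":[$replicas]}"
  done
  printf '{"version":1, "partitions":[ %s ] }\n' "$out"
}

gen_reassignment __consumer_offsets 50 "106,101,102,105" > reassignment.json
```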

==== Execute reassignment

[source,bash]
----
# For VMs
$KAFKA_BIN/kafka-reassign-partitions.sh \
  --zookeeper $ZOOKEEPER_HOST \
  --reassignment-json-file reassignment.json \
  --execute

# For Kubernetes
kubectl -n $KAFKA_NAMESPACE exec -it $KAFKA_BROKER_POD -c kafka -- bin/kafka-reassign-partitions.sh \
  --zookeeper $ZOOKEEPER_HOST \
  --reassignment-json-file reassignment.json \
  --execute
----

==== Verify reassignment

[source,bash]
----
# For VMs
$KAFKA_BIN/kafka-reassign-partitions.sh \
  --zookeeper $ZOOKEEPER_HOST \
  --reassignment-json-file reassignment.json \
  --verify

# For Kubernetes
kubectl -n $KAFKA_NAMESPACE exec -it $KAFKA_BROKER_POD -c kafka -- bin/kafka-reassign-partitions.sh \
  --zookeeper $ZOOKEEPER_HOST \
  --reassignment-json-file reassignment.json \
  --verify
----

=== Alter topic

==== Alter retention time

[source,bash]
----
# For VMs
$KAFKA_BIN/kafka-topics.sh \
  --zookeeper $ZOOKEEPER_HOST \
  --alter \
  --topic <topic_name> \
  --config retention.ms=1000

# For Kubernetes
kubectl -n $KAFKA_NAMESPACE exec -it $KAFKA_BROKER_POD -c kafka -- bin/kafka-topics.sh \
  --zookeeper $ZOOKEEPER_HOST \
  --alter \
  --topic <topic_name> \
  --config retention.ms=1000
----

==== Alter min.insync.replicas

[source,bash]
----
# For VMs
$KAFKA_BIN/kafka-topics.sh \
  --zookeeper $ZOOKEEPER_HOST \
  --alter \
  --topic <topic_name> \
  --config min.insync.replicas=2

# For Kubernetes
kubectl -n $KAFKA_NAMESPACE exec -it $KAFKA_BROKER_POD -c kafka -- bin/kafka-topics.sh \
  --zookeeper $ZOOKEEPER_HOST \
  --alter \
  --topic <topic_name> \
  --config min.insync.replicas=2
----

==== Alter max.message.bytes

[source,bash]
----
# For VMs
$KAFKA_BIN/kafka-configs.sh \
  --zookeeper $ZOOKEEPER_HOST \
  --entity-type topics \
  --entity-name <topic_name> \
  --alter \
  --add-config max.message.bytes=128000

# For Kubernetes
kubectl -n $KAFKA_NAMESPACE exec -it $KAFKA_BROKER_POD -c kafka -- bin/kafka-configs.sh \
  --zookeeper $ZOOKEEPER_HOST \
  --entity-type topics \
  --entity-name <topic_name> \
  --alter \
  --add-config max.message.bytes=128000
----

==== Delete retention time

[source,bash]
----
# For VMs
$KAFKA_BIN/kafka-topics.sh \
  --zookeeper $ZOOKEEPER_HOST \
  --alter \
  --topic <topic_name> \
  --delete-config retention.ms

# For Kubernetes
kubectl -n $KAFKA_NAMESPACE exec -it $KAFKA_BROKER_POD -c kafka -- bin/kafka-topics.sh \
  --zookeeper $ZOOKEEPER_HOST \
  --alter \
  --topic <topic_name> \
  --delete-config retention.ms
----

[source,bash]
----
# For VMs
$KAFKA_BIN/kafka-configs.sh \
  --zookeeper $ZOOKEEPER_HOST \
  --entity-type topics \
  --entity-name <topic_name> \
  --alter \
  --delete-config retention.ms

# For Kubernetes
kubectl -n $KAFKA_NAMESPACE exec -it $KAFKA_BROKER_POD -c kafka -- bin/kafka-configs.sh \
  --zookeeper $ZOOKEEPER_HOST \
  --entity-type topics \
  --entity-name <topic_name> \
  --alter \
  --delete-config retention.ms
----

=== List topics under-replicated

[source,bash]
----
# For VMs
$KAFKA_BIN/kafka-topics.sh \
  --zookeeper $ZOOKEEPER_HOST \
  --describe \
  --under-replicated-partitions

# For Kubernetes
kubectl -n $KAFKA_NAMESPACE exec -it $KAFKA_BROKER_POD -c kafka -- bin/kafka-topics.sh \
  --zookeeper $ZOOKEEPER_HOST \
  --describe \
  --under-replicated-partitions
----

=== Delete topic

[source,bash]
----
# For VMs
$KAFKA_BIN/kafka-topics.sh \
  --delete \
  --zookeeper $ZOOKEEPER_HOST \
  --topic <topic_name>

# For Kubernetes
kubectl -n $KAFKA_NAMESPACE exec -it $KAFKA_BROKER_POD -c kafka -- bin/kafka-topics.sh \
  --delete \
  --zookeeper $ZOOKEEPER_HOST \
  --topic <topic_name>
----

[source,bash]
----
# For VMs
$KAFKA_BIN/kafka-topics.sh \
  --bootstrap-server $BROKER_HOST \
  --delete \
  --topic <topic_name>

# For Kubernetes
kubectl -n $KAFKA_NAMESPACE exec -it $KAFKA_BROKER_POD -c kafka -- bin/kafka-topics.sh \
  --bootstrap-server $BROKER_HOST \
  --delete \
  --topic <topic_name>
----

=== Get earliest offset

[source,bash]
----
# For VMs
$KAFKA_BIN/kafka-run-class.sh kafka.tools.GetOffsetShell \
  --broker-list $BROKER_HOST \
  --topic <topic_name> \
  --time -2

# For Kubernetes
kubectl -n $KAFKA_NAMESPACE exec -it $KAFKA_BROKER_POD -c kafka -- bin/kafka-run-class.sh kafka.tools.GetOffsetShell \
  --broker-list $BROKER_HOST \
  --topic <topic_name> \
  --time -2
----

=== Get latest offset

[source,bash]
----
# For VMs
$KAFKA_BIN/kafka-run-class.sh kafka.tools.GetOffsetShell \
  --broker-list $BROKER_HOST \
  --topic <topic_name> \
  --time -1

# For Kubernetes
kubectl -n $KAFKA_NAMESPACE exec -it $KAFKA_BROKER_POD -c kafka -- bin/kafka-run-class.sh kafka.tools.GetOffsetShell \
  --broker-list $BROKER_HOST \
  --topic <topic_name> \
  --time -1
----
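GetOffsetShell prints lines of the form `<topic>:<partition>:<offset>`. Combined with a group's committed offset (from kafka-consumer-groups.sh --describe), consumer lag per partition is just a subtraction. A sketch with sample values standing in for real command output:

```bash
# Sample output line of GetOffsetShell --time -1 (latest offset).
latest_line='my-topic:0:1500'
# Sample CURRENT-OFFSET column from kafka-consumer-groups.sh --describe.
committed=1342

latest=${latest_line##*:}      # keep only the text after the last colon
lag=$((latest - committed))    # lag = latest offset - committed offset
echo "partition 0 lag: $lag"
```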

== Partition Operations

=== Increase partition number

[source,bash]
----
# For VMs
$KAFKA_BIN/kafka-topics.sh \
  --bootstrap-server $BROKER_HOST \
  --alter \
  --topic <topic_name> \
  --partitions 8

# For Kubernetes
kubectl -n $KAFKA_NAMESPACE exec -it $KAFKA_BROKER_POD -c kafka -- bin/kafka-topics.sh \
  --bootstrap-server $BROKER_HOST \
  --alter \
  --topic <topic_name> \
  --partitions 8
----

=== Increase replication factor

.topics.json
[source,json]
----
{ "topics": [ { "topic": "test" } ], "version": 1 }
----

[source,bash]
----
# For VMs
$KAFKA_BIN/kafka-reassign-partitions.sh \
  --zookeeper $ZOOKEEPER_HOST \
  --generate \
  --broker-list "401,402,601" \
  --topics-to-move-json-file topics.json

# For Kubernetes
kubectl -n $KAFKA_NAMESPACE exec -it $KAFKA_BROKER_POD -c kafka -- bin/kafka-reassign-partitions.sh \
  --zookeeper $ZOOKEEPER_HOST \
  --generate \
  --broker-list "401,402,601" \
  --topics-to-move-json-file topics.json
----

.new-replication-factor.json
[source,json]
----
{"version":1,"partitions":[{"topic":"topic1","partition":0,"replicas":[5,6,7]}]}
----

.Execute new replication factor
[source,bash]
----
# For VMs
$KAFKA_BIN/kafka-reassign-partitions.sh \
  --zookeeper $ZOOKEEPER_HOST \
  --reassignment-json-file new-replication-factor.json \
  --execute

# For Kubernetes
kubectl -n $KAFKA_NAMESPACE exec -it $KAFKA_BROKER_POD -c kafka -- bin/kafka-reassign-partitions.sh \
  --zookeeper $ZOOKEEPER_HOST \
  --reassignment-json-file new-replication-factor.json \
  --execute
----

.Verify status of partition reassignment
[source,bash]
----
# For VMs
$KAFKA_BIN/kafka-reassign-partitions.sh \
  --zookeeper $ZOOKEEPER_HOST \
  --reassignment-json-file new-replication-factor.json \
  --verify

$KAFKA_BIN/kafka-topics.sh \
  --bootstrap-server $BROKER_HOST \
  --topic <topic_name> \
  --describe

# For Kubernetes
kubectl -n $KAFKA_NAMESPACE exec -it $KAFKA_BROKER_POD -c kafka -- bin/kafka-reassign-partitions.sh \
  --zookeeper $ZOOKEEPER_HOST \
  --reassignment-json-file new-replication-factor.json \
  --verify

kubectl -n $KAFKA_NAMESPACE exec -it $KAFKA_BROKER_POD -c kafka -- bin/kafka-topics.sh \
  --bootstrap-server $BROKER_HOST \
  --topic <topic_name> \
  --describe
----

=== Reassign partitions

Create the plan:

.topics.json
[source,json]
----
{ "topics": [ { "topic": "test" } ], "version": 1 }
----

[source,bash]
----
# For VMs
$KAFKA_BIN/kafka-reassign-partitions.sh \
  --zookeeper $ZOOKEEPER_HOST \
  --generate \
  --broker-list "401,402,601" \
  --topics-to-move-json-file topics.json

# For Kubernetes
kubectl -n $KAFKA_NAMESPACE exec -it $KAFKA_BROKER_POD -c kafka -- bin/kafka-reassign-partitions.sh \
  --zookeeper $ZOOKEEPER_HOST \
  --generate \
  --broker-list "401,402,601" \
  --topics-to-move-json-file topics.json
----

Save the result of the above command to a file named replicas.json

[source,bash]
----
# For VMs
$KAFKA_BIN/kafka-reassign-partitions.sh \
  --zookeeper $ZOOKEEPER_HOST \
  --reassignment-json-file replicas.json \
  --execute

$KAFKA_BIN/kafka-reassign-partitions.sh \
  --zookeeper $ZOOKEEPER_HOST \
  --reassignment-json-file replicas.json \
  --verify

# For Kubernetes
kubectl -n $KAFKA_NAMESPACE exec -it $KAFKA_BROKER_POD -c kafka -- bin/kafka-reassign-partitions.sh \
  --zookeeper $ZOOKEEPER_HOST \
  --reassignment-json-file replicas.json \
  --execute

kubectl -n $KAFKA_NAMESPACE exec -it $KAFKA_BROKER_POD -c kafka -- bin/kafka-reassign-partitions.sh \
  --zookeeper $ZOOKEEPER_HOST \
  --reassignment-json-file replicas.json \
  --verify
----

=== List unavailable partitions

[source,bash]
----
# For VMs
$KAFKA_BIN/kafka-topics.sh \
  --zookeeper $ZOOKEEPER_HOST \
  --describe \
  --unavailable-partitions

# For Kubernetes
kubectl -n $KAFKA_NAMESPACE exec -it $KAFKA_BROKER_POD -c kafka -- bin/kafka-topics.sh \
  --zookeeper $ZOOKEEPER_HOST \
  --describe \
  --unavailable-partitions
----

=== Force election on all partitions

[source,bash]
----
# For VMs
$KAFKA_BIN/kafka-leader-election.sh \
  --election-type preferred \
  --bootstrap-server $BROKER_HOST \
  --all-topic-partitions

# For Kubernetes
kubectl -n $KAFKA_NAMESPACE exec -it $KAFKA_BROKER_POD -c kafka -- bin/kafka-leader-election.sh \
  --election-type preferred \
  --bootstrap-server $BROKER_HOST \
  --all-topic-partitions
----

=== Force election on specific topic and partition

[source,bash]
----
# For VMs
$KAFKA_BIN/kafka-leader-election.sh \
  --election-type preferred \
  --bootstrap-server $BROKER_HOST \
  --topic <topic_name> \
  --partition <partition_number>

# For Kubernetes
kubectl -n $KAFKA_NAMESPACE exec -it $KAFKA_BROKER_POD -c kafka -- bin/kafka-leader-election.sh \
  --election-type preferred \
  --bootstrap-server $BROKER_HOST \
  --topic <topic_name> \
  --partition <partition_number>
----

== Consumer

=== List consumer groups

[source,bash]
----
# For VMs
$KAFKA_BIN/kafka-consumer-groups.sh \
  --list \
  --bootstrap-server $BROKER_HOST

# For Kubernetes
kubectl -n $KAFKA_NAMESPACE exec -it $KAFKA_BROKER_POD -c kafka -- bin/kafka-consumer-groups.sh \
  --list \
  --bootstrap-server $BROKER_HOST
----

=== Describe consumer groups

[source,bash]
----
# For VMs
$KAFKA_BIN/kafka-consumer-groups.sh \
  --describe \
  --group <group_id> \
  --bootstrap-server $BROKER_HOST

# For Kubernetes
kubectl -n $KAFKA_NAMESPACE exec -it $KAFKA_BROKER_POD -c kafka -- bin/kafka-consumer-groups.sh \
  --describe \
  --group <group_id> \
  --bootstrap-server $BROKER_HOST
----

=== Describe all consumer groups

[source,bash]
----
# For VMs
$KAFKA_BIN/kafka-consumer-groups.sh \
  --describe \
  --bootstrap-server $BROKER_HOST \
  --all-groups

# For Kubernetes
kubectl -n $KAFKA_NAMESPACE exec -it $KAFKA_BROKER_POD -c kafka -- bin/kafka-consumer-groups.sh \
  --describe \
  --bootstrap-server $BROKER_HOST \
  --all-groups
----

=== Delete consumer group

[source,bash]
----
# For VMs (--group can be passed multiple times to delete several groups)
$KAFKA_BIN/kafka-consumer-groups.sh \
  --bootstrap-server $BROKER_HOST \
  --delete \
  --group <group_id_1> \
  --group <group_id_2>

# For Kubernetes
kubectl -n $KAFKA_NAMESPACE exec -it $KAFKA_BROKER_POD -c kafka -- bin/kafka-consumer-groups.sh \
  --bootstrap-server $BROKER_HOST \
  --delete \
  --group <group_id_1> \
  --group <group_id_2>
----

=== Active member in a consumer group

[source,bash]
----
# For VMs
$KAFKA_BIN/kafka-consumer-groups.sh \
  --bootstrap-server $BROKER_HOST \
  --describe \
  --group <group_id> \
  --members

# For Kubernetes
kubectl -n $KAFKA_NAMESPACE exec -it $KAFKA_BROKER_POD -c kafka -- bin/kafka-consumer-groups.sh \
  --bootstrap-server $BROKER_HOST \
  --describe \
  --group <group_id> \
  --members
----

=== Partition Assigned to each member

[source,bash]
----
# For VMs
$KAFKA_BIN/kafka-consumer-groups.sh \
  --bootstrap-server $BROKER_HOST \
  --describe \
  --group <group_id> \
  --members \
  --verbose

# For Kubernetes
kubectl -n $KAFKA_NAMESPACE exec -it $KAFKA_BROKER_POD -c kafka -- bin/kafka-consumer-groups.sh \
  --bootstrap-server $BROKER_HOST \
  --describe \
  --group <group_id> \
  --members \
  --verbose
----

=== Consumer Group State

[source,bash]
----
# For VMs
$KAFKA_BIN/kafka-consumer-groups.sh \
  --bootstrap-server $BROKER_HOST \
  --describe \
  --group <group_id> \
  --state

# For Kubernetes
kubectl -n $KAFKA_NAMESPACE exec -it $KAFKA_BROKER_POD -c kafka -- bin/kafka-consumer-groups.sh \
  --bootstrap-server $BROKER_HOST \
  --describe \
  --group <group_id> \
  --state
----

=== Consuming message

[source,bash]
----
# For VMs
$KAFKA_BIN/kafka-console-consumer.sh \
  --bootstrap-server $BROKER_HOST \
  --topic <topic_name>

# For Kubernetes
kubectl -n $KAFKA_NAMESPACE exec -it $KAFKA_BROKER_POD -c kafka -- bin/kafka-console-consumer.sh \
  --bootstrap-server $BROKER_HOST \
  --topic <topic_name>
----

[source,bash]
----
kafkacat -C -b $BROKER_HOST -t <topic_name>
----

=== Consuming message and formatting output

[source,bash]
----
kafkacat -C -b $BROKER_HOST -t <topic_name> -q -f 'Topic %t using partition %p at offset %o has key = %k and value = %s'
----

=== Consuming message from the beginning

[source,bash]
----
# For VMs
$KAFKA_BIN/kafka-console-consumer.sh \
  --bootstrap-server $BROKER_HOST \
  --topic <topic_name> \
  --from-beginning

# For Kubernetes
kubectl -n $KAFKA_NAMESPACE exec -it $KAFKA_BROKER_POD -c kafka -- bin/kafka-console-consumer.sh \
  --bootstrap-server $BROKER_HOST \
  --topic <topic_name> \
  --from-beginning
----

=== Consuming message from the end

[source,bash]
----
# The console consumer starts from the latest offset by default.

# For VMs
$KAFKA_BIN/kafka-console-consumer.sh \
  --bootstrap-server $BROKER_HOST \
  --topic <topic_name>

# For Kubernetes
kubectl -n $KAFKA_NAMESPACE exec -it $KAFKA_BROKER_POD -c kafka -- bin/kafka-console-consumer.sh \
  --bootstrap-server $BROKER_HOST \
  --topic <topic_name>
----

=== Consuming message and show output in JSON

[source,bash]
----
kafkacat -b $BROKER_HOST -t <topic_name> -J
----
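kafkacat -J emits one JSON object per message, which is convenient for piping into other tools. A minimal way to pull out a field without extra dependencies is shown below on a sample line of -J output; jq is the more robust choice if it is available:

```bash
# Sample message as emitted by kafkacat -J (stand-in for real output).
json='{"topic":"demo","partition":0,"offset":42,"key":"k1","payload":"hello"}'

# Extract the payload field with sed (fragile for payloads containing
# quotes; use jq for anything non-trivial).
payload=$(printf '%s\n' "$json" | sed -n 's/.*"payload":"\([^"]*\)".*/\1/p')
echo "$payload"
```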

=== Consuming and showing message key

[source,bash]
----
# For VMs
$KAFKA_BIN/kafka-console-consumer.sh \
  --bootstrap-server $BROKER_HOST \
  --topic <topic_name> \
  --property print.key=true \
  --property key.separator=,

# For Kubernetes
kubectl -n $KAFKA_NAMESPACE exec -it $KAFKA_BROKER_POD -c kafka -- bin/kafka-console-consumer.sh \
  --bootstrap-server $BROKER_HOST \
  --topic <topic_name> \
  --property print.key=true \
  --property key.separator=,
----

=== Read one message

[source,bash]
----
# For VMs
$KAFKA_BIN/kafka-console-consumer.sh \
  --bootstrap-server $BROKER_HOST \
  --topic <topic_name> \
  --max-messages 1

# For Kubernetes
kubectl -n $KAFKA_NAMESPACE exec -it $KAFKA_BROKER_POD -c kafka -- bin/kafka-console-consumer.sh \
  --bootstrap-server $BROKER_HOST \
  --topic <topic_name> \
  --max-messages 1
----

=== Read the last 2 messages from topic and then exit

[source,bash]
----
kafkacat -C -b $BROKER_HOST -t <topic_name> -o -2 -e
----

=== Read the last 2 messages from partition 0

[source,bash]
----
kafkacat -C -b $BROKER_HOST -t <topic_name> -o -2 -e -p 0
----

=== Read from __consumer_offsets

[source,bash]
----
# For VMs
$KAFKA_BIN/kafka-console-consumer.sh \
  --bootstrap-server $BROKER_HOST \
  --topic __consumer_offsets \
  --formatter 'kafka.coordinator.group.GroupMetadataManager$OffsetsMessageFormatter' \
  --max-messages 1

# For Kubernetes
kubectl -n $KAFKA_NAMESPACE exec -it $KAFKA_BROKER_POD -c kafka -- bin/kafka-console-consumer.sh \
  --bootstrap-server $BROKER_HOST \
  --topic __consumer_offsets \
  --formatter 'kafka.coordinator.group.GroupMetadataManager$OffsetsMessageFormatter' \
  --max-messages 1
----

=== Describe __consumer_offsets

[source,bash]
----
# For VMs (--new-consumer was removed in Kafka 2.0, so it is omitted here)
$KAFKA_BIN/kafka-run-class.sh kafka.admin.ConsumerGroupCommand \
  --bootstrap-server $BROKER_HOST \
  --group <group_id> \
  --describe

# For Kubernetes
kubectl -n $KAFKA_NAMESPACE exec -it $KAFKA_BROKER_POD -c kafka -- bin/kafka-run-class.sh kafka.admin.ConsumerGroupCommand \
  --bootstrap-server $BROKER_HOST \
  --group <group_id> \
  --describe
----

=== Read from __transaction_state

[source,bash]
----
# Single quotes keep the shell from expanding the $ in the formatter class name.

# For VMs
$KAFKA_BIN/kafka-console-consumer.sh \
  --bootstrap-server $BROKER_HOST \
  --topic __transaction_state \
  --from-beginning \
  --formatter 'kafka.coordinator.transaction.TransactionLog$TransactionLogMessageFormatter'

# For Kubernetes
kubectl -n $KAFKA_NAMESPACE exec -it $KAFKA_BROKER_POD -c kafka -- bin/kafka-console-consumer.sh \
  --bootstrap-server $BROKER_HOST \
  --topic __transaction_state \
  --from-beginning \
  --formatter 'kafka.coordinator.transaction.TransactionLog$TransactionLogMessageFormatter'
----

=== Consume using consumer group

[source,bash]
----
# For VMs
$KAFKA_BIN/kafka-console-consumer.sh \
  --topic <topic_name> \
  --bootstrap-server $BROKER_HOST \
  --group <group_id>

# For Kubernetes
kubectl -n $KAFKA_NAMESPACE exec -it $KAFKA_BROKER_POD -c kafka -- bin/kafka-console-consumer.sh \
  --topic <topic_name> \
  --bootstrap-server $BROKER_HOST \
  --group <group_id>
----

=== Topics to which group is subscribed

[source,bash]
----
# For VMs
$KAFKA_BIN/kafka-consumer-groups.sh \
  --bootstrap-server $BROKER_HOST \
  --group <group_id> \
  --describe

# For Kubernetes
kubectl -n $KAFKA_NAMESPACE exec -it $KAFKA_BROKER_POD -c kafka -- bin/kafka-consumer-groups.sh \
  --bootstrap-server $BROKER_HOST \
  --group <group_id> \
  --describe
----

=== Reset offset

==== Reset to the latest offset

[source,bash]

For VMs

$KAFKA_BIN/kafka-consumer-groups.sh
--bootstrap-server $BROKER_HOST
--reset-offsets
--group <group_id>
--topic <topic_name>
--to-latest
--execute

For Kubernetes

kubectl -n $KAFKA_NAMESPACE exec -it $KAFKA_BROKER_POD -c kafka -- bin/kafka-consumer-groups.sh
--bootstrap-server $BROKER_HOST
--reset-offsets
--group <group_id>
--topic <topic_name>
--to-latest
--execute

==== Reset offset for a consumer group in a topic

[source,bash]

For VMs

There are many other reset options:

--shift-by <positive_or_negative_integer> / --to-current / --to-latest / --to-offset <offset_integer> / --to-datetime <datetime_string> / --by-duration <duration_string>

Without --execute, the command only previews the planned offsets (dry run)

$KAFKA_BIN/kafka-consumer-groups.sh
--bootstrap-server $BROKER_HOST
--group <group_id>
--topic <topic_name>
--reset-offsets
--to-earliest
--execute

For Kubernetes

kubectl -n $KAFKA_NAMESPACE exec -it $KAFKA_BROKER_POD -c kafka -- bin/kafka-consumer-groups.sh
--bootstrap-server $BROKER_HOST
--group <group_id>
--topic <topic_name>
--reset-offsets
--to-earliest
--execute

==== Reset offsets for all consumer groups

[source,bash]

For VMs

$KAFKA_BIN/kafka-consumer-groups.sh
--bootstrap-server $BROKER_HOST
--all-groups
--reset-offsets
--topic <topic_name>
--to-earliest
--execute

For Kubernetes

kubectl -n $KAFKA_NAMESPACE exec -it $KAFKA_BROKER_POD -c kafka -- bin/kafka-consumer-groups.sh
--bootstrap-server $BROKER_HOST
--all-groups
--reset-offsets
--topic <topic_name>
--to-earliest
--execute

==== Shift offsets forward by 2

[source,bash]

For VMs

$KAFKA_BIN/kafka-consumer-groups.sh
--bootstrap-server $BROKER_HOST
--group <group_id>
--reset-offsets
--shift-by 2
--execute
--topic <topic_name>

For Kubernetes

kubectl -n $KAFKA_NAMESPACE exec -it $KAFKA_BROKER_POD -c kafka -- bin/kafka-consumer-groups.sh
--bootstrap-server $BROKER_HOST
--group <group_id>
--reset-offsets
--shift-by 2
--execute
--topic <topic_name>

==== Shift offsets backward by 2

[source,bash]

For VMs

$KAFKA_BIN/kafka-consumer-groups.sh
--bootstrap-server $BROKER_HOST
--group <group_id>
--reset-offsets
--shift-by -2
--execute
--topic <topic_name>

For Kubernetes

kubectl -n $KAFKA_NAMESPACE exec -it $KAFKA_BROKER_POD -c kafka -- bin/kafka-consumer-groups.sh
--bootstrap-server $BROKER_HOST
--group <group_id>
--reset-offsets
--shift-by -2
--execute
--topic <topic_name>
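The --shift-by option simply adds the given (possibly negative) value to each partition's current committed offset, and Kafka clamps the result to the valid range. A rough illustration of the arithmetic in plain shell (not a Kafka command; the numbers are made up):

```shell
# Hypothetical committed offset and shift value (illustration only)
current=5
shift_by=-2

# New offset after the shift; Kafka clamps at the partition's start offset
new=$((current + shift_by))
if [ "$new" -lt 0 ]; then new=0; fi

echo "$new"   # prints 3
```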

=== Describe consumer group

[source,bash]

For VMs

$KAFKA_BIN/kafka-consumer-groups.sh
--bootstrap-server $BROKER_HOST
--describe
--group <group_id>

For Kubernetes

kubectl -n $KAFKA_NAMESPACE exec -it $KAFKA_BROKER_POD -c kafka -- bin/kafka-consumer-groups.sh
--bootstrap-server $BROKER_HOST
--describe
--group <group_id>

=== Check offset for consumer group

[source,bash]

For VMs

$KAFKA_BIN/kafka-consumer-offset-checker.sh
--zookeeper $ZOOKEEPER_HOST
--group <group_id>
--topic <topic_name>

For Kubernetes

kubectl -n $KAFKA_NAMESPACE exec -it $KAFKA_BROKER_POD -c kafka -- bin/kafka-consumer-offset-checker.sh
--zookeeper $ZOOKEEPER_HOST
--group <group_id>
--topic <topic_name>

NOTE: kafka-consumer-offset-checker.sh was removed in Kafka 1.0; on current versions use kafka-consumer-groups.sh --describe instead.

== Producer

=== Send message using file

[source,bash]

For VMs

$KAFKA_BIN/kafka-console-producer.sh
--broker-list $BROKER_HOST
--topic <topic_name> < messages.txt

For Kubernetes

kubectl -n $KAFKA_NAMESPACE exec -it $KAFKA_BROKER_POD -c kafka -- bin/kafka-console-producer.sh
--broker-list $BROKER_HOST
--topic <topic_name> < messages.txt

[source,bash]

kafkacat -P -l -b $BROKER_HOST -t <topic_name> messages.txt

=== Send message using standard input

[source,bash]

For VMs

$KAFKA_BIN/kafka-console-producer.sh
--broker-list $BROKER_HOST
--topic <topic_name>

For Kubernetes

kubectl -n $KAFKA_NAMESPACE exec -it $KAFKA_BROKER_POD -c kafka -- bin/kafka-console-producer.sh
--broker-list $BROKER_HOST
--topic <topic_name>

[source,bash]

kafkacat -P -b $BROKER_HOST -t <topic_name>

=== Send message using snappy compression

[source,bash]

kafkacat -P -b $BROKER_HOST -t <topic_name> -z snappy

=== Send 200 messages to a topic

[source,bash]

seq 200 | kafkacat -P -b $BROKER_HOST -t <topic_name>
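The same pipe pattern works for structured payloads. A minimal sketch (assuming awk is available; the JSON shape is just an example) that generates one JSON message per line, which could then be piped into kafkacat as above:

```shell
# Emit 5 JSON records, one per line; to produce them, append:
#   | kafkacat -P -b $BROKER_HOST -t <topic_name>
seq 5 | awk '{printf "{\"id\": %d}\n", $1}'
```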

=== Send message using string

[source,bash]

For VMs

echo "My Message" | $KAFKA_BIN/kafka-console-producer.sh
--broker-list $BROKER_HOST
--topic <topic_name>

For Kubernetes

kubectl -n $KAFKA_NAMESPACE exec -it $KAFKA_BROKER_POD -c kafka -- bin/kafka-console-producer.sh
--broker-list $BROKER_HOST
--topic <topic_name>

[source,bash]

echo "My Message" | kafkacat -b $BROKER_HOST -t <topic_name>

=== Send message using headers

[source,bash]

echo "My Message" | kafkacat -b $BROKER_HOST -t <topic_name> echo "My Message" | kafkacat -b $BROKER_HOST -H "header1=value1" -H "header2=value2"

=== Send message using ack=all

[source,bash]

For VMs

$KAFKA_BIN/kafka-console-producer.sh
--broker-list $BROKER_HOST
--topic <topic_name>
--producer-property acks=all

For Kubernetes

kubectl -n $KAFKA_NAMESPACE exec -it $KAFKA_BROKER_POD -c kafka -- bin/kafka-console-producer.sh
--broker-list $BROKER_HOST
--topic <topic_name>
--producer-property acks=all

=== Send message with key

[source,bash]

For VMs

$KAFKA_BIN/kafka-console-producer.sh
--broker-list $BROKER_HOST
--topic <topic_name>
--property parse.key=true
--property key.separator=,

For Kubernetes

kubectl -n $KAFKA_NAMESPACE exec -it $KAFKA_BROKER_POD -c kafka -- bin/kafka-console-producer.sh
--broker-list $BROKER_HOST
--topic <topic_name>
--property parse.key=true
--property key.separator=,

NOTE: Your message should be: <key>,<value>. For example: Gus,1000.
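With parse.key=true and key.separator=, each input line must have the form key,value. A small sketch (plain shell; the keys and values are hypothetical) that builds such lines, which could then be piped into the producer command above:

```shell
# Build "key,value" lines for parse.key=true / key.separator=,
i=0
for key in user1 user2; do
  i=$((i + 1000))
  echo "${key},${i}"
done
# prints:
#   user1,1000
#   user2,2000
```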

== Quotas

=== Add quota for user and client-id

[source,bash]

For VMs

$KAFKA_BIN/kafka-configs.sh
--zookeeper $ZOOKEEPER_HOST
--alter
--add-config 'producer_byte_rate=1024,consumer_byte_rate=2048,request_percentage=200'
--entity-type users
--entity-name <user_name>
--entity-type clients
--entity-name <client_id>

For Kubernetes

kubectl -n $KAFKA_NAMESPACE exec -it $KAFKA_BROKER_POD -c kafka -- bin/kafka-configs.sh
--zookeeper $ZOOKEEPER_HOST
--alter
--add-config 'producer_byte_rate=1024,consumer_byte_rate=2048,request_percentage=200'
--entity-type users
--entity-name <user_name>
--entity-type clients
--entity-name <client_id>

=== Add quota for user

[source,bash]

For VMs

$KAFKA_BIN/kafka-configs.sh
--zookeeper $ZOOKEEPER_HOST
--alter
--add-config 'producer_byte_rate=1024,consumer_byte_rate=2048,request_percentage=200'
--entity-type users
--entity-name <user_name>

For Kubernetes

kubectl -n $KAFKA_NAMESPACE exec -it $KAFKA_BROKER_POD -c kafka -- bin/kafka-configs.sh
--zookeeper $ZOOKEEPER_HOST
--alter
--add-config 'producer_byte_rate=1024,consumer_byte_rate=2048,request_percentage=200'
--entity-type users
--entity-name <user_name>

=== Add quota for client-id

[source,bash]

For VMs

$KAFKA_BIN/kafka-configs.sh
--zookeeper $ZOOKEEPER_HOST
--alter
--add-config 'producer_byte_rate=1024,consumer_byte_rate=2048,request_percentage=200'
--entity-type clients
--entity-name <client_id>

For Kubernetes

kubectl -n $KAFKA_NAMESPACE exec -it $KAFKA_BROKER_POD -c kafka -- bin/kafka-configs.sh
--zookeeper $ZOOKEEPER_HOST
--alter
--add-config 'producer_byte_rate=1024,consumer_byte_rate=2048,request_percentage=200'
--entity-type clients
--entity-name <client_id>

=== Add default client-id quota for user

[source,bash]

For VMs

$KAFKA_BIN/kafka-configs.sh
--zookeeper $ZOOKEEPER_HOST
--alter
--add-config 'producer_byte_rate=1024,consumer_byte_rate=2048,request_percentage=200'
--entity-type users
--entity-name <user_name>
--entity-type clients
--entity-default

For Kubernetes

kubectl -n $KAFKA_NAMESPACE exec -it $KAFKA_BROKER_POD -c kafka -- bin/kafka-configs.sh
--zookeeper $ZOOKEEPER_HOST
--alter
--add-config 'producer_byte_rate=1024,consumer_byte_rate=2048,request_percentage=200'
--entity-type users
--entity-name <user_name>
--entity-type clients
--entity-default

=== Add default quota for user

[source,bash]

For VMs

$KAFKA_BIN/kafka-configs.sh
--zookeeper $ZOOKEEPER_HOST
--alter
--add-config 'producer_byte_rate=1024,consumer_byte_rate=2048,request_percentage=200'
--entity-type users
--entity-default

For Kubernetes

kubectl -n $KAFKA_NAMESPACE exec -it $KAFKA_BROKER_POD -c kafka -- bin/kafka-configs.sh
--zookeeper $ZOOKEEPER_HOST
--alter
--add-config 'producer_byte_rate=1024,consumer_byte_rate=2048,request_percentage=200'
--entity-type users
--entity-default

=== Add default quota for client-id

[source,bash]

For VMs

$KAFKA_BIN/kafka-configs.sh
--zookeeper $ZOOKEEPER_HOST
--alter
--add-config 'producer_byte_rate=1024,consumer_byte_rate=2048,request_percentage=200'
--entity-type clients
--entity-default

For Kubernetes

kubectl -n $KAFKA_NAMESPACE exec -it $KAFKA_BROKER_POD -c kafka -- bin/kafka-configs.sh
--zookeeper $ZOOKEEPER_HOST
--alter
--add-config 'producer_byte_rate=1024,consumer_byte_rate=2048,request_percentage=200'
--entity-type clients
--entity-default

=== Describe quota for user and client-id

[source,bash]

For VMs

$KAFKA_BIN/kafka-configs.sh
--zookeeper $ZOOKEEPER_HOST
--describe
--entity-type users
--entity-name <user_name>
--entity-type clients
--entity-name <client_id>

For Kubernetes

kubectl -n $KAFKA_NAMESPACE exec -it $KAFKA_BROKER_POD -c kafka -- bin/kafka-configs.sh
--zookeeper $ZOOKEEPER_HOST
--describe
--entity-type users
--entity-name <user_name>
--entity-type clients
--entity-name <client_id>

=== Describe quota for a user

[source,bash]

For VMs

$KAFKA_BIN/kafka-configs.sh
--zookeeper $ZOOKEEPER_HOST
--describe
--entity-type users
--entity-name <user_name>

For Kubernetes

kubectl -n $KAFKA_NAMESPACE exec -it $KAFKA_BROKER_POD -c kafka -- bin/kafka-configs.sh
--zookeeper $ZOOKEEPER_HOST
--describe
--entity-type users
--entity-name <user_name>

=== Describe quota for a client

[source,bash]

For VMs

$KAFKA_BIN/kafka-configs.sh
--zookeeper $ZOOKEEPER_HOST
--describe
--entity-type clients
--entity-name <client_id>

For Kubernetes

kubectl -n $KAFKA_NAMESPACE exec -it $KAFKA_BROKER_POD -c kafka -- bin/kafka-configs.sh
--zookeeper $ZOOKEEPER_HOST
--describe
--entity-type clients
--entity-name <client_id>

== ACLs

=== Allow <user1> and <user2> to read and write

[source,bash]

For VMs

$KAFKA_BIN/kafka-acls.sh
--authorizer-properties zookeeper.connect=$ZOOKEEPER_HOST
--add
--allow-principal User:<user1>
--allow-principal User:<user2>
--allow-host <host1>
--allow-host <host2>
--operation Read
--operation Write
--topic <topic_name>

For Kubernetes

kubectl -n $KAFKA_NAMESPACE exec -it $KAFKA_BROKER_POD -c kafka -- bin/kafka-acls.sh
--authorizer-properties zookeeper.connect=$ZOOKEEPER_HOST
--add
--allow-principal User:<user1>
--allow-principal User:<user2>
--allow-host <host1>
--allow-host <host2>
--operation Read
--operation Write
--topic <topic_name>

=== Allow all to read from topic except <user>

[source,bash]

For VMs

$KAFKA_BIN/kafka-acls.sh
--authorizer-properties zookeeper.connect=$ZOOKEEPER_HOST
--add
--allow-principal User:'*'
--allow-host '*'
--deny-principal User:<user>
--deny-host <host>
--operation Read
--topic <topic_name>

For Kubernetes

kubectl -n $KAFKA_NAMESPACE exec -it $KAFKA_BROKER_POD -c kafka -- bin/kafka-acls.sh
--authorizer-properties zookeeper.connect=$ZOOKEEPER_HOST
--add
--allow-principal User:'*'
--allow-host '*'
--deny-principal User:<user>
--deny-host <host>
--operation Read
--topic <topic_name>

=== Allow <user> to produce on all topics

[source,bash]

For VMs

$KAFKA_BIN/kafka-acls.sh
--authorizer-properties zookeeper.connect=$ZOOKEEPER_HOST
--add
--allow-principal User:<user>
--allow-host <host>
--producer --topic '*'

For Kubernetes

kubectl -n $KAFKA_NAMESPACE exec -it $KAFKA_BROKER_POD -c kafka -- bin/kafka-acls.sh
--authorizer-properties zookeeper.connect=$ZOOKEEPER_HOST
--add
--allow-principal User:<user>
--allow-host <host>
--producer --topic '*'

=== Allow <user> to consume from all topics

[source,bash]

For VMs

$KAFKA_BIN/kafka-acls.sh
--authorizer-properties zookeeper.connect=$ZOOKEEPER_HOST
--add
--allow-principal User:<user>
--allow-host <host>
--consumer --topic '*' --group <group_id>

For Kubernetes

kubectl -n $KAFKA_NAMESPACE exec -it $KAFKA_BROKER_POD -c kafka -- bin/kafka-acls.sh
--authorizer-properties zookeeper.connect=$ZOOKEEPER_HOST
--add
--allow-principal User:<user>
--allow-host <host>
--consumer --topic '*' --group <group_id>

=== Remove ACL for <user1> and <user2> to read and write

[source,bash]

For VMs

$KAFKA_BIN/kafka-acls.sh
--authorizer-properties zookeeper.connect=$ZOOKEEPER_HOST
--remove
--allow-principal User:<user1>
--allow-principal User:<user2>
--allow-host <host1>
--allow-host <host2>
--operation Read
--operation Write
--topic <topic_name>

For Kubernetes

kubectl -n $KAFKA_NAMESPACE exec -it $KAFKA_BROKER_POD -c kafka -- bin/kafka-acls.sh
--authorizer-properties zookeeper.connect=$ZOOKEEPER_HOST
--remove
--allow-principal User:<user1>
--allow-principal User:<user2>
--allow-host <host1>
--allow-host <host2>
--operation Read
--operation Write
--topic <topic_name>

=== List ACLs on specific topic

[source,bash]

For VMs

$KAFKA_BIN/kafka-acls.sh
--authorizer-properties zookeeper.connect=$ZOOKEEPER_HOST
--list
--topic <topic_name>

For Kubernetes

kubectl -n $KAFKA_NAMESPACE exec -it $KAFKA_BROKER_POD -c kafka -- bin/kafka-acls.sh
--authorizer-properties zookeeper.connect=$ZOOKEEPER_HOST
--list
--topic <topic_name>

=== List ACLs on all topics

[source,bash]

For VMs

$KAFKA_BIN/kafka-acls.sh
--authorizer-properties zookeeper.connect=$ZOOKEEPER_HOST
--list
--topic '*'

For Kubernetes

kubectl -n $KAFKA_NAMESPACE exec -it $KAFKA_BROKER_POD -c kafka -- bin/kafka-acls.sh
--authorizer-properties zookeeper.connect=$ZOOKEEPER_HOST
--list
--topic '*'

== Mirror Maker

=== Mirror topic

[source,bash]

For VMs

$KAFKA_BIN/kafka-mirror-maker.sh
--consumer.config consumer.properties
--producer.config producer.properties
--whitelist <topic_name>

For Kubernetes

kubectl -n $KAFKA_NAMESPACE exec -it $KAFKA_BROKER_POD -c kafka -- bin/kafka-mirror-maker.sh
--consumer.config consumer.properties
--producer.config producer.properties
--whitelist <topic_name>

== Delegation Token

=== Create token

[source,bash]

For VMs

$KAFKA_BIN/kafka-delegation-tokens.sh
--bootstrap-server $BROKER_HOST
--create
--max-life-time-period -1
--command-config client.properties
--renewer-principal User:<user_name>

For Kubernetes

kubectl -n $KAFKA_NAMESPACE exec -it $KAFKA_BROKER_POD -c kafka -- bin/kafka-delegation-tokens.sh
--bootstrap-server $BROKER_HOST
--create
--max-life-time-period -1
--command-config client.properties
--renewer-principal User:<user_name>

=== Renew token

[source,bash]

For VMs

$KAFKA_BIN/kafka-delegation-tokens.sh
--bootstrap-server $BROKER_HOST
--renew
--renew-time-period -1
--command-config client.properties
--hmac ABCDEFGHIJK

For Kubernetes

kubectl -n $KAFKA_NAMESPACE exec -it $KAFKA_BROKER_POD -c kafka -- bin/kafka-delegation-tokens.sh
--bootstrap-server $BROKER_HOST
--renew
--renew-time-period -1
--command-config client.properties
--hmac ABCDEFGHIJK

=== Expire token

[source,bash]

For VMs

$KAFKA_BIN/kafka-delegation-tokens.sh
--bootstrap-server $BROKER_HOST
--expire
--expiry-time-period -1
--command-config client.properties
--hmac ABCDEFGHIJK

For Kubernetes

kubectl -n $KAFKA_NAMESPACE exec -it $KAFKA_BROKER_POD -c kafka -- bin/kafka-delegation-tokens.sh
--bootstrap-server $BROKER_HOST
--expire
--expiry-time-period -1
--command-config client.properties
--hmac ABCDEFGHIJK

=== Describe token

[source,bash]

For VMs

$KAFKA_BIN/kafka-delegation-tokens.sh
--bootstrap-server $BROKER_HOST
--describe
--command-config client.properties
--owner-principal User:<user_name>

For Kubernetes

kubectl -n $KAFKA_NAMESPACE exec -it $KAFKA_BROKER_POD -c kafka -- bin/kafka-delegation-tokens.sh
--bootstrap-server $BROKER_HOST
--describe
--command-config client.properties
--owner-principal User:<user_name>

== Performance Test

=== Producer

[source,bash]

For VMs

$KAFKA_BIN/kafka-producer-perf-test.sh
--topic <topic_name>
--num-records 50000000
--record-size 100
--throughput -1
--producer-props acks=all bootstrap.servers=$BROKER_HOST buffer.memory=67108864 batch.size=8196

For Kubernetes

kubectl -n $KAFKA_NAMESPACE exec -it $KAFKA_BROKER_POD -c kafka -- bin/kafka-producer-perf-test.sh
--topic <topic_name>
--num-records 50000000
--record-size 100
--throughput -1
--producer-props acks=all bootstrap.servers=$BROKER_HOST buffer.memory=67108864 batch.size=8196
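Before launching the producer test, it helps to know the total volume it will push. A quick sanity check with shell arithmetic, using the record count and record size from the command above:

```shell
num_records=50000000
record_size=100

# Total payload the test will produce, in MB (integer division)
echo "$(( num_records * record_size / 1024 / 1024 )) MB"   # prints "4768 MB"
```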

=== Consumer

[source,bash]

For VMs

$KAFKA_BIN/kafka-consumer-perf-test.sh
--group <group_id>
--print-metrics
--show-detailed-stats
--topic <topic_name>
--messages 600000
--broker-list $BROKER_HOST
--timeout 1000000

For Kubernetes

kubectl -n $KAFKA_NAMESPACE exec -it $KAFKA_BROKER_POD -c kafka -- bin/kafka-consumer-perf-test.sh
--group <group_id>
--print-metrics
--show-detailed-stats
--topic <topic_name>
--messages 600000
--broker-list $BROKER_HOST
--timeout 1000000