
= Knative + Kafka

== Prerequisites

This walkthrough assumes you have used try.openshift.com to create an OCP 4.2 cluster. Here is a quick video that illustrates that process.

https://youtu.be/sK2SWH_m01Y

CLI tools used:

* kubectl
* oc
* jq
* kafkacat
* siege
* watch

== Installation

Using the OCP 4.x Administration Console, find OperatorHub and install:

. Knative Serving
. Knative Eventing
. Knative Kafka
. Strimzi

image::images/operatorhub_ui.png[OperatorHub inside of OpenShift Console]

image::images/installed_operators.png[Installed Operators]

You can check on your installed operators and their versions:


[source,bash]
----
kubectl get csv

NAME                               DISPLAY                         VERSION   REPLACES                           PHASE
knative-eventing-operator.v0.7.1   Knative Eventing Operator       0.7.1     knative-eventing-operator.v0.6.0   Succeeded
knative-kafka-operator.v0.7.1      Knative Apache Kafka Operator   0.7.1     knative-kafka-operator.v0.6.0      Succeeded
knative-serving-operator.v0.7.1    Knative Serving Operator        0.7.1     knative-serving-operator.v0.6.0    Succeeded
strimzi-cluster-operator.v0.13.0   Strimzi Apache Kafka Operator   0.13.0    strimzi-cluster-operator.v0.12.2   Succeeded
----
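If you want a terser view, jq (from the prerequisites) can pull out just each operator's name and phase; a minimal sketch:

[source,bash]
----
# Print "<csv name>  <phase>" for every installed operator
kubectl get csv -o json | jq -r '.items[] | "\(.metadata.name)  \(.status.phase)"'
----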

NOTE: I have also used the following versions; the OpenShift Serverless operator pulls in Elasticsearch, Jaeger, and Kiali.

[source,bash]
----
kubectl get csv

NAME                                        DISPLAY                          VERSION              REPLACES                            PHASE
elasticsearch-operator.4.3.1-202002032140   Elasticsearch Operator           4.3.1-202002032140                                       Succeeded
jaeger-operator.v1.13.1                     Jaeger Operator                  1.13.1                                                   Succeeded
kiali-operator.v1.0.9                       Kiali Operator                   1.0.9                kiali-operator.v1.0.8               Succeeded
knative-eventing-operator.v0.12.0           Knative Eventing Operator        0.12.0               knative-eventing-operator.v0.11.0   Succeeded
knative-kafka-operator.v0.12.1              Knative Apache Kafka Operator    0.12.1               knative-kafka-operator.v0.11.2      Succeeded
serverless-operator.v1.4.1                  OpenShift Serverless Operator    1.4.1                serverless-operator.v1.4.0          Succeeded
servicemeshoperator.v1.0.7                  Red Hat OpenShift Service Mesh   1.0.7                servicemeshoperator.v1.0.6          Succeeded
strimzi-cluster-operator.v0.15.0            Strimzi                          0.15.0               strimzi-cluster-operator.v0.14.0    Succeeded
----

== Namespace/Project Setup

[source,bash]
----
kubectl create namespace kafka
----

Make it "sticky":

[source,bash]
----
kubectl config set-context --current --namespace=kafka
----

Check that it is set:

[source,bash]
----
kubectl config current-context
----

or use "oc" to see what the "sticky" namespace is:

[source,bash]
----
oc project
----

== Create kafka cluster

[source,bash]
----
cat <<EOF | kubectl apply -f -
apiVersion: kafka.strimzi.io/v1beta2
kind: Kafka
metadata:
  name: my-cluster
spec:
  kafka:
    config:
      offsets.topic.replication.factor: 3
      transaction.state.log.replication.factor: 3
      transaction.state.log.min.isr: 2
      default.replication.factor: 3
      min.insync.replicas: 2
      inter.broker.protocol.version: '3.1'
    storage:
      type: ephemeral
    listeners:
      - name: plain
        port: 9092
        type: internal
        tls: false
      - name: tls
        port: 9093
        type: internal
        tls: true
    version: 3.1.0
    replicas: 3
  entityOperator:
    topicOperator: {}
    userOperator: {}
  zookeeper:
    storage:
      type: ephemeral
    replicas: 3
EOF
----
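Provisioning the brokers takes a few minutes. A simple way to block until the cluster reports Ready is kubectl wait; a minimal sketch:

[source,bash]
----
# Wait up to 5 minutes for the Strimzi Kafka custom resource to report Ready
kubectl wait kafka/my-cluster --for=condition=Ready --timeout=300s -n kafka
----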

== Configure the Knative Eventing Kafka

Note: this only needs to be done one time.

[source,bash]
----
cat <<EOF | kubectl apply -f -
apiVersion: operator.serverless.openshift.io/v1alpha1
kind: KnativeKafka
metadata:
  finalizers:
    - knative-kafka-openshift
  name: knative-kafka
  namespace: knative-eventing
spec:
  broker:
    defaultConfig:
      authSecretName: ''
      bootstrapServers: 'my-cluster-kafka-bootstrap.kafka:9092'
      numPartitions: 10
      replicationFactor: 3
    enabled: true
  channel:
    authSecretName: ''
    authSecretNamespace: ''
    bootstrapServers: 'my-cluster-kafka-bootstrap.kafka:9092'
    enabled: false
  high-availability:
    replicas: 1
  sink:
    enabled: false
  source:
    enabled: true
EOF
----

Note: the bootstrapServers entries above assume the Kafka cluster was created in the "kafka" namespace; adjust them if you used a different namespace.

Verify that the KnativeKafka configuration took effect:

[source,bash]
----
kubectl get crds | grep kafkasource

kafkasources.sources.eventing.knative.dev   2019-09-21T14:23:14Z
----

and

[source,bash]
----
kubectl get pods -n knative-eventing

NAME                                        READY   STATUS    RESTARTS   AGE
broker-controller-66f988fb6c-6wk4t          1/1     Running   0          20h
eventing-controller-5c955d4694-btwx8        1/1     Running   0          20h
eventing-webhook-7f7bcb8447-27p9s           1/1     Running   0          20h
imc-controller-6ddf4477fd-bjjhh             1/1     Running   0          20h
imc-dispatcher-7676c44559-wzxg4             1/1     Running   0          20h
kafka-ch-controller-5497f498dc-vm8x7        1/1     Running   0          4h19m
kafka-controller-manager-544887898b-j654v   1/1     Running   0          4h20m
kafka-webhook-65d8bb899c-6nsmq              1/1     Running   0          4h19m
----

== Create kafka topic

[source,bash]
----
cat <<EOF | kubectl apply -f -
apiVersion: kafka.strimzi.io/v1alpha1
kind: KafkaTopic
metadata:
  name: my-topic
  labels:
    strimzi.io/cluster: my-cluster
spec:
  partitions: 100
  replicas: 1
EOF
----

Test to see if the topic was created correctly

[source,bash]
----
oc exec -n kafka -it my-cluster-zookeeper-0 -- /bin/bash

bin/kafka-topics.sh --zookeeper localhost:12181 --list

bin/kafka-topics.sh --zookeeper localhost:12181 --describe --topic my-topic
----

OR

[source,bash]
----
kubectl exec -n kafka -it my-cluster-zookeeper-0 -- bin/kafka-topics.sh --zookeeper localhost:12181 --describe --topic my-topic

OpenJDK 64-Bit Server VM warning: If the number of processors is expected to increase from one, then you should configure the number of parallel GC threads appropriately using -XX:ParallelGCThreads=N
Topic:my-topic  PartitionCount:100  ReplicationFactor:1  Configs:message.format.version=2.3-IV1
        Topic: my-topic  Partition: 0  Leader: 2  Replicas: 2  Isr: 2
        Topic: my-topic  Partition: 1  Leader: 0  Replicas: 0  Isr: 0
        Topic: my-topic  Partition: 2  Leader: 1  Replicas: 1  Isr: 1
        Topic: my-topic  Partition: 3  Leader: 2  Replicas: 2  Isr: 2
        Topic: my-topic  Partition: 4  Leader: 0  Replicas: 0  Isr: 0
        Topic: my-topic  Partition: 5  Leader: 1  Replicas: 1  Isr: 1
        Topic: my-topic  Partition: 6  Leader: 2  Replicas: 2  Isr: 2
        ...
----

== Deploy a Knative Service

This is your "sink" that receives events.

[source,bash]
----
cat <<EOF | kubectl apply -f -
apiVersion: serving.knative.dev/v1
kind: Service
metadata:
  name: myknativesink
spec:
  template:
    metadata:
      annotations:
        autoscaling.knative.dev/target: "1"
        autoscaling.knative.dev/window: 16s
    spec:
      containers:
        - image: docker.io/burrsutter/myknativesink:1.0.1
          resources:
            requests:
              memory: "50Mi"
              cpu: "100m"
            limits:
              memory: "70Mi"
              cpu: "100m"
          livenessProbe:
            httpGet:
              path: /healthz
          readinessProbe:
            httpGet:
              path: /healthz
EOF
----
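Once it deploys, you can confirm the Knative Service reports Ready and note its URL:

[source,bash]
----
# ksvc is the short name for services.serving.knative.dev
kubectl get ksvc myknativesink
----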

If your pod is stuck in Pending, check your events:

[source,bash]
----
kubectl get events --sort-by=.metadata.creationTimestamp
----

You likely need to add another worker node (OpenShift Console - Compute - MachineSets)

image::images/machinesets.png[Machinesets]
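If you prefer the CLI, you can scale an existing MachineSet instead of using the console; a sketch, where the MachineSet name is cluster-specific and the placeholder below is hypothetical:

[source,bash]
----
# List the MachineSets, then add a worker by bumping the replica count
oc get machinesets -n openshift-machine-api
oc scale machineset <your-machineset-name> -n openshift-machine-api --replicas=3
----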

== Create the KafkaSource that connects my-topic to ksvc

[source,bash]
----
cat <<EOF | kubectl apply -f -
apiVersion: sources.knative.dev/v1beta1
kind: KafkaSource
metadata:
  name: mykafka-source
spec:
  consumerGroup: knative-group
  bootstrapServers:
    - my-cluster-kafka-bootstrap.kafka:9092
  topics:
    - my-topic
  sink:
    ref:
      apiVersion: serving.knative.dev/v1
      kind: Service
      name: myknativesink
EOF
----

You can monitor the logs of kafkasource-mykafka-source to see if it has connectivity issues:

[source,bash]
----
stern kafkasource-mykafka-source
----
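If you don't have stern installed, checking that the KafkaSource itself reports Ready is a reasonable substitute:

[source,bash]
----
kubectl get kafkasource mykafka-source
----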

== Test
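There is no route to the broker from outside the cluster in this setup, so one quick way to push a test message onto my-topic is to run kafkacat in a throwaway pod. A minimal sketch; the edenhill/kafkacat image and the sample payload are assumptions, not part of the original demo apps:

[source,bash]
----
# Produce a single test message to my-topic from inside the cluster
echo '{"greeting":"hello world"}' | kubectl -n kafka run kafkacat -i --rm --restart=Never \
  --image=edenhill/kafkacat:1.6.0 -- \
  -P -b my-cluster-kafka-bootstrap.kafka:9092 -t my-topic
----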

image::images/hello_world_1.png[Waiting]

image::images/hello_world_2.png[Sink pod is up]

image::images/goodbye_world.png[one more message]

== Scaling beyond 1 Pod

Kafka Spammer is a simple little application that drives in N messages as fast as it can.

Deploy


[source,bash]
----
kubectl -n kafka run kafka-spammer \
  --image=quay.io/rhdevelopers/kafkaspammer:1.0.2
----

Exec into the Spammer

[source,bash]
----
KAFKA_SPAMMER_POD=$(kubectl -n kafka get pod -l "run=kafka-spammer" \
  -o jsonpath='{.items[0].metadata.name}')
kubectl -n kafka exec -it $KAFKA_SPAMMER_POD -- /bin/sh
----

Then fire a request; the trailing path segment is the number of messages to send:

[source,bash]
----
curl localhost:8080/1
----
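In another terminal, watch (from the prerequisites) gives a simple view of the sink scaling out as messages arrive:

[source,bash]
----
watch kubectl get pods -n kafka
----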

Watch the Developer Topology view

image::images/developer_topology.png[Developer View]

image::images/developer_topology_during_auto_scale.png[Developer View]

image::images/iterm_during_100.png[Terminal View]

== Clean up

[source,bash]
----
kubectl delete route kafka-producer
kubectl delete service kafka-producer
kubectl delete deployment kafka-producer
kubectl delete kafkasource mykafka-source
kubectl delete ksvc myknativesink
kubectl delete KafkaTopic my-topic
kubectl delete kafka my-cluster
----