# springboot-kafka-connect-debezium-ksqldb
The goal of this project is to play with Kafka, Debezium and ksqlDB. For this, we have: `research-service`, which inserts/updates/deletes records in MySQL; Source Connectors, which monitor changes to records in MySQL and push messages related to those changes to Kafka; Sink Connectors and `kafka-research-consumer`, which listen to messages from Kafka and insert/update documents in Elasticsearch; and finally, ksqlDB-Server, which listens to some topics in Kafka, performs some joins, and pushes new messages to new topics in Kafka.
## Project Diagram
## Applications

- ### research-service

  Monolithic Spring Boot application that exposes a REST API to manage `Institutes`, `Articles`, `Researchers` and `Reviews`. The data is saved in MySQL.

- ### kafka-research-consumer

  Spring Boot application that listens to messages from the topic `reviews_researchers_institutes_articles` (one of the ksqlDB outputs) and saves the payload of those messages (i.e., reviews with detailed information) in Elasticsearch. A hypothetical sketch of its listener follows this list.
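To make the consumer's role concrete, below is a minimal sketch of what such a listener could look like with Spring Kafka and Spring Data Elasticsearch. It is an illustration only: `ReviewDoc`, its fields, and the `ReviewMessage` getters are assumptions, not the project's actual code (`ReviewMessage` is the class generated from the Avro schema, as described in the Run kafka-research-consumer section).

```java
import org.springframework.data.annotation.Id;
import org.springframework.data.elasticsearch.annotations.Document;
import org.springframework.data.elasticsearch.core.ElasticsearchOperations;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.stereotype.Component;

// Hypothetical Elasticsearch document; the real project maps more fields.
@Document(indexName = "reviews")
record ReviewDoc(@Id String id, String comment) {}

@Component
class ReviewsConsumer {

    private final ElasticsearchOperations operations;

    ReviewsConsumer(ElasticsearchOperations operations) {
        this.operations = operations;
    }

    // Consumes the ksqlDB output topic; the getters on ReviewMessage are assumed.
    @KafkaListener(topics = "reviews_researchers_institutes_articles", groupId = "kafka-research-consumer")
    void listen(ReviewMessage message) {
        // Upsert the review into the "reviews" index, keyed by the review id.
        operations.save(new ReviewDoc(String.valueOf(message.getReviewId()), String.valueOf(message.getComment())));
    }
}
```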
## Prerequisites

- Java
- Docker and Docker Compose
## Start Environment

- Open a terminal and, inside the springboot-kafka-connect-debezium-ksqldb root folder, run the following command

  ```
  docker-compose up -d
  ```

  **Note**: During the first run, images for `mysql` and `kafka-connect` will be built, named `springboot-kafka-connect-debezium-ksqldb_mysql` and `springboot-kafka-connect-debezium-ksqldb_kafka-connect`, respectively. To rebuild those images, run

  ```
  docker-compose build
  ```

- Wait for all Docker containers to be up and running. To check it, run

  ```
  docker-compose ps
  ```
## Create Kafka Topics

In order to have topics in Kafka with more than 1 partition, we must create them manually instead of waiting for the connectors to create them for us. So:

- In a terminal, make sure you are in the springboot-kafka-connect-debezium-ksqldb root folder

- Run the script below

  ```
  ./create-kafka-topics.sh
  ```

  It will create the topics `mysql.researchdb.institutes`, `mysql.researchdb.researchers`, `mysql.researchdb.articles` and `mysql.researchdb.reviews`, each with `5` partitions. (For reference, an `AdminClient` sketch of the same operation follows this list.)
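The same topics could also be created programmatically with Kafka's `AdminClient`. This is only a sketch, and it assumes the broker is published to the host on `localhost:29092`; check `docker-compose.yml` for the actual port.

```java
import java.util.List;
import java.util.Map;
import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.admin.AdminClientConfig;
import org.apache.kafka.clients.admin.NewTopic;

public class CreateTopics {

    public static void main(String[] args) throws Exception {
        // localhost:29092 is an assumption; use the port docker-compose publishes for the broker.
        try (AdminClient admin = AdminClient.create(
                Map.of(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:29092"))) {
            // 5 partitions each, replication factor 1 (a single-broker development setup)
            List<NewTopic> topics = List.of(
                    new NewTopic("mysql.researchdb.institutes", 5, (short) 1),
                    new NewTopic("mysql.researchdb.researchers", 5, (short) 1),
                    new NewTopic("mysql.researchdb.articles", 5, (short) 1),
                    new NewTopic("mysql.researchdb.reviews", 5, (short) 1));
            admin.createTopics(topics).all().get(); // blocks until the broker confirms creation
        }
    }
}
```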
## Create connectors (3/4)

- In a terminal, make sure you are in the springboot-kafka-connect-debezium-ksqldb root folder

- Run the following `curl` commands to create one Debezium and two Elasticsearch-Sink connectors in `kafka-connect`

  ```
  curl -i -X POST localhost:8083/connectors -H 'Content-Type: application/json' -d @connectors/debezium-mysql-source-researchdb.json

  curl -i -X POST localhost:8083/connectors -H 'Content-Type: application/json' -d @connectors/elasticsearch-sink-institutes.json

  curl -i -X POST localhost:8083/connectors -H 'Content-Type: application/json' -d @connectors/elasticsearch-sink-articles.json
  ```

- You can check the state of the connectors and their tasks on Kafka Connect UI (http://localhost:8086) or by calling the `kafka-connect` endpoint

  ```
  curl localhost:8083/connectors/debezium-mysql-source-researchdb/status
  curl localhost:8083/connectors/elasticsearch-sink-institutes/status
  curl localhost:8083/connectors/elasticsearch-sink-articles/status
  ```

- The state of the connectors and their tasks must be `RUNNING`. If there is any problem, check the `kafka-connect` container logs

  ```
  docker logs kafka-connect
  ```
## Run research-service

- Open a new terminal and navigate to the springboot-kafka-connect-debezium-ksqldb root folder

- Run the command below to start the application

  ```
  ./mvnw clean spring-boot:run --projects research-service -Dspring-boot.run.jvmArguments="-Dserver.port=9080"
  ```

  It will create some articles, institutes and researchers. If you don't want that, set the properties `load-samples.articles.enabled`, `load-samples.institutes.enabled` and `load-samples.researchers.enabled` to `false` in `application.yml`.

- The Swagger UI is available at http://localhost:9080/swagger-ui/index.html

- **Important**: create at least one review so that `mysql.researchdb.reviews-key` and `mysql.researchdb.reviews-value` are created in Schema Registry. Below is a sample request to create a review (a sketch of what happens under the hood follows this list)

  ```
  curl -i -X POST localhost:9080/api/reviews \
    -H "Content-Type: application/json" \
    -d "{ \"researcherId\": 1, \"articleId\": 1, \"comment\": \"Ln 56: replace the 'a' by 'an'\"}"
  ```
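What matters architecturally is that `research-service` only writes to MySQL and never produces to Kafka itself: the Debezium source connector tails the MySQL binlog and turns each committed insert/update/delete into a message on the corresponding `mysql.researchdb.*` topic. Below is a trimmed-down, hypothetical sketch of the review creation path; the entity, repository and request names are illustrative, not the project's actual code.

```java
import jakarta.persistence.Entity;
import jakarta.persistence.GeneratedValue;
import jakarta.persistence.Id;
import org.springframework.data.jpa.repository.JpaRepository;
import org.springframework.http.HttpStatus;
import org.springframework.web.bind.annotation.PostMapping;
import org.springframework.web.bind.annotation.RequestBody;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.ResponseStatus;
import org.springframework.web.bind.annotation.RestController;

// Hypothetical JPA entity mapped to the reviews table in researchdb.
@Entity
class Review {
    @Id @GeneratedValue Long id;
    Long researcherId;
    Long articleId;
    String comment;
}

interface ReviewRepository extends JpaRepository<Review, Long> {}

record CreateReviewRequest(Long researcherId, Long articleId, String comment) {}

@RestController
@RequestMapping("/api/reviews")
class ReviewController {

    private final ReviewRepository repository;

    ReviewController(ReviewRepository repository) {
        this.repository = repository;
    }

    @PostMapping
    @ResponseStatus(HttpStatus.CREATED)
    Review create(@RequestBody CreateReviewRequest request) {
        Review review = new Review();
        review.researcherId = request.researcherId();
        review.articleId = request.articleId();
        review.comment = request.comment();
        // A plain JPA insert: the resulting binlog event is what Debezium pushes to Kafka.
        return repository.save(review);
    }
}
```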
## Run ksqlDB-cli

- Open a new terminal and, inside the springboot-kafka-connect-debezium-ksqldb root folder, run the `docker` command below to start `ksqldb-cli`

  ```
  docker run -it --rm --name ksqldb-cli \
    --network springboot-kafka-connect-debezium-ksqldb_default \
    -v $PWD/docker/ksql/researchers-institutes.ksql:/tmp/researchers-institutes.ksql \
    -v $PWD/docker/ksql/reviews-researchers-institutes-articles.ksql:/tmp/reviews-researchers-institutes-articles.ksql \
    confluentinc/cp-ksqldb-cli:7.0.1 http://ksqldb-server:8088
  ```

- On the `ksqldb-cli` command line, run the following commands

  - Set the `auto.offset.reset` value

    ```
    SET 'auto.offset.reset' = 'earliest';
    ```

  - Run the following script. It will create the `researchers_institutes` topic

    ```
    RUN SCRIPT '/tmp/researchers-institutes.ksql';
    ```

  - Check whether the topic was created

    ```
    DESCRIBE "researchers_institutes";
    SELECT * FROM "researchers_institutes" EMIT CHANGES LIMIT 5;
    ```

  - Run the script below. It will create the `reviews_researchers_institutes_articles` topic

    ```
    RUN SCRIPT '/tmp/reviews-researchers-institutes-articles.ksql';
    ```

  - Check whether the topic was created

    ```
    DESCRIBE "reviews_researchers_institutes_articles";
    SELECT * FROM "reviews_researchers_institutes_articles" EMIT CHANGES LIMIT 1;
    ```
## Create connectors (4/4)

- In a terminal, make sure you are in the springboot-kafka-connect-debezium-ksqldb root folder

- Run the `curl` command below to create the `elasticsearch-sink-researchers` connector in `kafka-connect`

  ```
  curl -i -X POST localhost:8083/connectors -H 'Content-Type: application/json' -d @connectors/elasticsearch-sink-researchers.json
  ```

- You can check the state of the connector and its task on Kafka Connect UI (http://localhost:8086) or by calling the `kafka-connect` endpoint

  ```
  curl localhost:8083/connectors/elasticsearch-sink-researchers/status
  ```
## Run kafka-research-consumer

- Open a new terminal and navigate to the springboot-kafka-connect-debezium-ksqldb root folder

- Run the command below to start the application

  ```
  ./mvnw clean spring-boot:run --projects kafka-research-consumer -Dspring-boot.run.jvmArguments="-Dserver.port=9081"
  ```

  **Note**: The command below generates the Java class `ReviewMessage` from the Avro file present in `src/main/resources/avro`

  ```
  ./mvnw generate-sources --projects kafka-research-consumer
  ```

- This service runs on port `9081`. The `health` endpoint is http://localhost:9081/actuator/health

- [Optional] You can start another `kafka-research-consumer` instance by opening another terminal and running

  ```
  ./mvnw clean spring-boot:run --projects kafka-research-consumer -Dspring-boot.run.jvmArguments="-Dserver.port=9082"
  ```
## Testing

- Go to the terminal where `ksqldb-cli` is running

- On the `ksqldb-cli` command line, run the following query (a Java-client alternative to this push query is sketched after this list)

  ```
  SELECT * FROM "reviews_researchers_institutes_articles" EMIT CHANGES;
  ```

- In another terminal, call the `research-service` simulation endpoint

  ```
  curl -X POST localhost:9080/api/simulation/reviews \
    -H "Content-Type: application/json" \
    -d "{\"total\": 100, \"sleep\": 100}"
  ```

- The GIF below shows it: `research-service` is running in the upper left terminal; `kafka-research-consumer` is running in the upper right terminal; the middle terminal is used to submit the POST request to `research-service`; the lower terminal is where `ksqldb-cli` is running.

- You can also query Elasticsearch

  ```
  curl "localhost:9200/reviews/_search?pretty"
  ```
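If you prefer to consume the push query programmatically instead of through `ksqldb-cli`, ksqlDB also provides a Java client (`ksqldb-api-client`). A minimal sketch, assuming `ksqldb-server`'s port `8088` is published to the host:

```java
import io.confluent.ksql.api.client.Client;
import io.confluent.ksql.api.client.ClientOptions;
import io.confluent.ksql.api.client.Row;
import io.confluent.ksql.api.client.StreamedQueryResult;

public class StreamJoinedReviews {

    public static void main(String[] args) throws Exception {
        // Assumes ksqldb-server is reachable from the host on localhost:8088.
        ClientOptions options = ClientOptions.create().setHost("localhost").setPort(8088);
        Client client = Client.create(options);

        // The same push query used in the testing step above.
        StreamedQueryResult result = client
                .streamQuery("SELECT * FROM \"reviews_researchers_institutes_articles\" EMIT CHANGES;")
                .get();

        // Print the first 5 joined reviews as they arrive.
        for (int i = 0; i < 5; i++) {
            Row row = result.poll(); // blocks until the next row is available
            System.out.println(row.values());
        }
        client.close();
    }
}
```

`poll()` blocks, so in a real application you would usually subscribe to the `StreamedQueryResult` as a Reactive Streams `Publisher` instead.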
## Useful Links/Commands

- ### Kafka Topics UI

  Kafka Topics UI can be accessed at http://localhost:8085

- ### Kafka Connect UI

  Kafka Connect UI can be accessed at http://localhost:8086

- ### Schema Registry UI

  Schema Registry UI can be accessed at http://localhost:8001

- ### Schema Registry

  You can use `curl` to check the subjects in Schema Registry

  - Get the list of subjects

    ```
    curl localhost:8081/subjects
    ```

  - Get the latest version of the subject `mysql.researchdb.researchers-value`

    ```
    curl localhost:8081/subjects/mysql.researchdb.researchers-value/versions/latest
    ```

- ### Kafka Manager

  Kafka Manager can be accessed at http://localhost:9000

  **Configuration**

  - First, you must create a new cluster. Click on `Cluster` (dropdown on the header) and then on `Add Cluster`
  - Type the name of your cluster in the `Cluster Name` field, for example: `MyCluster`
  - Type `zookeeper:2181` in the `Cluster Zookeeper Hosts` field
  - Enable the checkbox `Poll consumer information (Not recommended for large # of consumers if ZK is used for offsets tracking on older Kafka versions)`
  - Click on the `Save` button at the bottom of the page

- ### Elasticsearch

  Elasticsearch can be accessed at http://localhost:9200

  - Get all indices

    ```
    curl "localhost:9200/_cat/indices?v"
    ```

  - Search for documents

    ```
    curl "localhost:9200/articles/_search?pretty"
    curl "localhost:9200/institutes/_search?pretty"
    curl "localhost:9200/researchers/_search?pretty"
    curl "localhost:9200/reviews/_search?pretty"
    ```

- ### MySQL

  Connect to the `researchdb` database and run a sample join query

  ```
  docker exec -it -e MYSQL_PWD=secret mysql mysql -uroot --database researchdb
  ```

  ```
  SELECT a.id AS review_id, c.id AS article_id, c.title AS article_title,
         b.id AS reviewer_id, b.first_name, b.last_name, b.institute_id, a.comment
  FROM reviews a, researchers b, articles c
  WHERE a.researcher_id = b.id AND a.article_id = c.id;
  ```

  Type `exit` to leave the MySQL terminal
## Shutdown

- Go to the terminals where `research-service` and `kafka-research-consumer` are running and press `Ctrl+C` to stop them

- Go to the terminal where `ksqldb-cli` is running, press `Ctrl+C` to stop the `SELECT`, and then type `exit`

- To stop and remove docker-compose containers, network and volumes, go to a terminal and, inside the springboot-kafka-connect-debezium-ksqldb root folder, run the command below

  ```
  docker-compose down -v
  ```
## Cleanup

To remove the Docker images created by this project, go to a terminal and, inside the springboot-kafka-connect-debezium-ksqldb root folder, run the following script

```
./remove-docker-images.sh
```
## TODO

- Create the Elasticsearch indices dynamically and add an `alias` for them
## References
- https://docs.confluent.io/platform/current/ksqldb/index.html