springboot-kafka-connect-jdbc-streams
The main goal of this project is to play with Kafka, Kafka Connect and Kafka Streams. For this, we have: store-api, which inserts/updates records in MySQL; Source Connectors, which monitor inserted/updated records in MySQL and push messages related to those changes to Kafka; Sink Connectors, which listen to messages from Kafka and insert/update documents in Elasticsearch; and finally, store-streams, which listens to messages from Kafka, processes them using Kafka Streams and pushes new messages back to Kafka.
Project Diagram
Applications
- store-api: Monolithic Spring Boot application that exposes a REST API to manage Customers, Products and Orders. The data is stored in MySQL.
- store-streams: Spring Boot application that connects to Kafka and uses the Kafka Streams API to transform some "input" topics into a new "output" topic in Kafka.
Prerequisites
(De)Serialization formats
In order to run this project, you can use either JSON or Avro format to serialize/deserialize data to/from the binary format used by Kafka. The default format is JSON. Throughout this document, I will point out what to do if you want to use Avro.
Start Environment
- Open a terminal and, inside the springboot-kafka-connect-jdbc-streams root folder, run `docker-compose up -d`

  Note: during the first run, an image for kafka-connect will be built, whose name is springboot-kafka-connect-jdbc-streams_kafka-connect. Run `docker-compose build` to rebuild it.

- Wait for all Docker containers to be up and running. To check it, run `docker-compose ps`
Create Kafka Topics
In order to have topics in Kafka with more than 1 partition, we have to create them manually rather than letting the connectors create them for us. To do so:

- Open a new terminal and make sure you are in the springboot-kafka-connect-jdbc-streams root folder
- Run the script below: `./create-kafka-topics.sh`

  It will create the topics mysql.storedb.customers, mysql.storedb.products, mysql.storedb.orders and mysql.storedb.orders_products, each with 5 partitions. (A sketch of the kind of command the script runs is shown right after this list.)
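For illustration only, creating one of these topics by hand with the Kafka CLI would look roughly like the sketch below. The actual commands live in create-kafka-topics.sh; the broker container name and the replication factor used here are assumptions about this local, single-broker setup.

```
# Assumption: the broker runs in a container named "kafka" with an internal listener on kafka:9092,
# and replication factor 1 is enough for a local environment.
docker exec -t kafka kafka-topics --bootstrap-server kafka:9092 \
  --create --topic mysql.storedb.customers \
  --partitions 5 --replication-factor 1
```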
Create connectors
Connectors use Converters for data serialization and deserialization. If you are configuring the connectors for JSON (de)serialization, the converter used is JsonConverter; if you are configuring them for Avro (de)serialization, the converter used is AvroConverter.

Important: if the Source Connector Converter serializes data, for instance, from JSON to bytes (using JsonConverter), then the Sink Connector Converter must also use JsonConverter to deserialize the bytes, otherwise an error will be thrown. The article Kafka Connect Deep Dive – Converters and Serialization Explained explains it very well.
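As a rough sketch of where this choice shows up, converters are set through standard Kafka Connect properties in each connector configuration. The property names below are standard Connect settings; the schema-registry host/port is an assumption based on this project's Docker Compose setup, and the authoritative configurations live in the create-connectors-*.sh scripts.

```
# JSON (de)serialization
value.converter=org.apache.kafka.connect.json.JsonConverter
# schemas.enable controls whether the JSON payload embeds its schema
value.converter.schemas.enable=true

# Avro (de)serialization (schema-registry:8081 is an assumed hostname/port)
value.converter=io.confluent.connect.avro.AvroConverter
value.converter.schema.registry.url=http://schema-registry:8081
```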
Steps to create the connectors:
- In a terminal, navigate to the springboot-kafka-connect-jdbc-streams root folder
- Run the following script to create the connectors on kafka-connect
  - For JSON (de)serialization: `./create-connectors-jsonconverter.sh`
  - For Avro (de)serialization: `./create-connectors-avroconverter.sh`
- You can check the state of the connectors and their tasks on Kafka Connect UI or by running the script `./check-connectors-state.sh` (equivalent REST calls are sketched right after this list)
- Once the connectors and their tasks are ready (RUNNING state), you should see something like

  {"name":"mysql-source-customers","connector":{"state":"RUNNING","worker_id":"kafka-connect:8083"},"tasks":[],"type":"source"}
  {"name":"mysql-source-products","connector":{"state":"RUNNING","worker_id":"kafka-connect:8083"},"tasks":[],"type":"source"}
  {"name":"mysql-source-orders","connector":{"state":"RUNNING","worker_id":"kafka-connect:8083"},"tasks":[],"type":"source"}
  {"name":"mysql-source-orders_products","connector":{"state":"RUNNING","worker_id":"kafka-connect:8083"},"tasks":[],"type":"source"}
  {"name":"elasticsearch-sink-customers","connector":{"state":"RUNNING","worker_id":"kafka-connect:8083"},"tasks":[{"id":0,"state":"RUNNING","worker_id":"kafka-connect:8083"}],"type":"sink"}
  {"name":"elasticsearch-sink-products","connector":{"state":"RUNNING","worker_id":"kafka-connect:8083"},"tasks":[{"id":0,"state":"RUNNING","worker_id":"kafka-connect:8083"}],"type":"sink"}
  {"name":"elasticsearch-sink-orders","connector":{"state":"RUNNING","worker_id":"kafka-connect:8083"},"tasks":[{"id":0,"state":"RUNNING","worker_id":"kafka-connect:8083"}],"type":"sink"}
- On Kafka Connect UI (http://localhost:8086), you should see the connectors and their tasks listed
- If there is any problem, you can check the kafka-connect container logs: `docker logs kafka-connect`
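If you prefer to query the Kafka Connect REST API directly instead of using the script or the UI, calls along these lines should work. These are standard Connect REST endpoints; exposing port 8083 on localhost is an assumption based on this project's Docker Compose setup.

```
# List all registered connectors
curl localhost:8083/connectors

# Check the status of a single connector and its tasks
curl localhost:8083/connectors/mysql-source-customers/status
```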
Running Applications with Maven
- store-api
  - Open a new terminal and make sure you are in the springboot-kafka-connect-jdbc-streams root folder
  - Run the command below to start the application

    `./mvnw clean spring-boot:run --projects store-api -Dspring-boot.run.jvmArguments="-Dserver.port=9080"`

    Note: it will create all tables, such as customers, products, orders and orders_products, since we are using the spring.jpa.hibernate.ddl-auto=update configuration. It will also insert some customers and products. If you don't want that, just set the properties load-samples.customers.enabled and load-samples.products.enabled to false in application.yml (see the sketch right after this list).
- store-streams
  - Open a new terminal and make sure you are in the springboot-kafka-connect-jdbc-streams root folder
  - To start the application, run
    - For JSON (de)serialization

      `./mvnw clean spring-boot:run --projects store-streams -Dspring-boot.run.jvmArguments="-Dserver.port=9081"`

    - For Avro (de)serialization

      Warning: I was unable to run the application in this mode on my machine! It starts fine when using the avro profile but, when the first event arrives, an org.apache.kafka.common.errors.SerializationException: Unknown magic byte! is thrown. The problem doesn't happen while Running Applications as Docker containers.

      `./mvnw clean spring-boot:run --projects store-streams -Dspring-boot.run.jvmArguments="-Dserver.port=9081" -Dspring-boot.run.profiles=avro`

      The command below generates Java classes from the Avro files present in src/main/resources/avro

      `./mvnw generate-sources --projects store-streams`
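For reference, disabling the sample data load mentioned in the store-api note might look roughly like the snippet below in application.yml. The property names come from the note above, but the exact nesting is an assumption; check store-api's application.yml for the authoritative structure.

```
# Hypothetical sketch: turn off the sample customers/products inserted at startup
load-samples:
  customers:
    enabled: false
  products:
    enabled: false
```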
Running Applications as Docker containers
Build Application’s Docker Image
- In a terminal, make sure you are inside the springboot-kafka-connect-jdbc-streams root folder
- Run the following script to build the application's Docker image: `./docker-build.sh`
Application’s Environment Variables
- store-api

  Environment Variable | Description |
  ---|---|
  MYSQL_HOST | Specify host of the MySQL database to use (default localhost) |
  MYSQL_PORT | Specify port of the MySQL database to use (default 3306) |

- store-streams

  Environment Variable | Description |
  ---|---|
  KAFKA_HOST | Specify host of the Kafka message broker to use (default localhost) |
  KAFKA_PORT | Specify port of the Kafka message broker to use (default 29092) |
  SCHEMA_REGISTRY_HOST | Specify host of the Schema Registry to use (default localhost) |
  SCHEMA_REGISTRY_PORT | Specify port of the Schema Registry to use (default 8081) |
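Just to illustrate how these variables are consumed, a container could be started with them set explicitly, as in the hypothetical command below. start-apps.sh is the supported way to run the containers; the image name is a placeholder, and the hostnames/ports shown are assumptions based on this project's Docker network.

```
# Hypothetical example; <store-streams-image> is a placeholder, not the project's actual image name
docker run -d --name store-streams -p 9081:9081 \
  --network springboot-kafka-connect-jdbc-streams_default \
  -e KAFKA_HOST=kafka -e KAFKA_PORT=9092 \
  -e SCHEMA_REGISTRY_HOST=schema-registry -e SCHEMA_REGISTRY_PORT=8081 \
  <store-streams-image>
```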
Run Application’s Docker Container
- In a terminal, make sure you are inside the springboot-kafka-connect-jdbc-streams root folder
- In order to run the applications' Docker containers, you can pick between JSON or Avro
  - For JSON (de)serialization: `./start-apps.sh`
  - For Avro (de)serialization: `./start-apps.sh avro`
Application's URL
Application | URL |
---|---|
store-api | http://localhost:9080/swagger-ui/index.html |
store-streams | http://localhost:9081/actuator/health |
Testing
- Let's simulate an order creation. In this example, the customer with id 1

  {"id":1, "name":"John Gates", "email":"[email protected]", "address":"street 1", "phone":"112233"}

  will order one unit of the product with id 15

  {"id":15, "name":"iPhone Xr", "price":900.00}

  In a terminal, run the following curl command

  curl -i -X POST localhost:9080/api/orders \
    -H 'Content-Type: application/json' \
    -d '{"customerId": 1, "paymentType": "BITCOIN", "status": "OPEN", "products": [{"id": 15, "unit": 1}]}'

  The response should be

  HTTP/1.1 201
  {
    "id": "47675629-4f0d-440d-b6df-c829874ee2a6",
    "customerId": 1,
    "paymentType": "BITCOIN",
    "status": "OPEN",
    "products": [{"id": 15, "unit": 1}]
  }
- Checking Elasticsearch

  `curl "localhost:9200/store.streams.orders/_search?pretty"`

  We should have one order with the customer and product names.

  {
    "took": 844,
    "timed_out": false,
    "_shards": { "total": 1, "successful": 1, "skipped": 0, "failed": 0 },
    "hits": {
      "total": { "value": 1, "relation": "eq" },
      "max_score": 1.0,
      "hits": [
        {
          "_index": "store.streams.orders",
          "_type": "order",
          "_id": "47675629-4f0d-440d-b6df-c829874ee2a6",
          "_score": 1.0,
          "_source": {
            "payment_type": "BITCOIN",
            "created_at": 1606821792360,
            "id": "47675629-4f0d-440d-b6df-c829874ee2a6",
            "customer_name": "John Gates",
            "customer_id": 1,
            "status": "OPEN",
            "products": [ { "unit": 1, "price": 900, "name": "iPhone Xr", "id": 15 } ]
          }
        }
      ]
    }
  }
- In order to create random orders, we can also use the simulation endpoint

  curl -i -X POST localhost:9080/api/simulation/orders \
    -H 'Content-Type: application/json' \
    -d '{"total": 10, "sleep": 100}'
Useful Links/Commands
- Kafka Topics UI can be accessed at http://localhost:8085
- Kafka Connect UI can be accessed at http://localhost:8086
- Schema Registry UI can be accessed at http://localhost:8001
- Schema Registry

  You can use curl to check the subjects in Schema Registry
  - Get the list of subjects: `curl localhost:8081/subjects`
  - Get the latest version of the subject mysql.storedb.customers-value: `curl localhost:8081/subjects/mysql.storedb.customers-value/versions/latest`
- Kafka Manager can be accessed at http://localhost:9000

  Configuration
  - First, you must create a new cluster. Click on Cluster (dropdown on the header) and then on Add Cluster
  - Type the name of your cluster in the Cluster Name field, for example: MyCluster
  - Type zookeeper:2181 in the Cluster Zookeeper Hosts field
  - Enable the checkbox Poll consumer information (Not recommended for large # of consumers if ZK is used for offsets tracking on older Kafka versions)
  - Click on the Save button at the bottom of the page
- Elasticsearch can be accessed at http://localhost:9200
  - Get all indices: `curl "localhost:9200/_cat/indices?v"`
  - Search for documents:

    curl "localhost:9200/mysql.storedb.customers/_search?pretty"
    curl "localhost:9200/mysql.storedb.products/_search?pretty"
    curl "localhost:9200/store.streams.orders/_search?pretty"
- MySQL

  docker exec -it -e MYSQL_PWD=secret mysql mysql -uroot --database storedb
  select * from orders;
- Kafkacat

  docker run --tty --interactive --rm --network=springboot-kafka-connect-jdbc-streams_default \
    confluentinc/cp-kafkacat:7.0.1 kafkacat -b kafka:9092 \
    -f '\nKey (%K bytes): %k\t\nValue (%S bytes): %s\n\Partition: %p\tOffset: %o\n--\n' \
    -t store.streams.orders -C -c1
Shutdown
- To stop applications
  - If they were started with Maven, go to the terminals where they are running and press Ctrl+C
  - If they were started as Docker containers, go to a terminal and, inside the springboot-kafka-connect-jdbc-streams root folder, run the script `./stop-apps.sh`
- To stop and remove the docker-compose containers, network and volumes, go to a terminal and, inside the springboot-kafka-connect-jdbc-streams root folder, run `docker-compose down -v`
Cleanup
To remove the Docker images created by this project, go to a terminal and, inside the springboot-kafka-connect-jdbc-streams root folder, run the script `./remove-docker-images.sh`
Issues
- Product price field: numeric.mapping doesn't work for DECIMAL fields #563. For now, the workaround is to use String instead of BigDecimal as the type for this field (a hypothetical sketch of this is shown below).
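To make the workaround concrete, a slice of the product model might look like the sketch below. The class layout and the fields other than price are illustrative assumptions, not the actual source code of this project.

```java
// Hypothetical sketch of the workaround described above: price is kept as a String
// because numeric.mapping does not handle DECIMAL columns (see issue #563).
public class Product {
    private Long id;
    private String name;
    private String price; // would ideally be a BigDecimal
    // getters and setters omitted
}
```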
References
- https://cloud.spring.io/spring-cloud-static/spring-cloud-stream-binder-kafka/current/reference/html/spring-cloud-stream-binder-kafka.html#_kafka_streams_binder
- https://github.com/spring-cloud/spring-cloud-stream-samples/tree/master/schema-registry-samples
- https://www.confluent.io/blog/simplest-useful-kafka-connect-data-pipeline-world-thereabouts-part-1 (2 and 3)
- https://www.confluent.io/blog/kafka-connect-deep-dive-converters-serialization-explained