= spring-cloud-stream-kafka-elasticsearch
The goal of this project is to implement a "News" processing pipeline composed of five https://docs.spring.io/spring-boot/docs/current/reference/htmlsingle/[`Spring Boot`] applications: `producer-api`, `categorizer-service`, `collector-service`, `publisher-api` and `news-client`.
== Proof-of-Concepts & Articles
On https://ivangfr.github.io[ivangfr.github.io], I have compiled my Proof-of-Concepts (PoCs) and articles. You can easily search for the technology you are interested in by using the filter. Who knows, perhaps I have already implemented a PoC or written an article about what you are looking for.
== Additional Readings
- [Medium]: https://medium.com/javarevisited/implementing-a-kafka-producer-and-consumer-using-spring-cloud-stream-d4b9a6a9eab1[Implementing a Kafka Producer and Consumer using Spring Cloud Stream]
- [Medium]: https://medium.com/javarevisited/implementing-unit-tests-for-a-kafka-producer-and-consumer-that-uses-spring-cloud-stream-f7a98a89fcf2[Implementing Unit Tests for a Kafka Producer and Consumer that uses Spring Cloud Stream]
- [Medium]: https://medium.com/javarevisited/implementing-end-to-end-testing-for-a-kafka-producer-and-consumer-that-uses-spring-cloud-stream-fbf5e666899e[Implementing End-to-End testing for a Kafka Producer and Consumer that uses Spring Cloud Stream]
- [Medium]: https://medium.com/javarevisited/configuring-distributed-tracing-with-zipkin-in-a-kafka-producer-and-consumer-that-uses-spring-cloud-9f1e55468b9e[Configuring Distributed Tracing with Zipkin in a Kafka Producer and Consumer that uses Spring Cloud Stream]
- [Medium]: https://medium.com/@ivangfr/using-cloudevents-in-a-kafka-producer-and-consumer-that-uses-spring-cloud-stream-9c51670b5566[Using Cloudevents in a Kafka Producer and Consumer that uses Spring Cloud Stream]
== Technologies used
- https://docs.spring.io/spring-cloud-stream/docs/current/reference/html/[`Spring Cloud Stream`] to build highly scalable event-driven applications connected with shared messaging systems;
- https://cloud.spring.io/spring-cloud-static/spring-cloud-schema-registry/current/reference/html/spring-cloud-schema-registry.html[`Spring Cloud Schema Registry`] that supports schema evolution so that the data can evolve over time; besides, it lets you store schema information in a textual format (typically JSON) and makes that information accessible to the applications that need it to receive and send data in binary format;
- https://docs.spring.io/spring-data/elasticsearch/docs/current/reference/html/[`Spring Data Elasticsearch`] to persist data in https://www.elastic.co/products/elasticsearch[`Elasticsearch`];
- https://docs.spring.io/spring-cloud-openfeign/docs/current/reference/html/[`Spring Cloud OpenFeign`] to write web service clients easily;
- https://www.thymeleaf.org/[`Thymeleaf`] as the HTML template engine;
- https://zipkin.io[`Zipkin`] to visualize traces between and within applications;
- https://github.com/Netflix/eureka[`Eureka`] for service registration and discovery.
NOTE: The https://github.com/ivangfr/docker-swarm-environment[`docker-swarm-environment`] repository shows how to deploy this project into a cluster of Docker Engines in swarm mode.
== Project Architecture
image::documentation/project-diagram.jpeg[]
== Applications
- producer-api
+
`Spring Boot` Web Java application that creates news and pushes news events to the `producer.news` topic in `Kafka`.

- categorizer-service
+
`Spring Boot` Web Java application that listens for news events in the `producer.news` topic in `Kafka`, categorizes them and pushes them to the `categorizer.news` topic.

- collector-service
+
`Spring Boot` Web Java application that listens for news events in the `categorizer.news` topic in `Kafka`, saves them in `Elasticsearch` and pushes the news events to the `collector.news` topic.

- publisher-api
+
`Spring Boot` Web Java application that reads directly from `Elasticsearch` and exposes a REST API. It does not consume from `Kafka`.

- news-client
+
`Spring Boot` Web Java application that provides a user interface to see the news. It implements a `Websocket` that consumes news events from the `collector.news` topic, so news is updated on the fly on the main page. Besides, `news-client` communicates directly with `publisher-api` whenever a search for specific news or a news update is needed.
+
The `Websocket` operation is shown in the short gif below. News is created in `producer-api` and is immediately shown in `news-client`.
+
image::documentation/websocket-operation.gif[]
== Prerequisites
- https://www.oracle.com/java/technologies/downloads/#java17[`Java 17+`]
- https://www.docker.com/[`Docker`]
== Generate NewsEvent
- In a terminal, make sure you are in the `spring-cloud-stream-kafka-elasticsearch` root folder
- Run the following command to generate `NewsEvent`
+
[source]
./mvnw clean install --projects commons-news
+
It will install `commons-news-1.0.0.jar` in your local `Maven` repository, so that it is visible to all services.
== Start Environment
- Open a terminal and, inside the `spring-cloud-stream-kafka-elasticsearch` root folder, run
+
[source]
docker compose up -d
- Wait for the Docker containers to be up and running. To check their status, run
+
[source]
docker compose ps
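`docker compose ps` shows container state, but a container can be listed as running before the service inside it accepts connections. As a rough complement, the sketch below polls the default ports that appear later in this document (`29092` for Kafka, `8081` for Schema Registry, `9200` for Elasticsearch, `9411` for Zipkin). The function names are hypothetical, the script needs `bash` because it relies on `/dev/tcp`, and treating an open TCP port as "ready" is an assumption, not a guarantee.

```shell
#!/usr/bin/env bash
# Hypothetical helper (not part of the project).
# Returns 0 as soon as host:port accepts a TCP connection,
# or 1 after `retries` failed attempts (one attempt per second).
wait_for_port() {
  local host="$1" port="$2" retries="${3:-30}" attempt=0
  while [ "$attempt" -lt "$retries" ]; do
    # /dev/tcp is a bash feature; the subshell closes the descriptor on exit.
    if (exec 3<>"/dev/tcp/${host}/${port}") 2>/dev/null; then
      return 0
    fi
    attempt=$((attempt + 1))
    sleep 1
  done
  return 1
}

# Checks the default infrastructure ports listed in this document, in order.
# Assumption: the compose file publishes these ports on localhost.
wait_for_infrastructure() {
  local port
  for port in 29092 8081 9200 9411; do
    if ! wait_for_port localhost "$port"; then
      echo "port $port is not reachable yet" >&2
      return 1
    fi
  done
  echo "infrastructure ports are reachable"
}
```

After sourcing the script, `wait_for_infrastructure` can be run once `docker compose up -d` returns; a non-zero exit status means at least one port did not open in time.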
== Running Applications with Maven
Inside the `spring-cloud-stream-kafka-elasticsearch` root folder, run the following `Maven` commands in different terminals
- eureka-server
[source]
./mvnw clean spring-boot:run --projects eureka-server
- producer-api
[source]
./mvnw clean spring-boot:run --projects producer-api -Dspring-boot.run.jvmArguments="-Dserver.port=9080"
- categorizer-service
[source]
./mvnw clean spring-boot:run --projects categorizer-service -Dspring-boot.run.jvmArguments="-Dserver.port=9081"
- collector-service
[source]
./mvnw clean spring-boot:run --projects collector-service -Dspring-boot.run.jvmArguments="-Dserver.port=9082"
- publisher-api
[source]
./mvnw clean spring-boot:run --projects publisher-api -Dspring-boot.run.jvmArguments="-Dserver.port=9083"
- news-client
[source]
./mvnw clean spring-boot:run --projects news-client
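The commands above follow one pattern: each application is a Maven module, and four of them receive an explicit `server.port`. As a convenience, the sketch below prints those same commands so they can be reviewed or pasted into separate terminals. The helper names `maven_run_cmd` and `print_all_run_cmds` are hypothetical and not part of the project; the script only prints text and does not start anything.

```shell
#!/bin/sh
# Hypothetical helper (not part of the project): prints the Maven command
# for one module. The optional second argument is the server port.
maven_run_cmd() {
  module="$1"
  port="${2:-}"
  if [ -n "$port" ]; then
    printf './mvnw clean spring-boot:run --projects %s -Dspring-boot.run.jvmArguments="-Dserver.port=%s"\n' "$module" "$port"
  else
    printf './mvnw clean spring-boot:run --projects %s\n' "$module"
  fi
}

# Prints one command per application, in the same order as the list above.
print_all_run_cmds() {
  maven_run_cmd eureka-server
  maven_run_cmd producer-api 9080
  maven_run_cmd categorizer-service 9081
  maven_run_cmd collector-service 9082
  maven_run_cmd publisher-api 9083
  maven_run_cmd news-client
}
```

Calling `print_all_run_cmds` lists the six commands; `eureka-server` comes first because the other applications register with it.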
== Running Applications as Docker containers
=== Build Application's Docker Image
- In a terminal, make sure you are in the `spring-cloud-stream-kafka-elasticsearch` root folder
- In order to build the applications' Docker images, run the following script
+
[source]
./docker-build.sh
=== Application's Environment Variables
- producer-api
+
|===
|Environment Variable |Description

|`KAFKA_HOST`
|Specify host of the `Kafka` message broker to use (default `localhost`)

|`KAFKA_PORT`
|Specify port of the `Kafka` message broker to use (default `29092`)

|`SCHEMA_REGISTRY_HOST`
|Specify host of the `Schema Registry` to use (default `localhost`)

|`SCHEMA_REGISTRY_PORT`
|Specify port of the `Schema Registry` to use (default `8081`)

|`EUREKA_HOST`
|Specify host of the `Eureka` service discovery to use (default `localhost`)

|`EUREKA_PORT`
|Specify port of the `Eureka` service discovery to use (default `8761`)

|`ZIPKIN_HOST`
|Specify host of the `Zipkin` distributed tracing system to use (default `localhost`)

|`ZIPKIN_PORT`
|Specify port of the `Zipkin` distributed tracing system to use (default `9411`)
|===
- categorizer-service
+
|===
|Environment Variable |Description

|`KAFKA_HOST`
|Specify host of the `Kafka` message broker to use (default `localhost`)

|`KAFKA_PORT`
|Specify port of the `Kafka` message broker to use (default `29092`)

|`SCHEMA_REGISTRY_HOST`
|Specify host of the `Schema Registry` to use (default `localhost`)

|`SCHEMA_REGISTRY_PORT`
|Specify port of the `Schema Registry` to use (default `8081`)

|`EUREKA_HOST`
|Specify host of the `Eureka` service discovery to use (default `localhost`)

|`EUREKA_PORT`
|Specify port of the `Eureka` service discovery to use (default `8761`)

|`ZIPKIN_HOST`
|Specify host of the `Zipkin` distributed tracing system to use (default `localhost`)

|`ZIPKIN_PORT`
|Specify port of the `Zipkin` distributed tracing system to use (default `9411`)
|===
- collector-service
+
|===
|Environment Variable |Description

|`ELASTICSEARCH_HOST`
|Specify host of the `Elasticsearch` search engine to use (default `localhost`)

|`ELASTICSEARCH_NODES_PORT`
|Specify nodes port of the `Elasticsearch` search engine to use (default `9300`)

|`ELASTICSEARCH_REST_PORT`
|Specify REST port of the `Elasticsearch` search engine to use (default `9200`)

|`KAFKA_HOST`
|Specify host of the `Kafka` message broker to use (default `localhost`)

|`KAFKA_PORT`
|Specify port of the `Kafka` message broker to use (default `29092`)

|`SCHEMA_REGISTRY_HOST`
|Specify host of the `Schema Registry` to use (default `localhost`)

|`SCHEMA_REGISTRY_PORT`
|Specify port of the `Schema Registry` to use (default `8081`)

|`EUREKA_HOST`
|Specify host of the `Eureka` service discovery to use (default `localhost`)

|`EUREKA_PORT`
|Specify port of the `Eureka` service discovery to use (default `8761`)

|`ZIPKIN_HOST`
|Specify host of the `Zipkin` distributed tracing system to use (default `localhost`)

|`ZIPKIN_PORT`
|Specify port of the `Zipkin` distributed tracing system to use (default `9411`)
|===
- publisher-api
+
|===
|Environment Variable |Description

|`ELASTICSEARCH_HOST`
|Specify host of the `Elasticsearch` search engine to use (default `localhost`)

|`ELASTICSEARCH_NODES_PORT`
|Specify nodes port of the `Elasticsearch` search engine to use (default `9300`)

|`ELASTICSEARCH_REST_PORT`
|Specify REST port of the `Elasticsearch` search engine to use (default `9200`)

|`EUREKA_HOST`
|Specify host of the `Eureka` service discovery to use (default `localhost`)

|`EUREKA_PORT`
|Specify port of the `Eureka` service discovery to use (default `8761`)

|`ZIPKIN_HOST`
|Specify host of the `Zipkin` distributed tracing system to use (default `localhost`)

|`ZIPKIN_PORT`
|Specify port of the `Zipkin` distributed tracing system to use (default `9411`)
|===
- news-client
+
|===
|Environment Variable |Description

|`KAFKA_HOST`
|Specify host of the `Kafka` message broker to use (default `localhost`)

|`KAFKA_PORT`
|Specify port of the `Kafka` message broker to use (default `29092`)

|`SCHEMA_REGISTRY_HOST`
|Specify host of the `Schema Registry` to use (default `localhost`)

|`SCHEMA_REGISTRY_PORT`
|Specify port of the `Schema Registry` to use (default `8081`)

|`EUREKA_HOST`
|Specify host of the `Eureka` service discovery to use (default `localhost`)

|`EUREKA_PORT`
|Specify port of the `Eureka` service discovery to use (default `8761`)

|`ZIPKIN_HOST`
|Specify host of the `Zipkin` distributed tracing system to use (default `localhost`)

|`ZIPKIN_PORT`
|Specify port of the `Zipkin` distributed tracing system to use (default `9411`)
|===
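The defaults in the tables above target services reachable on `localhost`. A container that shares a Docker network with the infrastructure usually needs different hosts and ports. The sketch below shows one way to turn the `producer-api` variables into `-e` flags for `docker run`, falling back to the documented default whenever a variable is unset. The function name `producer_api_env_flags` is hypothetical; the project's own `start-apps.sh` may set these values differently.

```shell
#!/bin/sh
# Hypothetical helper (not part of the project): prints the `-e` flags for
# producer-api. Every fallback value is the default listed in the table above.
producer_api_env_flags() {
  flags="-e KAFKA_HOST=${KAFKA_HOST:-localhost} -e KAFKA_PORT=${KAFKA_PORT:-29092}"
  flags="$flags -e SCHEMA_REGISTRY_HOST=${SCHEMA_REGISTRY_HOST:-localhost}"
  flags="$flags -e SCHEMA_REGISTRY_PORT=${SCHEMA_REGISTRY_PORT:-8081}"
  flags="$flags -e EUREKA_HOST=${EUREKA_HOST:-localhost} -e EUREKA_PORT=${EUREKA_PORT:-8761}"
  flags="$flags -e ZIPKIN_HOST=${ZIPKIN_HOST:-localhost} -e ZIPKIN_PORT=${ZIPKIN_PORT:-9411}"
  printf '%s\n' "$flags"
}
```

For instance, exporting `KAFKA_HOST=kafka` and `KAFKA_PORT=9092` before calling the function would point the container at a broker named `kafka`; both values are assumptions about the compose setup and should be checked against the compose file. The printed flags can then be placed between `docker run` and the image name.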
=== Run Application's Docker Container
- In a terminal, make sure you are inside the `spring-cloud-stream-kafka-elasticsearch` root folder
- Run the following script
+
[source]
./start-apps.sh
== Applications URLs
|===
|Application |URL

|producer-api |http://localhost:9080/swagger-ui.html
|publisher-api |http://localhost:9083/swagger-ui.html
|news-client |http://localhost:8080
|===
== Useful links
- Eureka
+
`Eureka` can be accessed at http://localhost:8761

- Zipkin
+
`Zipkin` can be accessed at http://localhost:9411

- Kafka Topics UI
+
`Kafka Topics UI` can be accessed at http://localhost:8085

- Kafka Manager
+
`Kafka Manager` can be accessed at http://localhost:9001
+
Configuration
+
** First, you must create a new cluster. Click on `Cluster` (dropdown button on the header) and then on `Add Cluster`
** Type the name of your cluster in the `Cluster Name` field, for example: `MyCluster`
** Type `zookeeper:2181` in the `Cluster Zookeeper Hosts` field
** Enable the checkbox `Poll consumer information (Not recommended for large # of consumers if ZK is used for offsets tracking on older Kafka versions)`
** Click on the `Save` button at the bottom of the page.

- Schema Registry UI
+
`Schema Registry UI` can be accessed at http://localhost:8001

- Elasticsearch REST API
+
Check ES is up and running
+
[source]
curl localhost:9200
+
Check indexes
+
[source]
curl "localhost:9200/_cat/indices?v"
+
Check news index mapping
+
[source]
curl "localhost:9200/news/_mapping?pretty"
+
Simple search
+
[source]
curl "localhost:9200/news/_search?pretty"
+
Delete news index
+
[source]
curl -X DELETE localhost:9200/news
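The simple search above returns every document in the `news` index. To narrow the results, Elasticsearch's URI search accepts a `q` parameter (query string syntax) and a `size` parameter. The sketch below builds such a URL and wraps it in a `curl` call. The helper names are hypothetical, and the field name `title` used in the usage note is an assumption; the mapping request shown above lists the fields the index actually has.

```shell
#!/bin/sh
# Hypothetical helper (not part of the project): prints a URI-search URL for
# the news index. The query must already be URL-safe (no spaces, for example).
news_search_url() {
  query="$1"
  size="${2:-10}"   # 10 is also Elasticsearch's own default page size
  printf 'localhost:9200/news/_search?q=%s&size=%s&pretty\n' "$query" "$size"
}

# Runs the search against the local Elasticsearch REST port (9200).
news_search() {
  curl -s "$(news_search_url "$@")"
}
```

For example, `news_search 'title:kafka' 5` would request at most five documents whose `title` field matches `kafka`, assuming that field exists in the mapping.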
== Shutdown
- To stop applications
** If they were started with `Maven`, go to the terminals where they are running and press `Ctrl+C`
** If they were started as Docker containers, go to a terminal and, inside the `spring-cloud-stream-kafka-elasticsearch` root folder, run the script below
+
[source]
./stop-apps.sh
- To stop and remove the Docker Compose containers, network and volumes, go to a terminal and, inside the `spring-cloud-stream-kafka-elasticsearch` root folder, run the following command
+
[source]
docker compose down -v
== Cleanup