= spring-cloud-stream-kafka-elasticsearch
The goal of this project is to implement a "News" processing pipeline composed of five https://docs.spring.io/spring-boot/docs/current/reference/htmlsingle/[Spring Boot] applications: `producer-api`, `categorizer-service`, `collector-service`, `publisher-api` and `news-client`.
== Proof-of-Concepts & Articles
On https://ivangfr.github.io[ivangfr.github.io], I have compiled my Proof-of-Concepts (PoCs) and articles. You can easily search for the technology you are interested in by using the filter. Who knows, perhaps I have already implemented a PoC or written an article about what you are looking for.
== Additional Readings
- [Medium]: https://medium.com/javarevisited/implementing-a-kafka-producer-and-consumer-using-spring-cloud-stream-d4b9a6a9eab1[Implementing a Kafka Producer and Consumer using Spring Cloud Stream]
- [Medium]: https://medium.com/javarevisited/implementing-unit-tests-for-a-kafka-producer-and-consumer-that-uses-spring-cloud-stream-f7a98a89fcf2[Implementing Unit Tests for a Kafka Producer and Consumer that uses Spring Cloud Stream]
- [Medium]: https://medium.com/javarevisited/implementing-end-to-end-testing-for-a-kafka-producer-and-consumer-that-uses-spring-cloud-stream-fbf5e666899e[Implementing End-to-End testing for a Kafka Producer and Consumer that uses Spring Cloud Stream]
- [Medium]: https://medium.com/javarevisited/configuring-distributed-tracing-with-zipkin-in-a-kafka-producer-and-consumer-that-uses-spring-cloud-9f1e55468b9e[Configuring Distributed Tracing with Zipkin in a Kafka Producer and Consumer that uses Spring Cloud Stream]
- [Medium]: https://medium.com/@ivangfr/using-cloudevents-in-a-kafka-producer-and-consumer-that-uses-spring-cloud-stream-9c51670b5566[Using CloudEvents in a Kafka Producer and Consumer that uses Spring Cloud Stream]
== Technologies used
- https://docs.spring.io/spring-cloud-stream/docs/current/reference/html/[Spring Cloud Stream] to build highly scalable event-driven applications connected with shared messaging systems;
- https://cloud.spring.io/spring-cloud-static/spring-cloud-schema-registry/current/reference/html/spring-cloud-schema-registry.html[Spring Cloud Schema Registry] that supports schema evolution so that the data can be evolved over time; besides, it lets you store schema information in a textual format (typically JSON) and makes that information accessible to various applications that need it to receive and send data in binary format (see the registry query example after this list);
- https://docs.spring.io/spring-data/elasticsearch/docs/current/reference/html/[Spring Data Elasticsearch] to persist data in https://www.elastic.co/products/elasticsearch[Elasticsearch];
- https://docs.spring.io/spring-cloud-openfeign/docs/current/reference/html/[Spring Cloud OpenFeign] to write web service clients easily;
- https://www.thymeleaf.org/[Thymeleaf] as HTML template engine;
- https://zipkin.io[Zipkin] to visualize traces between and within applications;
- https://github.com/Netflix/eureka[Eureka] for service registration and discovery.
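For example, once the environment is up, you can inspect what the registry stores. The sketch below assumes a Confluent-compatible Schema Registry listening on the default port `8081` (see the environment variables section); the subject name in the second call is hypothetical:

[source]
----
# List all subjects (schema names) registered in the Schema Registry
curl localhost:8081/subjects

# Fetch the latest schema version for a subject; the subject name is
# hypothetical -- use one returned by the previous call
curl "localhost:8081/subjects/producer.news-value/versions/latest"
----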
NOTE: The https://github.com/ivangfr/docker-swarm-environment[docker-swarm-environment] repository shows how to deploy this project into a cluster of Docker Engines in swarm mode.
== Project Architecture
image::documentation/project-diagram.jpeg[]
== Applications
- `producer-api`
+
Spring Boot Web Java application that creates news and pushes news events to the `producer.news` topic in Kafka.

- `categorizer-service`
+
Spring Boot Web Java application that listens to news events in the `producer.news` topic in Kafka, categorizes them, and pushes them to the `categorizer.news` topic.

- `collector-service`
+
Spring Boot Web Java application that listens for news events in the `categorizer.news` topic in Kafka, saves them in Elasticsearch, and pushes the news events to the `collector.news` topic.

- `publisher-api`
+
Spring Boot Web Java application that reads directly from Elasticsearch and exposes a REST API. It doesn't listen to Kafka.

- `news-client`
+
Spring Boot Web Java application that provides a user interface to see the news. It implements a WebSocket that consumes news events from the `collector.news` topic, so the news is updated on the fly on the main page. Besides, `news-client` communicates directly with `publisher-api` whenever a search for specific news or a news update is needed.
+
The WebSocket operation is shown in the short GIF below: a news item is created in `producer-api` and is immediately shown in `news-client`.
+
image::documentation/websocket-operation.gif[]
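If you want to watch the raw events flowing through the pipeline, you can tap a topic with the console consumer shipped inside the Kafka container. The container name `kafka` and internal port `9092` are assumptions; adjust them to your `docker-compose.yml`. Also note the payloads are Avro-encoded, so the output will be mostly binary:

[source]
----
# Tap the producer.news topic from inside the Kafka container
# (container name and internal port are assumptions)
docker exec -it kafka kafka-console-consumer \
  --bootstrap-server localhost:9092 \
  --topic producer.news --from-beginning
----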
== Prerequisites
- https://www.oracle.com/java/technologies/downloads/#java17[Java 17+]
- https://www.docker.com/[Docker]
== Generate NewsEvent
- In a terminal, make sure you are in the `spring-cloud-stream-kafka-elasticsearch` root folder

- Run the following command to generate `NewsEvent`
+
[source]
----
./mvnw clean install --projects commons-news
----
+
It will install `commons-news-1.0.0.jar` in your local Maven repository, so that it is visible to all services.
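If you want to double-check that the artifact landed in your local Maven repository (assuming the default `~/.m2/repository` location), run:

[source]
----
# Locate the installed jar in the local Maven repository
find ~/.m2/repository -name "commons-news-1.0.0.jar"
----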
== Start Environment
- Open a terminal and, inside the `spring-cloud-stream-kafka-elasticsearch` root folder, run
+
[source]
----
docker compose up -d
----

- Wait for the Docker containers to be up and running. To check it, run
+
[source]
----
docker compose ps
----
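Elasticsearch in particular can take a while before it accepts connections. A small sketch that polls its REST port until it answers (port `9200` matches the defaults documented below):

[source]
----
# Poll Elasticsearch until its REST API responds
until curl -s localhost:9200 > /dev/null; do
  echo "Waiting for Elasticsearch..."; sleep 5
done
----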
== Running Applications with Maven
Inside the `spring-cloud-stream-kafka-elasticsearch` root folder, run the following Maven commands in different terminals

- `eureka-server`
+
[source]
----
./mvnw clean spring-boot:run --projects eureka-server
----

- `producer-api`
+
[source]
----
./mvnw clean spring-boot:run --projects producer-api -Dspring-boot.run.jvmArguments="-Dserver.port=9080"
----

- `categorizer-service`
+
[source]
----
./mvnw clean spring-boot:run --projects categorizer-service -Dspring-boot.run.jvmArguments="-Dserver.port=9081"
----

- `collector-service`
+
[source]
----
./mvnw clean spring-boot:run --projects collector-service -Dspring-boot.run.jvmArguments="-Dserver.port=9082"
----

- `publisher-api`
+
[source]
----
./mvnw clean spring-boot:run --projects publisher-api -Dspring-boot.run.jvmArguments="-Dserver.port=9083"
----

- `news-client`
+
[source]
----
./mvnw clean spring-boot:run --projects news-client
----
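Once the applications are up, you can confirm they all registered with Eureka. The sketch below uses Eureka's standard REST endpoint, which returns XML by default:

[source]
----
# List applications currently registered in Eureka
curl localhost:8761/eureka/apps
----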
== Running Applications as Docker containers
=== Build Applications' Docker Images

- In a terminal, make sure you are in the `spring-cloud-stream-kafka-elasticsearch` root folder

- To build the applications' Docker images, run the following script
+
[source]
----
./docker-build.sh
----
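To confirm the images were built, list them. The exact image tags depend on `docker-build.sh`, so the filter below, which simply greps for the service names, is an assumption:

[source]
----
# Image names are an assumption -- grep merely filters by service name
docker images | grep -E "producer-api|categorizer-service|collector-service|publisher-api|news-client"
----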
=== Application's Environment Variables
- `producer-api`
+
|===
|Environment Variable |Description

|KAFKA_HOST
|Specify host of the Kafka message broker to use (default localhost)
|KAFKA_PORT
|Specify port of the Kafka message broker to use (default 29092)
|SCHEMA_REGISTRY_HOST
|Specify host of the Schema Registry to use (default localhost)
|SCHEMA_REGISTRY_PORT
|Specify port of the Schema Registry to use (default 8081)
|EUREKA_HOST
|Specify host of the Eureka service discovery to use (default localhost)
|EUREKA_PORT
|Specify port of the Eureka service discovery to use (default 8761)
|ZIPKIN_HOST
|Specify host of the Zipkin distributed tracing system to use (default localhost)
|ZIPKIN_PORT
|Specify port of the Zipkin distributed tracing system to use (default 9411)
|===
- `categorizer-service`
+
|===
|Environment Variable |Description

|KAFKA_HOST
|Specify host of the Kafka message broker to use (default localhost)
|KAFKA_PORT
|Specify port of the Kafka message broker to use (default 29092)
|SCHEMA_REGISTRY_HOST
|Specify host of the Schema Registry to use (default localhost)
|SCHEMA_REGISTRY_PORT
|Specify port of the Schema Registry to use (default 8081)
|EUREKA_HOST
|Specify host of the Eureka service discovery to use (default localhost)
|EUREKA_PORT
|Specify port of the Eureka service discovery to use (default 8761)
|ZIPKIN_HOST
|Specify host of the Zipkin distributed tracing system to use (default localhost)
|ZIPKIN_PORT
|Specify port of the Zipkin distributed tracing system to use (default 9411)
|===
- `collector-service`
+
|===
|Environment Variable |Description

|ELASTICSEARCH_HOST
|Specify host of the Elasticsearch search engine to use (default localhost)
|ELASTICSEARCH_NODES_PORT
|Specify nodes port of the Elasticsearch search engine to use (default 9300)
|ELASTICSEARCH_REST_PORT
|Specify REST port of the Elasticsearch search engine to use (default 9200)
|KAFKA_HOST
|Specify host of the Kafka message broker to use (default localhost)
|KAFKA_PORT
|Specify port of the Kafka message broker to use (default 29092)
|SCHEMA_REGISTRY_HOST
|Specify host of the Schema Registry to use (default localhost)
|SCHEMA_REGISTRY_PORT
|Specify port of the Schema Registry to use (default 8081)
|EUREKA_HOST
|Specify host of the Eureka service discovery to use (default localhost)
|EUREKA_PORT
|Specify port of the Eureka service discovery to use (default 8761)
|ZIPKIN_HOST
|Specify host of the Zipkin distributed tracing system to use (default localhost)
|ZIPKIN_PORT
|Specify port of the Zipkin distributed tracing system to use (default 9411)
|===
- `publisher-api`
+
|===
|Environment Variable |Description

|ELASTICSEARCH_HOST
|Specify host of the Elasticsearch search engine to use (default localhost)
|ELASTICSEARCH_NODES_PORT
|Specify nodes port of the Elasticsearch search engine to use (default 9300)
|ELASTICSEARCH_REST_PORT
|Specify REST port of the Elasticsearch search engine to use (default 9200)
|EUREKA_HOST
|Specify host of the Eureka service discovery to use (default localhost)
|EUREKA_PORT
|Specify port of the Eureka service discovery to use (default 8761)
|ZIPKIN_HOST
|Specify host of the Zipkin distributed tracing system to use (default localhost)
|ZIPKIN_PORT
|Specify port of the Zipkin distributed tracing system to use (default 9411)
|===
- `news-client`
+
|===
|Environment Variable |Description

|KAFKA_HOST
|Specify host of the Kafka message broker to use (default localhost)
|KAFKA_PORT
|Specify port of the Kafka message broker to use (default 29092)
|SCHEMA_REGISTRY_HOST
|Specify host of the Schema Registry to use (default localhost)
|SCHEMA_REGISTRY_PORT
|Specify port of the Schema Registry to use (default 8081)
|EUREKA_HOST
|Specify host of the Eureka service discovery to use (default localhost)
|EUREKA_PORT
|Specify port of the Eureka service discovery to use (default 8761)
|ZIPKIN_HOST
|Specify host of the Zipkin distributed tracing system to use (default localhost)
|ZIPKIN_PORT
|Specify port of the Zipkin distributed tracing system to use (default 9411)
|===
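These variables are consumed when a container starts. As an illustration only, a manual `docker run` for `producer-api` could look like the sketch below; the image tag and Docker network name are assumptions, and `start-apps.sh` remains the authoritative way to start the containers:

[source]
----
# Hypothetical manual run -- image tag and network name are assumptions;
# see start-apps.sh for the real values
docker run -d --name producer-api \
  --network spring-cloud-stream-kafka-elasticsearch_default \
  -e KAFKA_HOST=kafka -e KAFKA_PORT=9092 \
  -e SCHEMA_REGISTRY_HOST=schema-registry \
  -e EUREKA_HOST=eureka-server \
  -e ZIPKIN_HOST=zipkin \
  -p 9080:9080 ivanfranchin/producer-api:1.0.0
----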
=== Run Applications' Docker Containers

- In a terminal, make sure you are inside the `spring-cloud-stream-kafka-elasticsearch` root folder

- Run the following script
+
[source]
----
./start-apps.sh
----
== Applications URLs
|===
|Application |URL

|producer-api |http://localhost:9080/swagger-ui.html
|publisher-api |http://localhost:9083/swagger-ui.html
|news-client |http://localhost:8080
|===
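For a quick end-to-end smoke test, create a news item through `producer-api` and watch it appear in `news-client`. The endpoint path and JSON fields below are hypothetical; check the Swagger UI listed above for the real contract:

[source]
----
# Hypothetical request -- verify the path and payload in the Swagger UI
curl -X POST localhost:9080/api/news \
  -H "Content-Type: application/json" \
  -d '{"title": "Some title", "text": "Some text"}'
----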
== Useful links
- Eureka
+
Eureka can be accessed at http://localhost:8761

- Zipkin
+
Zipkin can be accessed at http://localhost:9411

- Kafka Topics UI
+
Kafka Topics UI can be accessed at http://localhost:8085

- Kafka Manager
+
Kafka Manager can be accessed at http://localhost:9001
+
*Configuration*

** First, you must create a new cluster. Click on `Cluster` (dropdown button on the header) and then on `Add Cluster`
** Type the name of your cluster in the `Cluster Name` field, for example: `MyCluster`
** Type `zookeeper:2181` in the `Cluster Zookeeper Hosts` field
** Enable the checkbox `Poll consumer information (Not recommended for large # of consumers if ZK is used for offsets tracking on older Kafka versions)`
** Click on the `Save` button at the bottom of the page

- Schema Registry UI
+
Schema Registry UI can be accessed at http://localhost:8001
- Elasticsearch REST API
+
Check that ES is up and running
+
[source]
----
curl localhost:9200
----
+
Check indexes
+
[source]
----
curl "localhost:9200/_cat/indices?v"
----
+
Check the `news` index mapping
+
[source]
----
curl "localhost:9200/news/_mapping?pretty"
----
+
Simple search
+
[source]
----
curl "localhost:9200/news/_search?pretty"
----
+
Delete the `news` index
+
[source]
----
curl -X DELETE localhost:9200/news
----
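Beyond the match-all search above, Elasticsearch accepts a query body. The `title` field below is an assumption about the `news` index mapping; inspect the mapping with the `_mapping` call first:

[source]
----
# Full-text search on one field; "title" is an assumed field name
curl -X GET "localhost:9200/news/_search?pretty" \
  -H "Content-Type: application/json" \
  -d '{"query": {"match": {"title": "kafka"}}}'
----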
== Shutdown
- To stop the applications
** If they were started with `Maven`, go to the terminals where they are running and press `Ctrl+C`
** If they were started as Docker containers, go to a terminal and, inside the `spring-cloud-stream-kafka-elasticsearch` root folder, run the script below
+
[source]
----
./stop-apps.sh
----

- To stop and remove the Docker Compose containers, network and volumes, go to a terminal and, inside the `spring-cloud-stream-kafka-elasticsearch` root folder, run the following command
+
[source]
----
docker compose down -v
----
== Cleanup