Spring Boot + Kafka - Producer and Consumer Example
In this section, we will learn how to do asynchronous communication using Kafka in Spring Boot application.
We will develop two Spring Boot applications, one producer and one consumer application, then we will learn how to use Kafka in the Spring boot application to send and receive messages between the Producer and the Consumer.
1. What we will build?
We will develop two Spring Boot applications, one producer and one consumer application, then we will learn how to use Kafka in the Spring boot application to send and receive messages between the Producer and the Consumer.
The following diagram is an overview of the components in our messaging system.
2.Key terminologies of Kafka:
- Producer: A producer is a client that sends messages to the Kafka server to the specified topic.
- Consumer: Consumers are the recipients who receive messages from the Kafka server.
- Broker: Brokers can create a Kafka cluster by sharing information using Zookeeper. A broker receives messages from producers and consumers fetch messages from the broker by topic, partition, and offset.
- Cluster: A Kafka cluster is a system that consists of several Brokers, Topics, and Partitions for both.
- Topic: Kafka topics are the categories used to organize messages. Each topic has a name that is unique across the entire Kafka cluster. Messages are sent to and read from specific topics. In other words, producers write data to topics, and consumers read data from topics.
When working with Apache Kafka, ZooKeeper is primarily used to track the status of nodes in the Kafka cluster and maintain a list of Kafka topics and messages.
3. Install and Setup Apache Kafka
- Download Kafka from the official website at https://kafka.apache.org/downloads
- Extract Kafka zip in the local file system
- Start Zookeeper service. Use the below command to start the Zookeeper service:
.\bin\windows\zookeeper-server-start.bat .\config\zookeeper.properties
- Start Kafka Broker. Open another terminal session and run the below command to start the Kafka broker:
.\bin\windows\kafka-server-start.bat .\config\server.properties
4. Creating spring boot consumer application
Then, click on the Generate button. When we click on the Generate button, it starts packing the project in a .zip(consumer) file and downloads the project. Then, Extract the Zip file.
Then, import the project on your favourite IDE.
Final Project directory:
Complete pom.xml
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0
https://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<parent>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-parent</artifactId>
<version>3.1.5</version>
<relativePath/> <!-- lookup parent from repository -->
</parent>
<groupId>com.knf.dev.demo</groupId>
<artifactId>consumer</artifactId>
<version>0.0.1-SNAPSHOT</version>
<name>consumer</name>
<description>Demo project for Spring Boot</description>
<properties>
<java.version>17</java.version>
</properties>
<dependencies>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.kafka</groupId>
<artifactId>spring-kafka</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-test</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.springframework.kafka</groupId>
<artifactId>spring-kafka-test</artifactId>
<scope>test</scope>
</dependency>
</dependencies>
<build>
<plugins>
<plugin>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-maven-plugin</artifactId>
</plugin>
</plugins>
</build>
</project>
application.yaml
You can then configure kafka properties according to the Documentation - scroll down to the kafka properties...
Then, specify bootstrap-servers, key-serializer, value-serializer, topic name, and group.
spring:
kafka:
producer:
bootstrap-servers: localhost:9092
key-serializer: org.apache.kafka.common.serialization.StringSerializer
value-serializer: org.apache.kafka.common.serialization.StringSerializer
kafka:
topic:
name: knowledgeFactory_Topic
group: knowledgeFactory_group
Create Consumer
Consumer is the service that will be responsible for reading messages and processing them according to the needs of your own business logic. @KafkaListener allows a method to consume messages from Kafka topic(s).
package com.knf.dev.demo.consumer.service;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.stereotype.Service;
@Service
public class Consumer {
@KafkaListener(topics = {"${kafka.topic.name}"}, groupId = "${kafka.topic.name}")
public void listenToTopic(String message)
{
System.out.println("Message arrived! Message: " + message);
}
}
ConsumerApplication.java
package com.knf.dev.demo.consumer;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
@SpringBootApplication
public class ConsumerApplication {
public static void main(String[] args) {
SpringApplication.run(ConsumerApplication.class, args);
}
}
5. Creating spring boot producer application
First, open the Spring initializr https://start.spring.io/
Then, Provide the Group and Artifact name. We have provided Group name com.knf.dev.demo and Artifact producer. Here I selected the Maven project - language Java 17 - Spring Boot 3.1.5 and add Spring Web, and Spring for Apache Kafka.
Final Project directory:
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0
https://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<parent>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-parent</artifactId>
<version>3.1.5</version>
<relativePath/> <!-- lookup parent from repository -->
</parent>
<groupId>com.knf.dev.demo</groupId>
<artifactId>producer</artifactId>
<version>0.0.1-SNAPSHOT</version>
<name>producer</name>
<description>Demo project for Spring Boot</description>
<properties>
<java.version>17</java.version>
</properties>
<dependencies>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.kafka</groupId>
<artifactId>spring-kafka</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-test</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.springframework.kafka</groupId>
<artifactId>spring-kafka-test</artifactId>
<scope>test</scope>
</dependency>
</dependencies>
<build>
<plugins>
<plugin>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-maven-plugin</artifactId>
</plugin>
</plugins>
</build>
</project>
application.yaml
Add the following configuration to application.yaml file:
spring:
kafka:
producer:
bootstrap-servers: localhost:9092
key-serializer: org.apache.kafka.common.serialization.StringSerializer
value-serializer: org.apache.kafka.common.serialization.StringSerializer
kafka:
topic:
name: knowledgeFactory_Topic
Create Producer
Creating a producer will write our messages on the topic. KafkaTemplate is a class that is part of the Spring to produce messages into the Kafka Topics.
package com.knf.dev.demo.producer.service;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.stereotype.Service;
@Service
public class Producer {
KafkaTemplate<String,String> kafkaTemplate;
@Value("${kafka.topic.name}")
private String topic;
public Producer(KafkaTemplate<String, String> kafkaTemplate) {
this.kafkaTemplate = kafkaTemplate;
}
public void sendMessageToTopic(String message)
{
kafkaTemplate.send(topic,message);
}
}
Create MessageController
package com.knf.dev.demo.producer.controller;
import com.knf.dev.demo.producer.service.Producer;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RequestParam;
import org.springframework.web.bind.annotation.RestController;
@RestController
@RequestMapping("/api/v1")
public class MessageController {
Producer producer;
public MessageController(Producer producer) {
this.producer = producer;
}
@GetMapping("/sendMessage")
public String publishMessage(@RequestParam("message") String message) {
producer.sendMessageToTopic(message);
return "Message was sent successfully";
}
}
ProducerApplication.java
package com.knf.dev.demo.producer;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
@SpringBootApplication
public class ProducerApplication {
public static void main(String[] args) {
SpringApplication.run(ProducerApplication.class, args);
}
}
6. Verify our system is working as expected
First start the producer application, then start the consumer application:
Run the Spring Boot application - mvn spring-boot:run
OR
Run this Spring boot application from
- IntelliJ IDEA IDE by right click - Run 'Application.main()'
- Eclipse/STS - You can right click the project or the Application.java file and run as java application or Spring boot application.
Run the Spring Boot application - mvn spring-boot:run
OR
Run this Spring boot application from
- IntelliJ IDEA IDE by right click - Run 'Application.main()'
- Eclipse/STS - You can right click the project or the Application.java file and run as java application or Spring boot application.