Spring Boot Kafka Integration: Event-Driven Architecture with Spring Data JPA

Explanation:

  • The User sends an HTTP POST request to create an order.
  • OrderController forwards the request to OrderService.
  • OrderService persists the order using OrderRepository (Spring Data JPA).
  • After saving, it converts the order to JSON and sends it to KafkaProducerService.
  • The KafkaProducerService publishes the message to Kafka.
  • The KafkaConsumerService listens to the "order_topic" and processes the order asynchronously.

Here's a complete guide on integrating Spring Data JPA with Apache Kafka for an Event-Driven Architecture from scratch.


1. Overview

We'll build a system where:

  1. A Spring Boot application persists data using Spring Data JPA.
  2. When an entity is saved, an event is published to Kafka.
  3. A Kafka consumer listens for messages and processes them asynchronously.

2. Project Setup

Dependencies (Maven)

Add the following dependencies to your pom.xml:

<dependencies>
    <!-- Spring Boot Starter for JPA -->
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-data-jpa</artifactId>
    </dependency>

    <!-- Spring Boot Starter for Kafka -->
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-kafka</artifactId>
    </dependency>

    <!-- PostgreSQL Driver (or use MySQL if needed) -->
    <dependency>
        <groupId>org.postgresql</groupId>
        <artifactId>postgresql</artifactId>
    </dependency>

    <!-- Lombok (optional, for reducing boilerplate code) -->
    <dependency>
        <groupId>org.projectlombok</groupId>
        <artifactId>lombok</artifactId>
        <scope>provided</scope>
    </dependency>
</dependencies>

3. Configure Database & Kafka

application.yml

spring:
  datasource:
    url: jdbc:postgresql://localhost:5432/mydb
    username: myuser
    password: mypassword
    driver-class-name: org.postgresql.Driver

  jpa:
    hibernate:
      ddl-auto: update
    show-sql: true

  kafka:
    bootstrap-servers: localhost:9092
    consumer:
      group-id: my-group
      auto-offset-reset: earliest

4. Create Entity and Repository

Define an entity class for Order:

import jakarta.persistence.*;
import lombok.*;

@Entity
@Table(name = "orders")
@Getter
@Setter
@NoArgsConstructor
@AllArgsConstructor
@ToString
public class Order {
    
    @Id
    @GeneratedValue(strategy = GenerationType.IDENTITY)
    private Long id;

    private String product;
    private int quantity;
}

Repository Interface

import org.springframework.data.jpa.repository.JpaRepository;

public interface OrderRepository extends JpaRepository<Order, Long> {
}

5. Kafka Producer (Publishing Events)

When an order is created, we send an event to Kafka.

Kafka Producer Service

import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.stereotype.Service;

@Service
public class KafkaProducerService {

    private final KafkaTemplate<String, String> kafkaTemplate;

    public KafkaProducerService(KafkaTemplate<String, String> kafkaTemplate) {
        this.kafkaTemplate = kafkaTemplate;
    }

    public void sendMessage(String topic, String message) {
        kafkaTemplate.send(topic, message);
    }
}

6. Publishing Event After Database Save

Modify the service to publish an event after saving an order.

Order Service

import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectMapper;
import org.springframework.stereotype.Service;

@Service
public class OrderService {

    private final OrderRepository orderRepository;
    private final KafkaProducerService kafkaProducerService;
    private final ObjectMapper objectMapper;

    public OrderService(OrderRepository orderRepository, KafkaProducerService kafkaProducerService, ObjectMapper objectMapper) {
        this.orderRepository = orderRepository;
        this.kafkaProducerService = kafkaProducerService;
        this.objectMapper = objectMapper;
    }

    public Order createOrder(Order order) throws JsonProcessingException {
        Order savedOrder = orderRepository.save(order);
        
        // Convert order to JSON and send it to Kafka
        String orderJson = objectMapper.writeValueAsString(savedOrder);
        kafkaProducerService.sendMessage("order_topic", orderJson);
        
        return savedOrder;
    }
}

7. Kafka Consumer (Processing Events)

Now, let's consume messages from Kafka.

Kafka Consumer Service

import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.stereotype.Service;

@Service
public class KafkaConsumerService {

    @KafkaListener(topics = "order_topic", groupId = "my-group")
    public void consume(String message) {
        System.out.println("Received message: " + message);
    }
}

8. Exposing REST API

Order Controller

import org.springframework.web.bind.annotation.*;
import com.fasterxml.jackson.core.JsonProcessingException;

@RestController
@RequestMapping("/orders")
public class OrderController {

    private final OrderService orderService;

    public OrderController(OrderService orderService) {
        this.orderService = orderService;
    }

    @PostMapping
    public Order createOrder(@RequestBody Order order) throws JsonProcessingException {
        return orderService.createOrder(order);
    }
}

9. Running the Application

Step 1: Start Kafka & Zookeeper

If you haven’t installed Kafka yet, you can start it using Docker:

docker-compose up -d

Or start Kafka manually:

bin/zookeeper-server-start.sh config/zookeeper.properties
bin/kafka-server-start.sh config/server.properties

Step 2: Create Kafka Topic

bin/kafka-topics.sh --create --topic order_topic --bootstrap-server localhost:9092 --partitions 1 --replication-factor 1

Step 3: Run Spring Boot Application

Start your Spring Boot application:

mvn spring-boot:run

Step 4: Test API

Send a POST request to create an order:

curl -X POST http://localhost:8080/orders \
     -H "Content-Type: application/json" \
     -d '{"product":"Laptop","quantity":2}'

Step 5: Check Kafka Logs

The consumer should receive the message:

Received message: {"id":1,"product":"Laptop","quantity":2}

10. Summary

  • Spring Data JPA persists orders in the database.
  • Kafka Producer sends an event when an order is created.
  • Kafka Consumer listens and processes order events asynchronously.
  • Spring Boot REST API allows external applications to create orders.

This architecture is useful for microservices, event sourcing, and real-time data streaming.

Popular posts from this blog

Learn Java 8 streams with an example - print odd/even numbers from Array and List

Java Stream API - How to convert List of objects to another List of objects using Java streams?

Registration and Login with Spring Boot + Spring Security + Thymeleaf

Java, Spring Boot Mini Project - Library Management System - Download

ReactJS, Spring Boot JWT Authentication Example

Top 5 Java ORM tools - 2024

Java - Blowfish Encryption and decryption Example

Spring boot video streaming example-HTML5

Google Cloud Storage + Spring Boot - File Upload, Download, and Delete