Kotlin Ktor Kafka: Producer and Consumer Example

Here’s a step-by-step explanation of the sequence diagram for message flow:

1. User Sends a Request

  • The user initiates the process by sending a POST request to the Ktor server at the endpoint /kafka/send.
  • This request contains the message payload that the user wants to send.

2. Ktor Server Processes the Request

  • Upon receiving the request, the Ktor server calls the method sendMessage("my-topic", message).
  • The method is responsible for sending the user's message to the specified Kafka topic (my-topic).

3. Kafka Producer Sends the Message

  • The Kafka Producer, configured within the Ktor server, takes the message and sends it to the Kafka Broker.
  • The Kafka Producer ensures that the message is sent to the topic "my-topic" on the Kafka cluster.

4. Kafka Broker Receives the Message

  • The Kafka Broker receives the message from the Kafka Producer.
  • The Kafka Broker stores the message in the appropriate partition of the topic "my-topic", making it available for consumers.

5. Kafka Consumer Retrieves the Message

  • The Kafka Consumer, which is subscribed to the topic "my-topic", retrieves the message from the Kafka Broker.
  • The message delivery to the consumer happens asynchronously.

6. Ktor Server Handles the Consumed Message

  • After the Kafka Consumer has processed the message, it can hand the result back to the Ktor server if further processing is required. In the example code below, the consumer runs as a separate process and only prints each message.
  • The Ktor server then responds to the user or processes the message further, depending on the application logic.
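
The flow above can be modelled without a broker. The following is a minimal, single-threaded sketch in plain Kotlin. `InMemoryTopic`, `append`, and `readFrom` are hypothetical names used only for illustration and are not part of the Kafka client API. It shows the idea behind steps 4 and 5: the broker appends each message to a log and assigns it an offset, and a consumer reads from its own position whenever it next polls.

```kotlin
// Simplified stand-in for one partition of a topic: an append-only log.
// Hypothetical illustration only; a real broker is a separate networked process.
class InMemoryTopic {
    private val log = mutableListOf<String>()

    // "Producer" side: append the message and return the offset it was stored at.
    fun append(message: String): Long {
        log.add(message)
        return (log.size - 1).toLong()
    }

    // "Consumer" side: return every message at or after the given offset.
    fun readFrom(offset: Long): List<String> =
        if (offset >= log.size) emptyList() else log.subList(offset.toInt(), log.size).toList()
}

fun main() {
    val topic = InMemoryTopic()
    var consumerOffset = 0L // position the consumer will read from next

    // Steps 1-4: two requests arrive and are stored before the consumer polls.
    println("stored at offset ${topic.append("Hello Kafka")}")
    println("stored at offset ${topic.append("Second message")}")

    // Step 5: the consumer polls later and catches up from its own offset.
    val batch = topic.readFrom(consumerOffset)
    batch.forEach { println("consumed: $it") }
    consumerOffset += batch.size

    // A further poll returns nothing until new messages are appended.
    println("next poll returned ${topic.readFrom(consumerOffset).size} messages")
}
```

Because the consumer tracks its own offset, the producer never waits for it. This is why delivery to the consumer is described as asynchronous.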


Here's a simple example of setting up Kafka producer and consumer with Ktor:

1. Setup Kafka

To run Kafka locally using Docker:

docker network create kafka-net
docker run -d --name zookeeper --network kafka-net -p 2181:2181 wurstmeister/zookeeper
docker run -d --name kafka --network kafka-net -p 9093:9093 -e KAFKA_ZOOKEEPER_CONNECT=zookeeper:2181 -e KAFKA_LISTENERS=PLAINTEXT://0.0.0.0:9093 -e KAFKA_ADVERTISED_LISTENERS=PLAINTEXT://localhost:9093 wurstmeister/kafka

Ensure Kafka and Zookeeper are running, or use cloud services like Confluent Cloud.
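
Once the broker is reachable, the topic can be created explicitly instead of relying on broker-side auto-creation. The sketch below uses the `AdminClient` from the kafka-clients library (the dependency listed in the next step). The helper names `adminProps` and `createTopicIfMissing` are hypothetical, and the address `localhost:9093`, the single partition, and the replication factor of 1 are assumptions that only suit a single local broker.

```kotlin
import org.apache.kafka.clients.admin.AdminClient
import org.apache.kafka.clients.admin.AdminClientConfig
import org.apache.kafka.clients.admin.NewTopic
import java.util.Properties

// Builds the minimal client settings needed to reach the broker.
fun adminProps(bootstrapServers: String): Properties = Properties().apply {
    put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers)
}

// Creates the topic unless it already exists. Blocks until the broker answers.
fun createTopicIfMissing(bootstrapServers: String, topic: String) {
    AdminClient.create(adminProps(bootstrapServers)).use { admin ->
        val existing = admin.listTopics().names().get()
        if (topic !in existing) {
            // 1 partition, replication factor 1: assumption for a single local broker.
            admin.createTopics(listOf(NewTopic(topic, 1, 1.toShort()))).all().get()
            println("Created topic $topic")
        } else {
            println("Topic $topic already exists")
        }
    }
}

fun main() {
    createTopicIfMissing("localhost:9093", "my-topic")
}
```

If this call fails to connect, the usual cause is a mismatch between the advertised listener address and the address the client uses.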

2. Dependencies in build.gradle.kts

Add the required dependencies for Ktor and Kafka:

plugins {
    kotlin("jvm") version "1.8.10"
    application
}

repositories {
    mavenCentral()
}

dependencies {
    implementation("io.ktor:ktor-server-core:2.3.0")
    implementation("io.ktor:ktor-server-netty:2.3.0")
    implementation("org.apache.kafka:kafka-clients:3.1.0")
    implementation("org.jetbrains.kotlinx:kotlinx-coroutines-core:1.6.4")
}

application {
    // A top-level main() in Application.kt compiles to the class ApplicationKt.
    mainClass.set("ApplicationKt")
}

3. Kafka Producer Example

The Kafka producer sends messages to a Kafka topic:

import org.apache.kafka.clients.producer.KafkaProducer
import org.apache.kafka.clients.producer.ProducerConfig
import org.apache.kafka.clients.producer.ProducerRecord
import java.util.Properties

class KafkaProducerService {
    private val producer: KafkaProducer<String, String>

    init {
        val props = Properties()
        props[ProducerConfig.BOOTSTRAP_SERVERS_CONFIG] = "localhost:9093"
        props[ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG] = "org.apache.kafka.common.serialization.StringSerializer"
        props[ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG] = "org.apache.kafka.common.serialization.StringSerializer"
        producer = KafkaProducer(props)
    }

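    fun sendMessage(topic: String, message: String) {
        // No key is set, so the producer chooses the partition itself.
        val record = ProducerRecord<String, String>(topic, null, message)
        // send() is asynchronous: it returns immediately and the callback runs
        // once the broker has acknowledged or rejected the record.
        producer.send(record) { metadata, exception ->
            if (exception != null) {
                println("Error sending message: ${exception.message}")
            } else {
                println("Message sent to ${metadata.topic()} with offset ${metadata.offset()}")
            }
        }
    }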

    fun close() {
        producer.close()
    }
}
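
The build file already includes kotlinx-coroutines-core, so the callback-based `send` can also be bridged to a suspending function. The sketch below is one possible way to do that. `sendAwait` is a hypothetical extension function, not part of the Kafka client. The `main` function uses `MockProducer`, which ships with kafka-clients, so the example runs without a broker.

```kotlin
import kotlinx.coroutines.runBlocking
import kotlinx.coroutines.suspendCancellableCoroutine
import org.apache.kafka.clients.producer.MockProducer
import org.apache.kafka.clients.producer.Producer
import org.apache.kafka.clients.producer.ProducerRecord
import org.apache.kafka.clients.producer.RecordMetadata
import org.apache.kafka.common.serialization.StringSerializer
import kotlin.coroutines.resume
import kotlin.coroutines.resumeWithException

// Suspends until the send callback fires, then returns the metadata
// or throws the exception reported by the producer.
suspend fun <K, V> Producer<K, V>.sendAwait(record: ProducerRecord<K, V>): RecordMetadata =
    suspendCancellableCoroutine { cont ->
        send(record) { metadata, exception ->
            if (exception != null) cont.resumeWithException(exception)
            else cont.resume(metadata)
        }
    }

fun main() = runBlocking {
    // autoComplete = true makes the mock acknowledge every record immediately.
    val producer = MockProducer(true, StringSerializer(), StringSerializer())
    val metadata = producer.sendAwait(ProducerRecord<String, String>("my-topic", "Hello Kafka"))
    println("Sent to ${metadata.topic()} at offset ${metadata.offset()}")
    producer.close()
}
```

Inside a Ktor route handler, which is already a suspending context, this would let the HTTP response reflect whether the broker actually accepted the message.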

4. Kafka Consumer Example

The Kafka consumer listens for messages on a topic:

import org.apache.kafka.clients.consumer.KafkaConsumer
import org.apache.kafka.common.serialization.StringDeserializer
import java.time.Duration
import java.util.Properties

class KafkaConsumerService {
    private val consumer: KafkaConsumer<String, String>

    init {
        val props = Properties()
        props["bootstrap.servers"] = "localhost:9093"
        // Consumers that share a group.id split the topic's partitions between them.
        props["group.id"] = "test-group"
        props["key.deserializer"] = StringDeserializer::class.java.name
        props["value.deserializer"] = StringDeserializer::class.java.name
        consumer = KafkaConsumer<String, String>(props)
    }

    fun consumeMessages(topic: String) {
        consumer.subscribe(listOf(topic))
        // Poll forever; each call waits up to one second for new records.
        while (true) {
            // poll(Duration) replaces the deprecated poll(long) overload.
            val records = consumer.poll(Duration.ofMillis(1000))
            for (record in records) {
                println("Consumed message: ${record.value()} from partition ${record.partition()}")
            }
        }
    }

    fun close() {
        consumer.close()
    }
}
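
In the consumer above, the `while (true)` loop never exits, so `close()` is never reached. A common pattern is to call `wakeup()` from another thread, which makes a blocked `poll` throw `WakeupException`. The sketch below shows that pattern. `runPollLoop` and `installShutdownHook` are hypothetical helper names, and `main` uses `MockConsumer` from kafka-clients so that it runs without a broker.

```kotlin
import org.apache.kafka.clients.consumer.Consumer
import org.apache.kafka.clients.consumer.MockConsumer
import org.apache.kafka.clients.consumer.OffsetResetStrategy
import org.apache.kafka.common.errors.WakeupException
import java.time.Duration

// Polls until another thread calls consumer.wakeup(), then closes the consumer.
fun runPollLoop(consumer: Consumer<String, String>, topic: String, handle: (String) -> Unit) {
    try {
        consumer.subscribe(listOf(topic))
        while (true) {
            for (record in consumer.poll(Duration.ofMillis(1000))) {
                handle(record.value())
            }
        }
    } catch (e: WakeupException) {
        // Expected during shutdown: wakeup() interrupts the poll.
    } finally {
        consumer.close()
    }
}

// Registers a JVM shutdown hook that stops the loop running on the calling thread.
fun installShutdownHook(consumer: Consumer<*, *>) {
    val pollThread = Thread.currentThread()
    Runtime.getRuntime().addShutdownHook(Thread {
        consumer.wakeup()  // safe to call from a different thread
        pollThread.join()  // wait until the loop has closed the consumer
    })
}

fun main() {
    val consumer = MockConsumer<String, String>(OffsetResetStrategy.EARLIEST)
    // Simulate a shutdown request arriving during the first poll.
    consumer.schedulePollTask { consumer.wakeup() }
    runPollLoop(consumer, "my-topic") { println("Consumed message: $it") }
    println("Consumer closed: ${consumer.closed()}")
}
```

With a real `KafkaConsumer`, calling `installShutdownHook(consumer)` before `runPollLoop(...)` would let Ctrl+C close the consumer cleanly and leave the group promptly.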

5. Ktor Server to Trigger Kafka Producer

Set up a Ktor server to trigger the Kafka producer from HTTP requests:

// Ktor 2.x moved the server APIs under io.ktor.server.*
import io.ktor.http.HttpStatusCode
import io.ktor.server.application.*
import io.ktor.server.engine.embeddedServer
import io.ktor.server.netty.Netty
import io.ktor.server.request.*
import io.ktor.server.response.*
import io.ktor.server.routing.*

fun main() {
    // Create the producer once and reuse it: KafkaProducer is thread-safe,
    // and building a new one per request is expensive.
    val producer = KafkaProducerService()
    // Flush pending records and close the producer when the JVM shuts down.
    Runtime.getRuntime().addShutdownHook(Thread { producer.close() })

    embeddedServer(Netty, port = 8080) {
        routing {
            route("/kafka") {
                post("/send") {
                    // The raw request body is the message payload.
                    val message = call.receiveText()
                    producer.sendMessage("my-topic", message)
                    // The send is asynchronous, so this confirms hand-off to the producer,
                    // not acknowledgement by the broker.
                    call.respondText("Message sent", status = HttpStatusCode.OK)
                }
            }
        }
    }.start(wait = true)
}

This exposes a POST endpoint at /kafka/send where you can send a message to Kafka.

6. Running the Kafka Consumer

Put the consumer's entry point in its own file (ConsumerMain.kt) and run it in another terminal:

fun main() {
    val consumerService = KafkaConsumerService()
    consumerService.consumeMessages("my-topic")
}

7. Testing the End-to-End Flow

  1. Start Kafka (if not already running).
  2. Start the Ktor application by running Application.kt.
  3. Start the Kafka consumer by running ConsumerMain.kt.
  4. Send a message to Kafka using Postman or curl:
     curl -X POST http://localhost:8080/kafka/send -d "Hello Kafka"
  5. Observe the consumer output: the message should be printed by the Kafka consumer.
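
The same request can be sent from Kotlin, which is convenient for a scripted smoke test. This is a minimal sketch using the JDK's built-in `java.net.http.HttpClient` (it assumes JDK 11 or later). `buildSendRequest` is a hypothetical helper, and the base URL matches the port used by the server in this example.

```kotlin
import java.net.URI
import java.net.http.HttpClient
import java.net.http.HttpRequest
import java.net.http.HttpResponse

// Builds a POST to /kafka/send with the message as the plain-text body.
fun buildSendRequest(message: String, baseUrl: String = "http://localhost:8080"): HttpRequest =
    HttpRequest.newBuilder(URI.create("$baseUrl/kafka/send"))
        .header("Content-Type", "text/plain; charset=utf-8")
        .POST(HttpRequest.BodyPublishers.ofString(message))
        .build()

fun main() {
    // Requires the Ktor server from step 5 to be running.
    val response = HttpClient.newHttpClient()
        .send(buildSendRequest("Hello Kafka"), HttpResponse.BodyHandlers.ofString())
    println("${response.statusCode()} ${response.body()}")
}
```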

Notes:

  • Kafka Producer: Sends a message to the my-topic Kafka topic.
  • Kafka Consumer: Continuously consumes messages from the my-topic Kafka topic.
  • Ktor Server: Acts as an API to trigger the Kafka producer.

You can expand this example by adding error handling, logging, and more advanced configurations.
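
As one example of a more advanced configuration, the producer can be tuned for stronger delivery guarantees. The sketch below is a starting point and not a recommendation for every workload. `reliableProducerProps` is a hypothetical helper, the property keys are standard Kafka producer settings, and the numeric values are assumptions to adjust.

```kotlin
import java.util.Properties

// Producer settings that favour durability over latency.
fun reliableProducerProps(bootstrapServers: String): Properties = Properties().apply {
    put("bootstrap.servers", bootstrapServers)
    put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer")
    put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer")
    put("acks", "all")                   // wait for all in-sync replicas to acknowledge
    put("enable.idempotence", "true")    // avoid duplicates caused by producer retries
    put("delivery.timeout.ms", "60000")  // upper bound on reporting success or failure
    put("linger.ms", "5")                // wait briefly so records can be batched
}

fun main() {
    // Print the settings in a stable order for inspection.
    reliableProducerProps("localhost:9093")
        .stringPropertyNames()
        .sorted()
        .forEach { key -> println("$key=${reliableProducerProps("localhost:9093").getProperty(key)}") }
}
```

These properties could be passed to `KafkaProducer(props)` in place of the ones built in `KafkaProducerService`.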

