How to Work with Apache Kafka in Your Spring Boot Application

Igor Kosandyak

Choosing the right messaging system during your architectural planning is always a challenge, yet one of the most important considerations to nail. As a developer, I write applications daily that need to serve lots of users and process huge amounts of data in real time.

Usually, I use Java with the Spring Framework (Spring Boot, Spring Data, Spring Cloud, Spring Caching, etc.) for this. Spring Boot is a framework that allows me to move through my development process much faster and more easily than before. It has come to play a crucial role in my organization. As the number of our users quickly grew, we realized we clearly needed something that could process as many as 1,000,000 events per second.

When we found Apache Kafka®, we saw that it met our needs and could handle millions of messages quickly. That’s why we decided to try it. And since that moment, Kafka has been a vital tool in my pocket. Why did I choose it, you ask?

Apache Kafka is:

  • Scalable
  • Fault tolerant
  • A great publish-subscribe messaging system
  • Capable of higher throughput compared with most messaging systems
  • Highly durable
  • Highly reliable
  • Highly performant

That’s why I decided to use it in my projects. Based on my experience, I provide here a step-by-step guide on how to include Apache Kafka in your Spring Boot application so that you can start leveraging its benefits too.

Prerequisites

  • This article requires you to have Confluent Platform installed
  • To install it manually using the ZIP and TAR archives:
    • Download the archive
    • Unzip it
    • Follow the step-by-step instructions, and you’ll get Kafka up and running in your local environment

I recommend using the Confluent CLI for your development to have Apache Kafka and other components of an event streaming platform up and running.

What you’ll get out of this guide

After reading this guide, you will have a Spring Boot application with a Kafka producer to publish messages to your Kafka topic, as well as with a Kafka consumer to read those messages.

And with that, let’s get started!

Table of contents

Step 1: Generate our project
Step 2: Publish/read messages from the Kafka topic
Step 3: Configure Kafka through application.yml configuration file
Step 4: Create a producer
Step 5: Create a consumer
Step 6: Create a REST controller

Step 1: Generate our project

First, let’s go to Spring Initializr to generate our project. Our project will have Spring MVC/web support and Apache Kafka support.

Generate a Spring Boot project with Spring Initializr

Once you have unzipped the project, you’ll have a very simple structure. I’ll show you what the project will look like at the end of this article so you can easily follow the same structure. I’m going to use IntelliJ IDEA, but you can use any Java IDE.

Step 2: Publish/read messages from the Kafka topic

Now that you can see what the project looks like, let’s move on to publishing and reading messages from the Kafka topic.

Start by creating a simple Java class, which we will use for our example:

package com.demo.models;

public class User {

    private String name;
    private int age;

    public User(String name, int age) {
        this.name = name;
        this.age = age;
    }

    public String getName() {
        return name;
    }

    public int getAge() {
        return age;
    }
}

Spring Boot with Kafka – IntelliJ IDEA

Step 3: Configure Kafka through application.yml configuration file

Next, we need to create the configuration file. We need to configure our Kafka producer and consumer so that they can publish messages to and read messages from the topic. Instead of creating a Java class and marking it with the @Configuration annotation, we can use either an application.properties file or application.yml. Spring Boot allows us to avoid the boilerplate configuration code we used to write in the past, and provides us with a much more intelligent way of configuring our application, like this:

server:
  port: 9000
spring:
  kafka:
    consumer:
      bootstrap-servers: localhost:9092
      group-id: group_id
      auto-offset-reset: earliest
      key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
      value-deserializer: org.apache.kafka.common.serialization.StringDeserializer
    producer:
      bootstrap-servers: localhost:9092
      key-serializer: org.apache.kafka.common.serialization.StringSerializer
      value-serializer: org.apache.kafka.common.serialization.StringSerializer

If you want to learn more about Spring Boot auto-configuration, you can read this short and useful article. For a full list of available configuration properties, you can refer to the official documentation.
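For reference, here is a sketch of what the equivalent producer setup would look like as an explicit @Configuration class. The application.yml above replaces exactly this kind of boilerplate; the class and bean names here are illustrative, not part of the project:

```java
import java.util.HashMap;
import java.util.Map;

import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.common.serialization.StringSerializer;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.core.DefaultKafkaProducerFactory;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.core.ProducerFactory;

// Hypothetical explicit configuration; with the application.yml above,
// Spring Boot auto-configures equivalent beans for you.
@Configuration
public class KafkaProducerConfig {

    @Bean
    public ProducerFactory<String, String> producerFactory() {
        Map<String, Object> props = new HashMap<>();
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        return new DefaultKafkaProducerFactory<>(props);
    }

    @Bean
    public KafkaTemplate<String, String> kafkaTemplate() {
        return new KafkaTemplate<>(producerFactory());
    }
}
```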

Step 4: Create a producer

Next, let’s create a producer. It will write our messages to the topic.

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.stereotype.Service;

@Service
public class Producer {

    private static final Logger logger = LoggerFactory.getLogger(Producer.class);
    private static final String TOPIC = "users";

    @Autowired
    private KafkaTemplate<String, String> kafkaTemplate;

    public void sendMessage(String message) {
        logger.info(String.format("#### -> Producing message -> %s", message));
        this.kafkaTemplate.send(TOPIC, message);
    }
}

We just auto-wired KafkaTemplate and will use this instance to publish messages to the topic. That’s it for the producer!
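One thing worth knowing: kafkaTemplate.send() is asynchronous, so it returns a future rather than blocking until the broker acknowledges the write. If you want to log delivery results, you can register a callback on that future. A minimal sketch, assuming a spring-kafka 2.x version where send() returns a ListenableFuture (in spring-kafka 3.x it returns a CompletableFuture instead); the class name is illustrative:

```java
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.support.SendResult;
import org.springframework.util.concurrent.ListenableFuture;

public class ProducerWithCallback {

    private static final Logger logger = LoggerFactory.getLogger(ProducerWithCallback.class);
    private static final String TOPIC = "users";

    private final KafkaTemplate<String, String> kafkaTemplate;

    public ProducerWithCallback(KafkaTemplate<String, String> kafkaTemplate) {
        this.kafkaTemplate = kafkaTemplate;
    }

    public void sendMessage(String message) {
        ListenableFuture<SendResult<String, String>> future = kafkaTemplate.send(TOPIC, message);
        // Log the delivery outcome once the broker responds
        future.addCallback(
                result -> logger.info("Delivered to partition {} at offset {}",
                        result.getRecordMetadata().partition(),
                        result.getRecordMetadata().offset()),
                ex -> logger.error("Failed to deliver message", ex));
    }
}
```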

Step 5: Create a consumer

The consumer is the service that will be responsible for reading messages and processing them according to the needs of your own business logic. To set it up, enter the following:

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.stereotype.Service;

@Service
public class Consumer {

    private final Logger logger = LoggerFactory.getLogger(Consumer.class);

    @KafkaListener(topics = "users", groupId = "group_id")
    public void consume(String message) {
        logger.info(String.format("#### -> Consumed message -> %s", message));
    }
}

Here, we subscribed our consume(String message) method to the users topic and simply emit every message to the application log. In your real application, you can handle messages the way your business requires you to.
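If you need more than the payload, such as the record’s key, partition, or offset, a listener method can also accept the full ConsumerRecord instead of just the value. A sketch of such an alternative consumer (it would replace the one above, not run alongside it, and the class name is illustrative):

```java
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.stereotype.Service;

@Service
public class DetailedConsumer {

    private final Logger logger = LoggerFactory.getLogger(DetailedConsumer.class);

    // Receives the whole record, including metadata, not just the String payload
    @KafkaListener(topics = "users", groupId = "group_id")
    public void consume(ConsumerRecord<String, String> record) {
        logger.info("partition={}, offset={}, key={}, value={}",
                record.partition(), record.offset(), record.key(), record.value());
    }
}
```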

Step 6: Create a REST controller

If we already have a consumer, then we already have all we need to be able to consume Kafka messages.

To fully demonstrate how everything we created works, we need to create a controller with a single endpoint. The message will be sent to this endpoint and then handled by our producer.

Then, our consumer will catch and handle it the way we set it up: by logging to the console.

import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.PostMapping;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RequestParam;
import org.springframework.web.bind.annotation.RestController;

@RestController
@RequestMapping(value = "/kafka")
public class KafkaController {

    private final Producer producer;

    @Autowired
    KafkaController(Producer producer) {
        this.producer = producer;
    }

    @PostMapping(value = "/publish")
    public void sendMessageToKafkaTopic(@RequestParam("message") String message) {
        this.producer.sendMessage(message);
    }
}

Let’s send our message to Kafka using cURL:

curl -X POST -F 'message=test' http://localhost:9000/kafka/publish
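To confirm the message actually reached the topic, you can also watch it with the console consumer that ships with Kafka. This assumes a broker running on localhost:9092 and the Confluent Platform bin directory on your PATH:

```shell
# Read everything on the users topic from the beginning;
# the "test" message published via curl should appear here
kafka-console-consumer --bootstrap-server localhost:9092 --topic users --from-beginning
```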

Add Apache Kafka to your Spring Boot Project

Basically, that’s it! In just six steps, you learned how easy it is to add Apache Kafka to your Spring Boot project. If you followed this guide, you now know how to integrate Kafka into your Spring Boot project, and you are ready to go with this super tool!

Interested in more?

If you’d like to know more, you can download the Confluent Platform, the leading distribution of Apache Kafka. You can also find all the code in this article on GitHub.

This is a guest post by Igor Kosandyak, a Java software engineer at Oril, with extensive experience in various development areas.




Comments

  1. Hi Igor,

    I’ve followed the guide step by step and also cloned the project from GitHub, and when I run it (mvn spring-boot:run), it continuously outputs the following:

    2019-01-14 14:46:23.723 WARN 9040 — [ntainer#0-0-C-1] org.apache.kafka.clients.NetworkClient : [Consumer clientId=consumer-1, groupId=group_id] Connection to node -1 could not be established. Broker may not be available.
    2019-01-14 14:46:24.104 WARN 9040 — [ad | producer-1] org.apache.kafka.clients.NetworkClient : [Producer clientId=producer-1] Connection to node -1 could not be established. Broker may not be available.

    Then it just crashes.

    Is there anything missing from the guide/code?

    Thanks!

    1. Hi Daniel! Thanks for reading!
      The first thing I’d suggest is to check that you followed the Confluent CLI installation guide correctly. This is the main prerequisite for everything to work, so make sure it is installed and configured properly.
      Thank you!

  2. I tried the blog post, but it looks like the consumer does not work:

    2019-01-29 03:49:52.465 ERROR 521 — [nio-9000-exec-1] o.s.k.support.LoggingProducerListener : Exception thrown when sending a message with key=’null’ and payload=’test’ to topic users:

    org.apache.kafka.common.errors.TimeoutException: Failed to update metadata after 60000 ms.

    Also, from kafka-console-consumer.sh, there is no output on the users topic:

    ./kafka-console-consumer.sh --zookeeper localhost:2181 --topic users --from-beginning

    My environment is Ubuntu 16.04 on Windows 10 with kafka_2.11-0.9.0.1.tgz / zookeeper-3.4.10.tar.gz

  3. Hi Igor,

    Great post. Looking at your configuration, I want to know: is it possible to have more than one {group_id}? If yes, how would that be configured?

    Cheers!
