
Testing Spring Boot Kafka Listener using Testcontainers

  • Java
  • Spring Boot
  • MySQL
  • Kafka
Get the code

In this guide, you will learn how to:

  • Create a Spring Boot application with Kafka integration

  • Implement a Kafka Listener and persist data in MySQL database

  • Test the Kafka Listener using Testcontainers and Awaitility

Prerequisites

What we are going to achieve in this guide

We are going to create a Spring Boot project with Kafka, Spring Data JPA and MySQL, where we implement a Kafka listener that receives an event payload and persists the event data in the database. Then we will test this Kafka listener using the Testcontainers Kafka and MySQL modules in conjunction with Awaitility.

Getting Started

You can create a new Spring Boot project from Spring Initializr by selecting the Spring for Apache Kafka, Spring Data JPA, MySQL Driver and Testcontainers starters.

Once the application is generated, add the Awaitility library as a test dependency, which we will use for asserting the expectations of an asynchronous process flow.

If you have selected Gradle as the build tool, then the generated build.gradle file should have the following dependencies.

dependencies {
    implementation 'org.springframework.boot:spring-boot-starter-data-jpa'
    implementation 'org.springframework.kafka:spring-kafka'
    runtimeOnly 'com.mysql:mysql-connector-j'

    testImplementation 'org.springframework.boot:spring-boot-starter-test'
    testImplementation 'org.testcontainers:junit-jupiter'
    testImplementation 'org.springframework.kafka:spring-kafka-test'
    testImplementation 'org.testcontainers:kafka'
    testImplementation 'org.testcontainers:mysql'
    testImplementation 'org.awaitility:awaitility'
}

We are going to implement a Kafka listener listening to a topic named product-price-changes. Upon receiving a message, we will extract the product code and price from the event payload and update the price of that product in the MySQL database.

Create JPA entity

First let us start with creating a JPA entity Product.java.

package com.testcontainers.demo;

import jakarta.persistence.Column;
import jakarta.persistence.Entity;
import jakarta.persistence.GeneratedValue;
import jakarta.persistence.GenerationType;
import jakarta.persistence.Id;
import jakarta.persistence.Table;
import java.math.BigDecimal;

@Entity
@Table(name = "products")
class Product {

  @Id
  @GeneratedValue(strategy = GenerationType.IDENTITY)
  private Long id;

  @Column(nullable = false, unique = true)
  private String code;

  @Column(nullable = false)
  private String name;

  @Column(nullable = false)
  private BigDecimal price;

  public Product() {}

  public Product(Long id, String code, String name, BigDecimal price) {
    this.id = id;
    this.code = code;
    this.name = name;
    this.price = price;
  }

  public Long getId() {
    return id;
  }

  public void setId(Long id) {
    this.id = id;
  }

  public String getCode() {
    return code;
  }

  public void setCode(String code) {
    this.code = code;
  }

  public String getName() {
    return name;
  }

  public void setName(String name) {
    this.name = name;
  }

  public BigDecimal getPrice() {
    return price;
  }

  public void setPrice(BigDecimal price) {
    this.price = price;
  }
}

Create Spring Data JPA repository

Let us create a Spring Data JPA repository interface for the Product entity and add methods to find a product for a given code and update the price for the given product code as follows:

package com.testcontainers.demo;

import java.math.BigDecimal;
import java.util.Optional;
import org.springframework.data.jpa.repository.JpaRepository;
import org.springframework.data.jpa.repository.Modifying;
import org.springframework.data.jpa.repository.Query;
import org.springframework.data.repository.query.Param;

interface ProductRepository extends JpaRepository<Product, Long> {
  Optional<Product> findByCode(String code);

  @Modifying
  @Query("update Product p set p.price = :price where p.code = :productCode")
  void updateProductPrice(
    @Param("productCode") String productCode,
    @Param("price") BigDecimal price
  );
}

Add a schema creation script

As we are not using an in-memory database, we need to create the MySQL database tables ourselves. The recommended approach is to use a database migration tool like Flyway or Liquibase, but for this guide we will use the simple schema initialization support provided by Spring Boot.

Create a schema.sql file with the following content under the src/main/resources directory.

create table products (
      id int not null auto_increment,
      code varchar(255) not null,
      name varchar(255) not null,
      price numeric(5,2) not null,
      primary key (id),
      unique (code)
);

We also need to enable schema initialization by adding the following property in the src/main/resources/application.properties file.

spring.sql.init.mode=always

Create the event payload java bean

Let us create a domain object named ProductPriceChangedEvent as a record representing the structure of the event payload that we are going to receive from the Kafka topic.

package com.testcontainers.demo;

import java.math.BigDecimal;

record ProductPriceChangedEvent(String productCode, BigDecimal price) {}

Implement Kafka Listener

Finally, let us implement the Kafka listener which handles the messages received from the product-price-changes topic and updates the product price in the database.

package com.testcontainers.demo;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.stereotype.Component;
import org.springframework.transaction.annotation.Transactional;

@Component
@Transactional
class ProductPriceChangedEventHandler {

  private static final Logger log = LoggerFactory.getLogger(
    ProductPriceChangedEventHandler.class
  );

  private final ProductRepository productRepository;

  ProductPriceChangedEventHandler(ProductRepository productRepository) {
    this.productRepository = productRepository;
  }

  @KafkaListener(topics = "product-price-changes", groupId = "demo")
  public void handle(ProductPriceChangedEvent event) {
    log.info(
      "Received a ProductPriceChangedEvent with productCode: {}",
      event.productCode()
    );
    productRepository.updateProductPrice(event.productCode(), event.price());
  }
}

We have implemented a Kafka event listener using a @KafkaListener annotated method specifying the topic name to listen to.

Let us assume that there is an agreement between the sender and receiver that the payload will be sent in the following JSON format:

{
  "productCode": "P100",
  "price": 25.00
}

To let the Spring Kafka integration handle the serialization and deserialization of key and values, configure the following properties in src/main/resources/application.properties file.

spring.kafka.bootstrap-servers=localhost:9092
spring.kafka.producer.key-serializer=org.apache.kafka.common.serialization.StringSerializer
spring.kafka.producer.value-serializer=org.springframework.kafka.support.serializer.JsonSerializer

spring.kafka.consumer.group-id=demo
spring.kafka.consumer.auto-offset-reset=latest
spring.kafka.consumer.key-deserializer=org.apache.kafka.common.serialization.StringDeserializer
spring.kafka.consumer.value-deserializer=org.springframework.kafka.support.serializer.JsonDeserializer
spring.kafka.consumer.properties.spring.json.trusted.packages=com.testcontainers.demo

We are going to use productCode as key which will be (de)serialized using StringSerializer/StringDeserializer and ProductPriceChangedEvent as value which will be (de)serialized using JsonSerializer/JsonDeserializer.
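Since Spring's JsonSerializer delegates to Jackson under the hood, we can sanity-check the wire format of our event payload with plain Jackson. Below is a minimal sketch, assuming jackson-databind is on the classpath; the PayloadShapeDemo class name and the locally redeclared record are just for illustration and mirror the record from the main sources:

```java
import com.fasterxml.jackson.databind.ObjectMapper;
import java.math.BigDecimal;

public class PayloadShapeDemo {

  // Same shape as the ProductPriceChangedEvent record in the main sources
  record ProductPriceChangedEvent(String productCode, BigDecimal price) {}

  public static void main(String[] args) throws Exception {
    ObjectMapper mapper = new ObjectMapper();
    String json = mapper.writeValueAsString(
      new ProductPriceChangedEvent("P100", new BigDecimal("25.00"))
    );
    // Prints the JSON that JsonSerializer would place on the topic
    System.out.println(json);
  }
}
```

Running this should print JSON with the productCode and price fields matching the agreed payload format above.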

Write Test for Kafka Listener

We are going to write a test for the Kafka event listener ProductPriceChangedEventHandler by sending a message to the product-price-changes topic and verifying the updated product price in the database.

But in order to successfully start our Spring context, we need Kafka and the MySQL database up and running, and we need to configure the Spring context to talk to them.

We will use the Testcontainers library to spin up Kafka and MySQL instances as Docker containers and configure the application to talk to them as follows:

package com.testcontainers.demo;

import static java.util.concurrent.TimeUnit.SECONDS;
import static org.assertj.core.api.Assertions.assertThat;
import static org.awaitility.Awaitility.await;

import java.math.BigDecimal;
import java.time.Duration;
import java.util.Optional;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.test.context.DynamicPropertyRegistry;
import org.springframework.test.context.DynamicPropertySource;
import org.springframework.test.context.TestPropertySource;
import org.testcontainers.containers.KafkaContainer;
import org.testcontainers.junit.jupiter.Container;
import org.testcontainers.junit.jupiter.Testcontainers;
import org.testcontainers.utility.DockerImageName;

@SpringBootTest
@TestPropertySource(
  properties = {
    "spring.kafka.consumer.auto-offset-reset=earliest",
    "spring.datasource.url=jdbc:tc:mysql:8.0.32:///db",
  }
)
@Testcontainers
class ProductPriceChangedEventHandlerTest {

  @Container
  static final KafkaContainer kafka = new KafkaContainer(
    DockerImageName.parse("confluentinc/cp-kafka:7.3.3")
  );

  @DynamicPropertySource
  static void overrideProperties(DynamicPropertyRegistry registry) {
    registry.add("spring.kafka.bootstrap-servers", kafka::getBootstrapServers);
  }

  @Autowired
  private KafkaTemplate<String, Object> kafkaTemplate;

  @Autowired
  private ProductRepository productRepository;

  @BeforeEach
  void setUp() {
    Product product = new Product(null, "P100", "Product One", BigDecimal.TEN);
    productRepository.save(product);
  }

  @Test
  void shouldHandleProductPriceChangedEvent() {
    ProductPriceChangedEvent event = new ProductPriceChangedEvent(
      "P100",
      new BigDecimal("14.50")
    );

    kafkaTemplate.send("product-price-changes", event.productCode(), event);

    await()
      .pollInterval(Duration.ofSeconds(3))
      .atMost(10, SECONDS)
      .untilAsserted(() -> {
        Optional<Product> optionalProduct = productRepository.findByCode(
          "P100"
        );
        assertThat(optionalProduct).isPresent();
        assertThat(optionalProduct.get().getCode()).isEqualTo("P100");
        assertThat(optionalProduct.get().getPrice())
          .isEqualTo(new BigDecimal("14.50"));
      });
  }
}

Let us understand what is going on in this test.

  • We have annotated the test class with the @SpringBootTest annotation to write an integration test which loads the complete Spring application context.

  • We have configured the Testcontainers special JDBC URL to spin up a MySQL container and register it as a DataSource in the Spring application context.

  • We have used the Testcontainers JUnit 5 extension annotations @Testcontainers and @Container to spin up a Kafka container and registered the bootstrap-servers location using the DynamicPropertySource mechanism.

  • We have created a Product record in the database before running the test using the @BeforeEach callback method.

  • During the test, we send a message to the product-price-changes topic using KafkaTemplate, with productCode as the key and a ProductPriceChangedEvent instance as the value. Spring Boot takes care of converting the object into JSON using JsonSerializer.

  • As Kafka message processing is asynchronous, we use the Awaitility library to check whether the product price has been updated in the database to the expected value, polling at an interval of 3 seconds and waiting up to a maximum of 10 seconds. If the message is consumed and processed within 10 seconds the test passes; otherwise it fails.

  • Also, notice that we have configured the property spring.kafka.consumer.auto-offset-reset to earliest so that the listener consumes messages even if they were sent to the topic before the listener was ready. This setting is helpful for running tests.
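The untilAsserted polling used above is, at its core, a retry loop. The following is a rough, dependency-free sketch of that pattern in plain JDK code; it illustrates the idea and is not Awaitility's actual implementation, and the PollUntilDemo class name is just for illustration:

```java
import java.time.Duration;
import java.time.Instant;
import java.util.concurrent.atomic.AtomicReference;

public class PollUntilDemo {

  // Re-run the assertion until it passes or the timeout elapses,
  // mimicking await().pollInterval(...).atMost(...).untilAsserted(...)
  static void pollUntil(Runnable assertion, Duration pollInterval, Duration atMost)
      throws InterruptedException {
    Instant deadline = Instant.now().plus(atMost);
    AssertionError lastFailure = new AssertionError("condition never met");
    while (Instant.now().isBefore(deadline)) {
      try {
        assertion.run();
        return; // assertion passed, stop polling
      } catch (AssertionError e) {
        lastFailure = e;
        Thread.sleep(pollInterval.toMillis());
      }
    }
    throw lastFailure;
  }

  public static void main(String[] args) throws InterruptedException {
    AtomicReference<String> price = new AtomicReference<>("10.00");

    // Simulate an asynchronous listener updating state after a short delay
    new Thread(() -> {
      try {
        Thread.sleep(300);
      } catch (InterruptedException ignored) {
      }
      price.set("14.50");
    }).start();

    pollUntil(
      () -> {
        if (!"14.50".equals(price.get())) {
          throw new AssertionError("price not updated yet");
        }
      },
      Duration.ofMillis(100),
      Duration.ofSeconds(5)
    );

    System.out.println("price updated to " + price.get());
  }
}
```

Awaitility provides this same pattern with richer configuration options, condition factories and much better failure reporting, which is why we use it in the test instead of hand-rolling a loop.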

Run tests

# If you are using Maven
./mvnw test

# If you are using Gradle
./gradlew test

You should see the Kafka and MySQL Docker containers being started, and all tests should pass. Notice that after the tests are executed, the containers are stopped and removed automatically.

Summary

We have learned how to test Kafka message listeners using a real Kafka instance with Testcontainers, and how to verify the expected result using Awaitility. If we are using Kafka and MySQL in production, testing with real Kafka and MySQL instances is often the best approach, as it gives our test suite more confidence in the correctness of our code.

To learn more about Testcontainers visit http://testcontainers.com