Testing Micronaut Kafka Listener using Testcontainers

  • Java
  • Micronaut
  • MySQL
  • Kafka

In this guide, you will learn how to:

  • Create a Micronaut application with Kafka integration

  • Implement a Kafka Listener and persist data in MySQL database

  • Test the Kafka Listener using Testcontainers and Awaitility

  • Simplify testing with Micronaut Test Resources

Prerequisites

What we are going to achieve in this guide

We are going to create a Micronaut project with Kafka, Micronaut Data JPA, MySQL and Awaitility, where we implement a Kafka Listener that receives an event payload and persists the event data in the database. Then we will test this Kafka Listener using the Testcontainers Kafka and MySQL modules in conjunction with Awaitility.

Getting Started

You can create a new Micronaut project from Micronaut Launch by selecting the kafka, data-jpa, mysql, awaitility, assertj, and testcontainers features.

We are going to use the Awaitility library for asserting the expectations of an asynchronous process flow.
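If you have not used Awaitility before, here is a small, hypothetical, self-contained sketch (not part of the guide's code) of the polling pattern we will use later: wait for an asynchronous side effect to become visible, polling at a fixed interval and failing after a timeout.

package com.testcontainers.demo;

import static java.util.concurrent.TimeUnit.SECONDS;
import static org.awaitility.Awaitility.await;

import java.time.Duration;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.atomic.AtomicBoolean;

class AwaitilityExample {

    public static void main(String[] args) {
        AtomicBoolean processed = new AtomicBoolean(false);

        // Simulate an asynchronous process (e.g. a message listener) that completes later
        CompletableFuture.runAsync(() -> {
            try {
                Thread.sleep(2000);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
            processed.set(true);
        });

        // Poll every second, for at most 10 seconds, until the condition holds;
        // Awaitility throws a ConditionTimeoutException if it never does.
        await().pollInterval(Duration.ofSeconds(1)).atMost(10, SECONDS).until(processed::get);
    }
}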

If you have selected Gradle as the build tool, then the generated build.gradle file should have the following dependencies.

dependencies {
    annotationProcessor("io.micronaut.data:micronaut-data-processor")
    annotationProcessor("io.micronaut.serde:micronaut-serde-processor")
    implementation("io.micronaut.data:micronaut-data-hibernate-jpa")
    implementation("io.micronaut.kafka:micronaut-kafka")
    implementation("io.micronaut.serde:micronaut-serde-jackson")
    implementation("io.micronaut.sql:micronaut-jdbc-hikari")
    runtimeOnly("ch.qos.logback:logback-classic")
    runtimeOnly("mysql:mysql-connector-java")
    testImplementation("io.micronaut:micronaut-http-client")
    testImplementation("org.assertj:assertj-core")
    testImplementation("org.awaitility:awaitility:4.2.0")
    testImplementation("org.testcontainers:junit-jupiter")
    testImplementation("org.testcontainers:kafka")
    testImplementation("org.testcontainers:mysql")
    testImplementation("org.testcontainers:testcontainers")
}

We are going to implement a Kafka Listener listening to a topic named product-price-changes. Upon receiving a message, we are going to extract product code and price from the event payload and update the price of that product in the MySQL database.

Create JPA entity

First, let us start with creating a JPA entity Product.java.

package com.testcontainers.demo;

import jakarta.persistence.Column;
import jakarta.persistence.Entity;
import jakarta.persistence.GeneratedValue;
import jakarta.persistence.GenerationType;
import jakarta.persistence.Id;
import jakarta.persistence.Table;
import java.math.BigDecimal;

@Entity
@Table(name = "products")
public class Product {

    @Id
    @GeneratedValue(strategy = GenerationType.IDENTITY)
    private Long id;

    @Column(nullable = false, unique = true)
    private String code;

    @Column(nullable = false)
    private String name;

    @Column(nullable = false)
    private BigDecimal price;

    public Product() {}

    public Product(Long id, String code, String name, BigDecimal price) {
        this.id = id;
        this.code = code;
        this.name = name;
        this.price = price;
    }

    public Long getId() {
        return id;
    }

    public void setId(Long id) {
        this.id = id;
    }

    public String getCode() {
        return code;
    }

    public void setCode(String code) {
        this.code = code;
    }

    public String getName() {
        return name;
    }

    public void setName(String name) {
        this.name = name;
    }

    public BigDecimal getPrice() {
        return price;
    }

    public void setPrice(BigDecimal price) {
        this.price = price;
    }
}

Create Micronaut Data JPA repository

Let us create a Micronaut Data JPA repository interface for the Product entity and add methods to find a product for a given code and update the price for the given product code as follows:

package com.testcontainers.demo;

import io.micronaut.data.annotation.Query;
import io.micronaut.data.annotation.Repository;
import io.micronaut.data.jpa.repository.JpaRepository;
import java.math.BigDecimal;
import java.util.Optional;

@Repository
public interface ProductRepository extends JpaRepository<Product, Long> {

    Optional<Product> findByCode(String code);

    @Query("update Product p set p.price = :price where p.code = :productCode")
    void updateProductPrice(String productCode, BigDecimal price);
}

Create the event payload java bean

Let us create a domain object named ProductPriceChangedEvent as a record representing the structure of the event payload that we are going to receive from the Kafka topic.

package com.testcontainers.demo;

import io.micronaut.serde.annotation.Serdeable;
import java.math.BigDecimal;

@Serdeable
public record ProductPriceChangedEvent(String productCode, BigDecimal price) {}

Declare the @Serdeable annotation at the type level in your source code to allow the type to be serialized or deserialized.
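To see what this enables, here is a small, hypothetical sketch (not part of the guide's code) that serializes the record to the JSON payload shape used on the topic and reads it back. It assumes micronaut-serde-jackson is on the classpath, since ObjectMapper.getDefault() needs a serde implementation available at runtime.

package com.testcontainers.demo;

import io.micronaut.serde.ObjectMapper;
import java.io.IOException;
import java.math.BigDecimal;

public class ProductPriceChangedEventJsonExample {

    public static void main(String[] args) throws IOException {
        // Obtain a Micronaut Serialization ObjectMapper without starting an application context
        ObjectMapper objectMapper = ObjectMapper.getDefault();

        ProductPriceChangedEvent event = new ProductPriceChangedEvent("P100", new BigDecimal("25.00"));

        // Serialize the @Serdeable record; prints something like {"productCode":"P100","price":25.00}
        String json = objectMapper.writeValueAsString(event);
        System.out.println(json);

        // Deserialize it back, similar to what happens when a message arrives on the topic
        ProductPriceChangedEvent fromJson = objectMapper.readValue(json, ProductPriceChangedEvent.class);
        System.out.println(fromJson);
    }
}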

Implement Kafka Listener

Finally, let us implement the Kafka listener which handles the messages received from the product-price-changes topic and updates the product price in the database.

To listen to Kafka messages, you can use the @KafkaListener annotation to define a message listener.

package com.testcontainers.demo;

import static io.micronaut.configuration.kafka.annotation.OffsetReset.EARLIEST;

import io.micronaut.configuration.kafka.annotation.KafkaListener;
import io.micronaut.configuration.kafka.annotation.Topic;
import jakarta.inject.Singleton;
import jakarta.transaction.Transactional;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

@Singleton
@Transactional
class ProductPriceChangedEventHandler {

    private static final Logger LOG = LoggerFactory.getLogger(ProductPriceChangedEventHandler.class);

    private final ProductRepository productRepository;

    ProductPriceChangedEventHandler(ProductRepository productRepository) {
        this.productRepository = productRepository;
    }

    @Topic("product-price-changes")
    @KafkaListener(offsetReset = EARLIEST, groupId = "demo")
    public void handle(ProductPriceChangedEvent event) {
        LOG.info("Received a ProductPriceChangedEvent with productCode: {}", event.productCode());
        productRepository.updateProductPrice(event.productCode(), event.price());
    }
}
  • The @Topic annotation is used to indicate which topic(s) to subscribe to.

  • The @KafkaListener annotation is used with offsetReset set to EARLIEST, which makes the listener start reading messages from the beginning of the partition.

Let us assume that there is an agreement between the sender and receiver that the payload will be sent in the following JSON format:

{
  "productCode": "P100",
  "price": 25.00
}

Write Test for Kafka Listener

We are going to write a test for the Kafka event listener ProductPriceChangedEventHandler by sending a message to the product-price-changes topic and verifying the updated product price in the database.

But to successfully start our Micronaut application context, we need Kafka and the MySQL database up and running, and we need to configure the application to talk to them.

Create a @KafkaClient to simplify publishing events in the test.

package com.testcontainers.demo;

import io.micronaut.configuration.kafka.annotation.KafkaClient;
import io.micronaut.configuration.kafka.annotation.KafkaKey;
import io.micronaut.configuration.kafka.annotation.Topic;

@KafkaClient
public interface ProductPriceChangesClient {

    @Topic("product-price-changes")
    void send(@KafkaKey String productCode, ProductPriceChangedEvent event);
}
  • The @KafkaClient annotation is used to designate this interface as a client

  • The @Topic annotation indicates which topic(s) the messages should be published to.

  • The Kafka key can be specified by providing a parameter annotated with @KafkaKey. If no such parameter is specified, the record is sent with a null key.

We will use the Testcontainers library to spin up Kafka and MySQL instances as Docker containers and configure the application to talk to them as follows:

package com.testcontainers.demo;

import static java.util.concurrent.TimeUnit.SECONDS;
import static org.assertj.core.api.Assertions.assertThat;
import static org.awaitility.Awaitility.await;

import io.micronaut.context.annotation.Property;
import io.micronaut.core.annotation.NonNull;
import io.micronaut.test.extensions.junit5.annotation.MicronautTest;
import io.micronaut.test.support.TestPropertyProvider;
import java.math.BigDecimal;
import java.time.Duration;
import java.util.Collections;
import java.util.Map;
import java.util.Optional;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.TestInstance;
import org.testcontainers.containers.KafkaContainer;
import org.testcontainers.junit.jupiter.Container;
import org.testcontainers.junit.jupiter.Testcontainers;
import org.testcontainers.utility.DockerImageName;

@MicronautTest(transactional = false)
@Property(name = "datasources.default.driver-class-name", value = "org.testcontainers.jdbc.ContainerDatabaseDriver")
@Property(name = "datasources.default.url", value = "jdbc:tc:mysql:8.0.32:///db")
@Testcontainers(disabledWithoutDocker = true)
@TestInstance(TestInstance.Lifecycle.PER_CLASS)
class ProductPriceChangedEventHandlerTest implements TestPropertyProvider {

    @Container
    static final KafkaContainer kafka = new KafkaContainer(DockerImageName.parse("confluentinc/cp-kafka:7.3.3"));

    @Override
    public @NonNull Map<String, String> getProperties() {
        if (!kafka.isRunning()) {
            kafka.start();
        }
        return Collections.singletonMap("kafka.bootstrap.servers", kafka.getBootstrapServers());
    }

    @Test
    void shouldHandleProductPriceChangedEvent(
            ProductPriceChangesClient productPriceChangesClient, ProductRepository productRepository) {
        Product product = new Product(null, "P100", "Product One", BigDecimal.TEN);
        Long id = productRepository.save(product).getId();

        ProductPriceChangedEvent event = new ProductPriceChangedEvent("P100", new BigDecimal("14.50"));

        productPriceChangesClient.send(event.productCode(), event);

        await().pollInterval(Duration.ofSeconds(3)).atMost(10, SECONDS).untilAsserted(() -> {
            Optional<Product> optionalProduct = productRepository.findByCode("P100");
            assertThat(optionalProduct).isPresent();
            assertThat(optionalProduct.get().getCode()).isEqualTo("P100");
            assertThat(optionalProduct.get().getPrice()).isEqualTo(new BigDecimal("14.50"));
        });

        productRepository.deleteById(id);
    }
}

Let us understand what is going on in this test.

  • Annotated the class with @MicronautTest so the Micronaut framework will initialize the application context and the embedded server. By default, each @Test method will be wrapped in a transaction that will be rolled back when the test finishes. This behaviour is changed by setting transactional to false.

  • Annotated the class with @Property to supply the datasource driver class name and JDBC URL configuration to the test.

  • We have configured the Testcontainers special JDBC URL to spin up a MySQL container and configure it as a DataSource with the Micronaut application context.

  • Classes that implement TestPropertyProvider must be annotated with @TestInstance(TestInstance.Lifecycle.PER_CLASS) so that a single class instance is used for all tests (not necessary in Spock tests).

  • When you need to define dynamic properties, implement the TestPropertyProvider interface and override the getProperties() method to return the properties you want to expose to the application.

  • We have used the Testcontainers JUnit 5 Extension annotations @Testcontainers and @Container to spin up a Kafka container and registered the bootstrap-servers location using the TestPropertyProvider mechanism.

  • We have created a Product record in the database before sending the event.

  • During the test, we sent a message to the product-price-changes topic using ProductPriceChangesClient with productCode as key and ProductPriceChangedEvent instance as value.

  • As Kafka message processing is asynchronous, we use the Awaitility library to check whether the product price is updated in the database to the expected value, polling at an interval of 3 seconds and waiting up to a maximum of 10 seconds. If the message is consumed and processed within 10 seconds, the test will pass; otherwise, it will fail.

Now you can run the tests.

Testing Kafka integration with Test Resources

Micronaut Test Resources adds support for managing external resources which are required during development or testing. Let’s see how we can simplify testing with Micronaut Test Resources.

  • You can remove Testcontainers dependencies from pom.xml or build.gradle. Micronaut Test Resources still uses Testcontainers under the hood.

  • Add the Micronaut Test Resources support.

If you are using Maven, then add the following dependency and set the property micronaut.test.resources.enabled to true in pom.xml.

<properties>
    ...
    <micronaut.test.resources.enabled>true</micronaut.test.resources.enabled>
</properties>

<dependencies>
    <dependency>
        <groupId>io.micronaut.testresources</groupId>
        <artifactId>micronaut-test-resources-client</artifactId>
        <scope>provided</scope>
    </dependency>
</dependencies>

If you are using Gradle, then add the io.micronaut.test-resources plugin in build.gradle.

plugins {
    ...
    id("io.micronaut.test-resources") version "4.0.3"
}

When the application is started locally, either under test or by running the application, resolution of the kafka.bootstrap.servers property is detected, and the Test Resources service starts a local Kafka Docker container and injects the properties required to use it as the broker.

Thanks to Test Resources, we can simplify the test as follows:

package com.testcontainers.demo;

import static java.util.concurrent.TimeUnit.SECONDS;
import static org.assertj.core.api.Assertions.assertThat;
import static org.awaitility.Awaitility.await;

import io.micronaut.test.extensions.junit5.annotation.MicronautTest;
import java.math.BigDecimal;
import java.time.Duration;
import java.util.Optional;
import org.junit.jupiter.api.Test;

@MicronautTest(transactional = false)
class ProductPriceChangedEventHandlerTestResourcesTest {

    @Test
    void shouldHandleProductPriceChangedEvent(
            ProductPriceChangesClient productPriceChangesClient, ProductRepository productRepository) {
        Product product = new Product(null, "P100", "Product One", BigDecimal.TEN);
        Long id = productRepository.save(product).getId();

        ProductPriceChangedEvent event = new ProductPriceChangedEvent("P100", new BigDecimal("14.50"));

        productPriceChangesClient.send(event.productCode(), event);

        await().pollInterval(Duration.ofSeconds(3)).atMost(10, SECONDS).untilAsserted(() -> {
            Optional<Product> optionalProduct = productRepository.findByCode("P100");
            assertThat(optionalProduct).isPresent();
            assertThat(optionalProduct.get().getCode()).isEqualTo("P100");
            assertThat(optionalProduct.get().getPrice()).isEqualTo(new BigDecimal("14.50"));
        });

        productRepository.deleteById(id);
    }
}

If you run the tests, you will see a MySQL container and Kafka container being started by Test Resources through integration with Testcontainers to provide throwaway containers for testing.

Run tests

# If you are using Maven
./mvnw test

# If you are using Gradle
./gradlew test

You should see the Kafka and MySQL Docker containers being started, and all tests should PASS. You will also notice that after the tests are executed, the containers are stopped and removed automatically.

Summary

We have learned how to test Kafka message listeners using a real Kafka instance with Testcontainers and verified the expected result using Awaitility. If we are using Kafka and MySQL in production, testing with real Kafka and MySQL instances gives our test suite greater confidence in the correctness of our code.

To learn more about Testcontainers visit http://testcontainers.com