Unit Testing Kafka

Blog Summary: (AI Summaries by Summarizes)
  • Unit testing Kafka code is crucial as it transports important data.
  • As of version 0.9.0, there is a new way to unit test with mock objects.
  • To refactor the producer, declare it as the `Producer` interface instead of `KafkaProducer` and create a separate method for creating the `KafkaProducer`.
  • Refactor the code that sends data to the `Producer` object so that it can be called from the unit test.
  • Use the `MockProducer` object for Kafka unit tests of the `Producer` code.

Unit testing your Kafka code is incredibly important because it’s transporting your most important data. As of Kafka 0.9.0, there’s a new way to unit test with mock objects.

Refactoring Your Producer

First of all, you’ll need to be able to change your Producer at runtime. Instead of using the KafkaProducer object directly, you’ll use the Producer interface.

public Producer<String, String> producer;

You can use whichever dependency injection approach you prefer, but I’m making the Producer public so I can change it from the unit test.
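
If you’d rather not expose a public field, constructor injection is one alternative. Here’s a minimal sketch of that idea. The class name InjectedKafkaProducer and its send(String) method are hypothetical and not part of the original code; the topic and key are borrowed from the unit test in this post.

```java
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerRecord;

// Hypothetical class: receives its Producer from the caller instead of creating it.
public class InjectedKafkaProducer {
    private final Producer<String, String> producer;

    // Production code passes a KafkaProducer; a unit test passes a MockProducer.
    public InjectedKafkaProducer(Producer<String, String> producer) {
        this.producer = producer;
    }

    // Sends one value to the topic and key used elsewhere in this post.
    public void send(String value) {
        producer.send(new ProducerRecord<>("my_topic", "mykey", value));
    }
}
```

Either way, the point is the same: the class only knows about the Producer interface, so the test can decide which implementation it gets.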

Next, you’ll want to refactor the code for creating your KafkaProducer. The creation of the KafkaProducer should be in a separate method that your sending code doesn’t call itself. That way, the unit test can skip it and supply its own Producer.

You’ll also need to refactor the code that sends the data to the Producer object. This code will need to be callable from the unit test.
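
Putting these refactorings together, the class under test might look roughly like this. It’s a sketch rather than the exact class from this post: the createProducer() method name and the bootstrap.servers address are assumptions, and the body of send() is inferred from the records the unit test expects.

```java
import java.util.Properties;

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.StringSerializer;

public class MyTestKafkaProducer {
    // Typed as the interface and public so a unit test can swap in a mock.
    public Producer<String, String> producer;

    // Assumed method name. Production code calls this once at startup;
    // a unit test skips it and assigns a MockProducer instead.
    public void createProducer() {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092"); // assumed broker address
        props.put("key.serializer", StringSerializer.class.getName());
        props.put("value.serializer", StringSerializer.class.getName());
        producer = new KafkaProducer<>(props);
    }

    // Sends five records. It never creates the producer itself,
    // so it works with whatever Producer has been injected.
    public void send() {
        for (int i = 0; i < 5; i++) {
            producer.send(new ProducerRecord<>("my_topic", "mykey", "myvalue" + i));
        }
    }
}
```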

Unit Testing Your Producer

Kafka unit tests of the Producer code use the MockProducer object. The @Before method initializes a fresh MockProducer before each test.

MockProducer<String, String> producer;

@Before
public void setUp() {
    producer = new MockProducer<String, String>(
            true, new StringSerializer(), new StringSerializer());
}
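
The first constructor argument is MockProducer’s autoComplete flag. As a rough illustration of what it controls, the sketch below checks whether the future returned by send() is already done. The class and method names (AutoCompleteSketch, doneRightAfterSend, doneAfterCompleteNext) are made up for this example.

```java
import java.util.concurrent.Future;

import org.apache.kafka.clients.producer.MockProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;
import org.apache.kafka.common.serialization.StringSerializer;

// Hypothetical helper class, only for illustrating the autoComplete flag.
public class AutoCompleteSketch {
    // Sends one record and reports whether its future is done immediately.
    static boolean doneRightAfterSend(boolean autoComplete) {
        MockProducer<String, String> producer = new MockProducer<>(
                autoComplete, new StringSerializer(), new StringSerializer());
        Future<RecordMetadata> future = producer.send(
                new ProducerRecord<>("my_topic", "mykey", "myvalue0"));
        return future.isDone();
    }

    // With autoComplete off, the test completes the pending send explicitly.
    static boolean doneAfterCompleteNext() {
        MockProducer<String, String> producer = new MockProducer<>(
                false, new StringSerializer(), new StringSerializer());
        Future<RecordMetadata> future = producer.send(
                new ProducerRecord<>("my_topic", "mykey", "myvalue0"));
        producer.completeNext(); // completes the oldest pending send
        return future.isDone();
    }
}
```

Passing true, as the setUp() method above does, keeps the test simple because every send completes successfully right away.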

Once we’ve set the objects up, we can start testing.

@Test
public void testProducer() throws IOException {
    MyTestKafkaProducer myTestKafkaProducer = new MyTestKafkaProducer();
    myTestKafkaProducer.producer = producer;

    myTestKafkaProducer.send();

    List<ProducerRecord<String, String>> history = producer.history();

    List<ProducerRecord<String, String>> expected = Arrays.asList(
            new ProducerRecord<String, String>("my_topic", "mykey", "myvalue0"),
            new ProducerRecord<String, String>("my_topic", "mykey", "myvalue1"),
            new ProducerRecord<String, String>("my_topic", "mykey", "myvalue2"),
            new ProducerRecord<String, String>("my_topic", "mykey", "myvalue3"),
            new ProducerRecord<String, String>("my_topic", "mykey", "myvalue4"));

    Assert.assertEquals("Sent didn't match expected", expected, history);
}

We start off by instantiating the class we want to test, MyTestKafkaProducer. We inject our MockProducer into it by assigning the public producer field. We then send some data with send(). All of the data sent through the MockProducer can be accessed with the history() method. We create a list of the ProducerRecords we expect. Finally, we assert that the two lists match each other.

In the next post, I show you how to unit test a KafkaConsumer.

Full producer test code:

import java.io.IOException;
import java.util.Arrays;
import java.util.List;

import org.apache.kafka.clients.producer.MockProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.StringSerializer;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;

import kafka.MyTestKafkaProducer;

public class ProducerTest {
    MockProducer<String, String> producer;

    @Before
    public void setUp() {
        producer = new MockProducer<String, String>(
            true, new StringSerializer(), new StringSerializer());
    }

    @Test
    public void testProducer() throws IOException {
        MyTestKafkaProducer myTestKafkaProducer = new MyTestKafkaProducer();
        myTestKafkaProducer.producer = producer;

        myTestKafkaProducer.send();

        List<ProducerRecord<String, String>> history = producer.history();

        List<ProducerRecord<String, String>> expected = Arrays.asList(
                new ProducerRecord<String, String>("my_topic", "mykey", "myvalue0"),
                new ProducerRecord<String, String>("my_topic", "mykey", "myvalue1"),
                new ProducerRecord<String, String>("my_topic", "mykey", "myvalue2"),
                new ProducerRecord<String, String>("my_topic", "mykey", "myvalue3"),
                new ProducerRecord<String, String>("my_topic", "mykey", "myvalue4"));

        Assert.assertEquals("Sent didn't match expected", expected, history);
    }
}