- Unit testing Kafka code is crucial as it transports important data.
- As of version 0.9.0, there is a new way to unit test with mock objects.
- To refactor the producer, declare it as the `Producer` interface instead of `KafkaProducer`, and create a separate method for creating the `KafkaProducer`.
- Refactor the code that sends data to the `Producer` object so that it can be called from the unit test.
- Use the `MockProducer` object for Kafka unit tests of the `Producer` code.
Unit testing your Kafka code is incredibly important. It’s transporting your most important data. As of Kafka 0.9.0, there’s a new way to unit test with mock objects.
Refactoring Your Producer
First of all, you’ll need to be able to change your Producer at runtime. Instead of using the KafkaProducer object directly, you’ll use the Producer interface.
public Producer<String, String> producer;
You can use whichever dependency injection method you prefer, but I’m making the Producer public so I can change it from the unit test.
Next, you’ll want to refactor the code for creating your KafkaProducer. The creation of the KafkaProducer should be in a separate method that won’t get called by your production Producer code. That way, the unit test can skip it and supply its own Producer.
You’ll also need to refactor the code that sends the data to the Producer object. This code will need to be callable from the unit test.
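Here’s a minimal sketch of how the refactored class might look with those three changes applied. The class name MyTestKafkaProducer, the public producer field, and send() match what the unit test below uses; the createProducer() method name, the localhost:9092 broker address, and the exact loop are assumptions made for illustration.

```java
import java.util.Properties;

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.StringSerializer;

public class MyTestKafkaProducer {
    // Typed as the Producer interface so a test can assign a MockProducer.
    public Producer<String, String> producer;

    // Creates the real KafkaProducer. Startup code would call this;
    // a unit test skips it and assigns a mock to the field instead.
    // Hypothetical method name; the broker address is a placeholder.
    public void createProducer() {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092");
        props.put("key.serializer", StringSerializer.class.getName());
        props.put("value.serializer", StringSerializer.class.getName());
        producer = new KafkaProducer<>(props);
    }

    // Sends five records through whichever Producer is currently set.
    // It never creates a producer itself, so it is safe to call from a test.
    public void send() {
        for (int i = 0; i < 5; i++) {
            producer.send(new ProducerRecord<>("my_topic", "mykey", "myvalue" + i));
        }
    }
}
```

In production you would call createProducer() once at startup and then send(). The test imports this class from a package named kafka, so add a matching package declaration if you follow that layout.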
Unit Testing Your Producer
Kafka unit tests of the Producer code use the MockProducer object. The @Before method will initialize the MockProducer before each test.
MockProducer<String, String> producer;

@Before
public void setUp() {
    producer = new MockProducer<String, String>(
        true, new StringSerializer(), new StringSerializer());
}
Once we’ve set the objects up, we can start testing.
@Test
public void testProducer() throws IOException {
    MyTestKafkaProducer myTestKafkaProducer = new MyTestKafkaProducer();
    myTestKafkaProducer.producer = producer;

    myTestKafkaProducer.send();

    List<ProducerRecord<String, String>> history = producer.history();

    List<ProducerRecord<String, String>> expected = Arrays.asList(
        new ProducerRecord<String, String>("my_topic", "mykey", "myvalue0"),
        new ProducerRecord<String, String>("my_topic", "mykey", "myvalue1"),
        new ProducerRecord<String, String>("my_topic", "mykey", "myvalue2"),
        new ProducerRecord<String, String>("my_topic", "mykey", "myvalue3"),
        new ProducerRecord<String, String>("my_topic", "mykey", "myvalue4"));

    Assert.assertEquals("Sent didn't match expected", expected, history);
}
We start off by instantiating the class we want to test. We inject our MockProducer into it. We send some data with it. All of the data sent by the Producer can be accessed with the MockProducer’s history() method. We create a list of the ProducerRecords we expect. Finally, we assert that the two lists match each other.
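If comparing whole lists is stricter than you need, you can also walk the history() list and check individual fields. The sketch below is only an illustration: the HistoryInspection class and its allMatch() helper are hypothetical names, while history(), clear(), topic(), and key() are existing MockProducer and ProducerRecord methods.

```java
import java.util.List;

import org.apache.kafka.clients.producer.MockProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.StringSerializer;

public class HistoryInspection {
    // Hypothetical helper: true when every record has the given topic and key.
    static boolean allMatch(List<ProducerRecord<String, String>> history,
                            String topic, String key) {
        for (ProducerRecord<String, String> record : history) {
            if (!topic.equals(record.topic()) || !key.equals(record.key())) {
                return false;
            }
        }
        return true;
    }

    public static void main(String[] args) {
        // autoComplete = true, so each send() completes immediately.
        MockProducer<String, String> producer = new MockProducer<>(
            true, new StringSerializer(), new StringSerializer());

        producer.send(new ProducerRecord<>("my_topic", "mykey", "myvalue0"));
        producer.send(new ProducerRecord<>("my_topic", "mykey", "myvalue1"));

        // history() returns the records in the order they were sent.
        List<ProducerRecord<String, String>> history = producer.history();
        System.out.println("records sent: " + history.size());
        System.out.println("all match: " + allMatch(history, "my_topic", "mykey"));

        // clear() empties the stored history, which helps when one mock
        // is reused across several steps of a test.
        producer.clear();
        System.out.println("after clear: " + producer.history().size());
    }
}
```

Field-level checks like this give more targeted failure messages, at the cost of not verifying the exact sequence of values the way the list comparison does.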
In the next post, I show you how to unit test a KafkaConsumer.
Full producer test code:
import java.io.IOException;
import java.util.Arrays;
import java.util.List;

import org.apache.kafka.clients.producer.MockProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.StringSerializer;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;

import kafka.MyTestKafkaProducer;

public class ProducerTest {
    MockProducer<String, String> producer;

    @Before
    public void setUp() {
        producer = new MockProducer<String, String>(
            true, new StringSerializer(), new StringSerializer());
    }

    @Test
    public void testProducer() throws IOException {
        MyTestKafkaProducer myTestKafkaProducer = new MyTestKafkaProducer();
        myTestKafkaProducer.producer = producer;

        myTestKafkaProducer.send();

        List<ProducerRecord<String, String>> history = producer.history();

        List<ProducerRecord<String, String>> expected = Arrays.asList(
            new ProducerRecord<String, String>("my_topic", "mykey", "myvalue0"),
            new ProducerRecord<String, String>("my_topic", "mykey", "myvalue1"),
            new ProducerRecord<String, String>("my_topic", "mykey", "myvalue2"),
            new ProducerRecord<String, String>("my_topic", "mykey", "myvalue3"),
            new ProducerRecord<String, String>("my_topic", "mykey", "myvalue4"));

        Assert.assertEquals("Sent didn't match expected", expected, history);
    }
}