Part 8 - Producer/Consumer using Kafka Client
In this section, we are going to use the Kafka client API
and see how to programmatically create, configure, and use a Kafka cluster.
The following are the main topics that we are going to look at:
- Use Admin API to create topics
- Send messages using a Producer
- Receive messages using a Consumer
Dependency for Kafka Client
<dependency>
    <groupId>org.apache.kafka</groupId>
    <artifactId>kafka-clients</artifactId>
    <version>2.8.0</version>
</dependency>
Create Configuration
Kafka requires configuration in key/value pairs, and for that we can use the java.util.Properties
class. The following are the required configs for a Kafka Producer:
Properties properties = new Properties();
properties.put("bootstrap.servers", "localhost:9090,localhost:9091");
properties.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
properties.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
Run Kafka Instance
There are many ways to start a Kafka instance:
- Running a single Kafka instance
- Running multiple Kafka instances
- Using docker-compose
- Using TestContainers for the test cases
Dependency for TestContainers
<dependency>
    <groupId>org.testcontainers</groupId>
    <artifactId>kafka</artifactId>
    <version>${test-container.version}</version>
    <scope>test</scope>
</dependency>
<dependency>
    <groupId>org.testcontainers</groupId>
    <artifactId>junit-jupiter</artifactId>
    <version>${test-container.version}</version>
    <scope>test</scope>
</dependency>
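With these two dependencies, a test class can spin up a throwaway single-node Kafka broker via Testcontainers. The sketch below shows the idea; the class name and the Docker image tag are illustrative assumptions, not taken from the original source:

import java.util.Properties;
import org.apache.kafka.clients.admin.Admin;
import org.junit.jupiter.api.Test;
import org.testcontainers.containers.KafkaContainer;
import org.testcontainers.junit.jupiter.Container;
import org.testcontainers.junit.jupiter.Testcontainers;
import org.testcontainers.utility.DockerImageName;

@Testcontainers
class KafkaClientIT {

    // Starts a single-node Kafka broker for the duration of the test class.
    @Container
    private static final KafkaContainer KAFKA =
            new KafkaContainer(DockerImageName.parse("confluentinc/cp-kafka:6.2.1"));

    @Test
    void adminCanConnect() throws Exception {
        Properties properties = new Properties();
        // The container exposes a random host port, so always read the bootstrap servers from it.
        properties.put("bootstrap.servers", KAFKA.getBootstrapServers());
        try (Admin admin = Admin.create(properties)) {
            admin.listTopics().names().get(); // succeeds only if the broker is reachable
        }
    }
}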
Admin API Usage
Required config
- bootstrap.servers
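In other words, a minimal Properties object such as this sketch (reusing the broker addresses from the producer config above) is enough for the Admin client:

import java.util.Properties;
import org.apache.kafka.clients.admin.AdminClientConfig;

// Only the broker addresses are required to build an Admin client.
Properties properties = new Properties();
properties.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9090,localhost:9091");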
Create Kafka Topic Programmatically
The following code creates the topic with default options. Note that kafkaFuture.get()
will block the thread until topic creation has completed or failed.
try (Admin admin = Admin.create(properties)) {
    int numOfPartitions = 1;
    short replicationFactor = 1;
    NewTopic newTopic = new NewTopic(topicName, numOfPartitions, replicationFactor);
    CreateTopicsResult topicsResult = admin.createTopics(Collections.singleton(newTopic));
    KafkaFuture<Void> kafkaFuture = topicsResult.values().get(topicName);
    kafkaFuture.get();
}
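If you want to double-check that the topic was actually created, the Admin API can also list the existing topic names. This snippet is only a sketch and not part of the original example:

import java.util.Set;

try (Admin admin = Admin.create(properties)) {
    // Fetch all topic names known to the cluster and check that ours is among them.
    Set<String> topicNames = admin.listTopics().names().get();
    System.out.println("Topic exists: " + topicNames.contains(topicName));
}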
Kafka Producer
Required config
- bootstrap.servers
- key.serializer
- value.serializer
Implementation
try (KafkaProducer<String, String> producer = new KafkaProducer<>(properties)) {
    messages.entrySet()
            .stream()
            .map(p -> new ProducerRecord<>(TOPIC_NAME, p.getKey(), p.getValue()))
            .forEach(producer::send);
}
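producer.send() is asynchronous, so the loop above does not find out whether delivery succeeded. If you want to be notified about failures, the overload that takes a callback can be used. A minimal sketch, reusing TOPIC_NAME and messages from above:

import java.util.Map;

try (KafkaProducer<String, String> producer = new KafkaProducer<>(properties)) {
    for (Map.Entry<String, String> entry : messages.entrySet()) {
        ProducerRecord<String, String> record =
                new ProducerRecord<>(TOPIC_NAME, entry.getKey(), entry.getValue());
        // The callback runs once the broker has acknowledged (or rejected) the record.
        producer.send(record, (metadata, exception) -> {
            if (exception != null) {
                exception.printStackTrace();
            } else {
                System.out.printf("Sent to %s-%d@%d%n",
                        metadata.topic(), metadata.partition(), metadata.offset());
            }
        });
    }
}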
Kafka Consumer
Required config
- bootstrap.servers
- key.deserializer
- value.deserializer
- group.id
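For reference, a consumer configuration could look like the following sketch; the group id value here is only an illustrative assumption:

Properties properties = new Properties();
properties.put("bootstrap.servers", "localhost:9090,localhost:9091");
properties.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
properties.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
// Any stable id works; consumers sharing the same group.id split the topic's partitions between them.
properties.put("group.id", "demo-consumer-group");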
Implementation
try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(properties)) {
    consumer.subscribe(List.of(TOPIC_NAME));
    while (true) {
        ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(10));
        records.forEach(KafkaConsumerApp::printRecord);
    }
}
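The printRecord method referenced above is a helper of the author's KafkaConsumerApp and its body is not shown here; a plausible sketch of such a method could be:

// Hypothetical helper matching the KafkaConsumerApp::printRecord reference above.
private static void printRecord(ConsumerRecord<String, String> record) {
    System.out.printf("topic=%s partition=%d offset=%d key=%s value=%s%n",
            record.topic(), record.partition(), record.offset(), record.key(), record.value());
}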
Source of this section is available on GitHub.