Part 8 - Producer/Consumer using Kafka Client

In this section, we use the Kafka client API to programmatically create, configure, and use a Kafka cluster. The main topics we are going to look at are:

Dependency for Kafka Client

<dependency>
    <groupId>org.apache.kafka</groupId>
    <artifactId>kafka-clients</artifactId>
    <version>2.8.0</version>
</dependency>

Create Configuration

Kafka requires configuration as key/value pairs, for which we can use the java.util.Properties class. The following configuration is required for a Kafka producer:

Properties properties = new Properties();
properties.put("bootstrap.servers", "localhost:9090,localhost:9091");
properties.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
properties.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");

Run Kafka Instance

There are many ways to start a Kafka instance. The examples in this section use Testcontainers, which needs the dependencies below.

Dependency for TestContainers

<dependency>
    <groupId>org.testcontainers</groupId>
    <artifactId>kafka</artifactId>
    <version>${test-container.version}</version>
    <scope>test</scope>
</dependency>

<dependency>
    <groupId>org.testcontainers</groupId>
    <artifactId>junit-jupiter</artifactId>
    <version>${test-container.version}</version>
    <scope>test</scope>
</dependency>

Admin API Usage

Required config
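
The admin client needs little more than the broker addresses. A minimal sketch, assuming the same broker list used for the producer above; the class name AdminConfigSketch, the method name adminProperties, and the client.id value are hypothetical and not taken from the original project:

```java
import java.util.Properties;

class AdminConfigSketch {

    // Builds the properties that would be passed to Admin.create(...).
    static Properties adminProperties(String bootstrapServers) {
        Properties properties = new Properties();
        // bootstrap.servers tells the client which brokers to contact first.
        properties.put("bootstrap.servers", bootstrapServers);
        // Optional: a logical name included in requests (value is illustrative).
        properties.put("client.id", "admin-sketch");
        return properties;
    }
}
```

For example, adminProperties("localhost:9090,localhost:9091") would yield properties suitable for the topic creation snippet that follows.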

Create Kafka Topic Programmatically

This creates the topic with default options. Note that kafkaFuture.get() blocks the thread until topic creation has completed or failed.

try (Admin admin = Admin.create(properties)) {
    int numOfPartitions = 1;
    short replicationFactor = 1;
    NewTopic newTopic = new NewTopic(topicName, numOfPartitions, replicationFactor);

    CreateTopicsResult topicsResult = admin.createTopics(Collections.singleton(newTopic));

    KafkaFuture<Void> kafkaFuture = topicsResult.values().get(topicName);
    kafkaFuture.get();
}
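
To cap how long the calling thread blocks, the timed variant get(timeout, unit) can be used instead. KafkaFuture implements java.util.concurrent.Future, so a bounded wait can be sketched with standard-library types only. The class name FutureAwaitSketch, the helper name awaitCompletion, and the way failures are reported are assumptions made for illustration:

```java
import java.time.Duration;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

class FutureAwaitSketch {

    // Waits up to `timeout` for the future to complete.
    // Returns true on success, false if it failed, timed out, or was interrupted.
    static boolean awaitCompletion(Future<?> future, Duration timeout) {
        try {
            future.get(timeout.toMillis(), TimeUnit.MILLISECONDS);
            return true;
        } catch (ExecutionException e) {
            // The underlying failure is wrapped; getCause() exposes it.
            System.err.println("Operation failed: " + e.getCause());
            return false;
        } catch (TimeoutException e) {
            System.err.println("Timed out after " + timeout);
            return false;
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt(); // preserve the interrupt flag
            return false;
        }
    }
}
```

With the topic creation code above, this could be called as awaitCompletion(kafkaFuture, Duration.ofSeconds(10)) in place of kafkaFuture.get(); the ten-second value is only an example.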

Kafka Producer

Required config

Implementation

try (KafkaProducer<String, String> producer = new KafkaProducer<>(properties)) {
    messages.entrySet()
        .stream()
        .map(p -> new ProducerRecord<>(TOPIC_NAME, p.getKey(), p.getValue()))
        .forEach(producer::send);
}

Kafka Consumer

Required config
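
A consumer needs deserializers rather than serializers, plus a group.id when it uses subscribe(). A minimal sketch, assuming String keys and values as in the producer configuration; the class name ConsumerConfigSketch, the method name consumerProperties, and the choice of auto.offset.reset are illustrative assumptions:

```java
import java.util.Properties;

class ConsumerConfigSketch {

    // Builds the properties that would be passed to new KafkaConsumer<>(...).
    static Properties consumerProperties(String bootstrapServers, String groupId) {
        Properties properties = new Properties();
        properties.put("bootstrap.servers", bootstrapServers);
        properties.put("key.deserializer",
                "org.apache.kafka.common.serialization.StringDeserializer");
        properties.put("value.deserializer",
                "org.apache.kafka.common.serialization.StringDeserializer");
        // Needed for subscribe(): consumers sharing a group.id split the partitions.
        properties.put("group.id", groupId);
        // Optional: read from the oldest record when the group has no committed offset.
        properties.put("auto.offset.reset", "earliest");
        return properties;
    }
}
```

Setting auto.offset.reset to earliest is a convenience for demos and tests, where the producer has often finished before the consumer starts.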

Implementation

try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(properties)) {
    consumer.subscribe(List.of(TOPIC_NAME));

    while (true) {
        ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(10));
        records.forEach(KafkaConsumerApp::printRecord);
    }
}

The source code for this section is available on GitHub.

Series