Post

Part 8 - Producer/Consumer using Kafka Client

Part 8 - Producer/Consumer using Kafka Client

In this section, we are going to use kafka client api and see how to programmatically create, configure and use Kafka Cluster. Following are the main topic, that we are going to look at:

  • Use Admin API to create topics
  • Send the message using Producer
  • Receive the message using Consumer

Dependency for Kafka Client

1
2
3
4
5
<dependency>
    <groupId>org.apache.kafka</groupId>
    <artifactId>kafka-clients</artifactId>
    <version>2.8.0</version>
</dependency>

Create Configuration

Kafka requires configuration in key/values pairs. And for that we can use java.util.Properites class. Following are required config for Kafka Producer:

1
2
3
4
Properties properties = new Properties();
properties.put("bootstrap.servers", "localhost:9090,localhost:9091");
properties.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
properties.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");

Run Kafka Instance

Although, there many ways to start the kafka instance, which are:

Dependency for TestContainers

1
2
3
4
5
6
7
8
9
10
11
12
13
<dependency>
    <groupId>org.testcontainers</groupId>
    <artifactId>kafka</artifactId>
    <version>${test-container.version}</version>
    <scope>test</scope>
</dependency>

<dependency>
    <groupId>org.testcontainers</groupId>
    <artifactId>junit-jupiter</artifactId>
    <version>${test-container.version}</version>
    <scope>test</scope>
</dependency>

Admin API Usage

Required config

  • bootstrap.servers

Create Kafka Topic Programmatically

It will create the topic with default options. Note that kafkaFuter.get() will block the thread until topic creation has been completed or failed.

1
2
3
4
5
6
7
8
9
10
try (Admin admin = Admin.create(properties)) {
    int numOfPartitions = 1;
    short replicationFactor = 1;
    NewTopic newTopic = new NewTopic(topicName, numOfPartitions, replicationFactor);

    CreateTopicsResult topicsResult = admin.createTopics(Collections.singleton(newTopic));

    KafkaFuture<Void> kafkaFuture = topicsResult.values().get(topicName);
    kafkaFuture.get();
}

Kafka Producer

Required config

  • bootstrap.servers
  • key.serializer
  • value.serializer

Implementation

1
2
3
4
5
6
7
8
try (KafkaProducer<String, String> producer = new KafkaProducer<>(properties)) {

        messages.entrySet()
        .stream()
        .map(p -> new ProducerRecord<>(TOPIC_NAME, p.getKey(), p.getValue()))
        .forEach(producer::send);

        } 

Kafka Consumer

Required config

  • bootstrap.servers
  • key.deserializer
  • value.deserializer
  • group.id

Implementation

1
2
3
4
5
6
7
8
try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(properties)) {
    consumer.subscribe(List.of(TOPIC_NAME));

    while (true) {
    ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(10));
    records.forEach(KafkaConsumerApp::printRecord);
    }
}

Source of this section is available on GitHub.

Kafka Series

This post is licensed under CC BY 4.0 by the author.