Part 9 - Producer/Consumer using Spring Kafka

Dependency

<dependency>
    <groupId>org.springframework.kafka</groupId>
    <artifactId>spring-kafka</artifactId>
    <version>2.7.2</version>
</dependency>

Configuring KafkaAdmin

We need a KafkaAdmin bean for creating new topics. It simply delegates to an AdminClient for creating and describing topics. During initialization it looks for NewTopic beans in the application context, collects them into a NewTopics list, and creates the corresponding topics on the broker.

@Bean
public KafkaAdmin kafkaAdmin() {
    Map<String, Object> configs = new HashMap<>();
    configs.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapAddress);
    return new KafkaAdmin(configs);
}

@Bean
public NewTopic myTopic() {
    return new NewTopic("myTopic", 1, (short) 1);
}
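
Spring Kafka also ships a fluent TopicBuilder that constructs the same NewTopic bean; the sketch below is equivalent to the bean above:

@Bean
public NewTopic myTopic() {
    // Equivalent to new NewTopic("myTopic", 1, (short) 1), built fluently
    return TopicBuilder.name("myTopic")
            .partitions(1)
            .replicas(1)
            .build();
}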

Configuring ProducerFactory and KafkaTemplate

We need a ProducerFactory bean for creating producers. It is a factory for Producer instances, and the default implementation is DefaultKafkaProducerFactory.

We also need a KafkaTemplate bean for sending messages. It provides high-level send operations and is thread-safe when used with a DefaultKafkaProducerFactory.

@Bean
public ProducerFactory<String, String> producerFactory() {
    Map<String, Object> configProps = new HashMap<>();
    configProps.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapAddress);
    configProps.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
    configProps.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
    return new DefaultKafkaProducerFactory<>(configProps);
}

@Bean
public KafkaTemplate<String, String> kafkaTemplate() {
    return new KafkaTemplate<>(producerFactory());
}
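
With the template in place, publishing a record is a single call. Below is a minimal sketch of a producer service; the class name, injection style, and topic are illustrative:

@Service
public class MessageProducer {

    private final KafkaTemplate<String, String> kafkaTemplate;

    public MessageProducer(KafkaTemplate<String, String> kafkaTemplate) {
        this.kafkaTemplate = kafkaTemplate;
    }

    public void sendMessage(String message) {
        // Sends asynchronously to "myTopic"; the returned future can be used for callbacks
        kafkaTemplate.send("myTopic", message);
    }
}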

Configuring ConsumerFactory and KafkaListenerContainerFactory

We need a ConsumerFactory bean for creating consumers. It is a factory for Consumer instances, and the default implementation is DefaultKafkaConsumerFactory.

ConcurrentKafkaListenerContainerFactory is an implementation of KafkaListenerContainerFactory. It creates the listener containers for methods annotated with @KafkaListener, and these endpoints are registered with the KafkaListenerEndpointRegistry.

When messages are received from the Kafka broker, the @KafkaListener-annotated methods are invoked.

@Bean
public ConsumerFactory<String, String> consumerFactory() {
    Map<String, Object> props = new HashMap<>();
    props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapAddress);
    props.put(ConsumerConfig.GROUP_ID_CONFIG, groupId);
    props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
    props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
    return new DefaultKafkaConsumerFactory<>(props);
}

@Bean
public ConcurrentKafkaListenerContainerFactory<String, String> kafkaListenerContainerFactory() {
    ConcurrentKafkaListenerContainerFactory<String, String> factory = new ConcurrentKafkaListenerContainerFactory<>();
    factory.setConsumerFactory(consumerFactory());
    return factory;
}
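
With the container factory registered, a consumer is just a Spring bean with an annotated method. A minimal sketch (class name and group id are illustrative):

@Component
public class MessageConsumer {

    @KafkaListener(topics = "myTopic", groupId = "myGroup")
    public void listen(String message) {
        // Invoked by the listener container for every record received from "myTopic"
        System.out.println("Received: " + message);
    }
}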

Note: @EnableKafka is required on the configuration class; it enables detection of @KafkaListener annotations on any Spring-managed bean in the container.
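
For completeness, the configuration class hosting the beans above could look like the sketch below; the property names used with @Value are assumptions, not part of this post:

@Configuration
@EnableKafka
public class KafkaConfig {

    // Assumed property names; adjust to your application.properties
    @Value("${kafka.bootstrap-address}")
    private String bootstrapAddress;

    @Value("${kafka.consumer.group-id}")
    private String groupId;

    // kafkaAdmin(), myTopic(), producerFactory(), kafkaTemplate(),
    // consumerFactory() and kafkaListenerContainerFactory() beans from above go here
}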

How to run

I have implemented Spring Kafka using two different approaches:

  • By manually configuring the required bean for kafka
  • Using auto-configuration provided by spring-kafka

Both approaches are selected via Spring profiles (auto and manual), but they do the same thing.

For the implementation, please visit GitHub.

For the Spring Cloud Stream with Kafka implementation, please visit GitHub here.

Reference

Kafka Series

This post is licensed under CC BY 4.0 by the author.