Part 6 - Kafka Consumer
This section describes the Kafka Consumer and its internals. For the implementation, please visit the GitHub repository.
Three Major Required Properties
- `bootstrap.servers`
  - required for the consumer to start up
  - used for discovering the full membership of the cluster
  - used for determining the partition leaders (owners)
- `key.deserializer`
- `value.deserializer`
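A minimal sketch of the three required properties, assuming the standard `StringDeserializer` shipped with kafka-clients (the broker addresses below are hypothetical; only a subset of brokers is needed, since the client discovers the rest of the cluster from them):

```java
import java.util.Properties;

public class ConsumerProps {
    public static Properties build() {
        Properties props = new Properties();
        // Hypothetical broker addresses used only for bootstrapping;
        // full cluster membership is discovered from these.
        props.put("bootstrap.servers", "broker1:9092,broker2:9092");
        // Deserializers turn the raw bytes of each record back into
        // key/value objects.
        props.put("key.deserializer",
                  "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("value.deserializer",
                  "org.apache.kafka.common.serialization.StringDeserializer");
        return props;
    }
}
```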
The full list of consumer configs is available in the official Kafka documentation.
What does subscription do?
- It uses automatic/dynamic partition assignment for topics
- Subscribing to a single topic means pulling from every partition of that topic (a one-to-many relationship)
- Subscribing to multiple topics means pulling from every partition of all those topics (a many-to-many relationship)
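A sketch of both subscription styles, assuming the kafka-clients library and hypothetical topic names (`orders`, `payments`); note that each `subscribe()` call replaces the previous subscription rather than adding to it:

```java
import java.util.Arrays;
import java.util.Properties;
import org.apache.kafka.clients.consumer.KafkaConsumer;

public class SubscribeExample {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092"); // hypothetical broker
        props.put("group.id", "demo-group");              // hypothetical group
        props.put("key.deserializer",
                  "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("value.deserializer",
                  "org.apache.kafka.common.serialization.StringDeserializer");
        KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);

        // Single topic: partitions of that one topic are assigned dynamically.
        consumer.subscribe(Arrays.asList("orders"));

        // Multiple topics: partitions across all listed topics are assigned.
        // This call replaces the earlier subscription.
        consumer.subscribe(Arrays.asList("orders", "payments"));

        consumer.close();
    }
}
```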
Consumer Coordinator
- It is aware of automatic/dynamic partition reassignment
- It pushes notifications of assignment changes to the subscription state object
- It commits offsets to the cluster
Offset
- A message that has been read is not necessarily also committed
- Configuration for the offset gap:
  - `enable.auto.commit` = true (default)
  - `auto.commit.interval.ms` = 5000 (default)
  - `auto.offset.reset` = "latest" (default)
    - "earliest"
    - "none"
- Offset behavior differs between a single-consumer and a consumer-group topology
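A small sketch that sets these offset-related configs explicitly (the values shown are the defaults from the list above, so this only makes the defaults visible):

```java
import java.util.Properties;

public class OffsetConfig {
    public static Properties build() {
        Properties props = new Properties();
        // Commit offsets automatically, every 5 seconds (both defaults).
        props.put("enable.auto.commit", "true");
        props.put("auto.commit.interval.ms", "5000");
        // Where to start when no committed offset exists for the group:
        // "latest" (default), "earliest", or "none" (raise an error).
        props.put("auto.offset.reset", "latest");
        return props;
    }
}
```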
Offset Management
- There are two ways to manage offset commits: automatic and manual
- To use manual offset commits, set the `enable.auto.commit` config to false
- There are two ways to commit offsets in manual mode:
  - commitSync
    - A synchronous request that blocks until it receives a response from the cluster
    - In case of error, it uses the `retry.backoff.ms` (default: 100) config to retry until it succeeds or hits an unrecoverable error
  - commitAsync
    - As the name suggests, an asynchronous request that is non-blocking but also non-deterministic
    - It has no retry functionality
    - It also supports a callback option, which helps with further processing once the response arrives from the cluster
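A common manual-commit pattern, sketched with the kafka-clients API (broker, group id, and topic name below are hypothetical): `commitAsync` with a callback keeps the poll loop fast, and a final blocking `commitSync` on shutdown makes sure the last offsets are not lost:

```java
import java.time.Duration;
import java.util.Arrays;
import java.util.Properties;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;

public class ManualCommit {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092"); // hypothetical broker
        props.put("group.id", "demo-group");              // hypothetical group
        props.put("key.deserializer",
                  "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("value.deserializer",
                  "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("enable.auto.commit", "false"); // take over offset management

        try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props)) {
            consumer.subscribe(Arrays.asList("orders")); // hypothetical topic
            try {
                while (true) {
                    ConsumerRecords<String, String> records =
                        consumer.poll(Duration.ofMillis(500));
                    for (ConsumerRecord<String, String> record : records) {
                        System.out.printf("offset=%d value=%s%n",
                                          record.offset(), record.value());
                    }
                    // Non-blocking commit; callback fires when the cluster responds.
                    consumer.commitAsync((offsets, exception) -> {
                        if (exception != null) {
                            System.err.println("async commit failed: " + exception);
                        }
                    });
                }
            } finally {
                // Blocking commit on the way out, with built-in retries.
                consumer.commitSync();
            }
        }
    }
}
```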
Consumer Group
- It is Kafka's solution to consumer-side scale-out
- The `group.id` config defines the consumer group
- The main goal of a consumer group is to share the message consumption and processing load
  - provides parallelism and high throughput
  - increases the level of redundancy
  - increases the performance of message processing
Group Coordinator
- It is a broker elected to serve as the group coordinator
- It monitors and maintains consumer group membership using `heartbeat.interval.ms` (default: 3000) and `session.timeout.ms` (default: 10000)
- It balances available consumers evenly across partitions
  - It tries to assign one consumer to one partition of the topic, but this varies with the number of consumers and partitions
- It also initiates the re-balancing protocol, which happens only if
  - new partitions are added
  - a consumer fails
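The group membership configs sketched together (the group name is hypothetical; the interval values are the defaults listed above, and `heartbeat.interval.ms` should stay well below `session.timeout.ms` so several heartbeats fit into one session window):

```java
import java.util.Properties;

public class GroupConfig {
    public static Properties build() {
        Properties props = new Properties();
        // Consumers sharing this id form one consumer group; partitions of
        // the subscribed topics are divided among them.
        props.put("group.id", "order-processors"); // hypothetical group name
        // The consumer heartbeats to the group coordinator at this interval...
        props.put("heartbeat.interval.ms", "3000"); // default
        // ...and is removed from the group (triggering a re-balance) if no
        // heartbeat arrives within the session timeout.
        props.put("session.timeout.ms", "10000");   // default
        return props;
    }
}
```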
Kafka Consumer Internals
- Properties ~> `ConsumerConfig`
- Message ~> `ConsumerRecord`
Some useful configuration
- `fetch.min.bytes` (default: 1): minimum number of bytes that must be returned from the poll
- `fetch.max.bytes` (default: 52428800, i.e. 50 MB): maximum number of bytes that can be returned from the poll
- `fetch.max.wait.ms` (default: 500): amount of time to wait if there isn't enough data to meet the `fetch.min.bytes` threshold
- `max.partition.fetch.bytes` (default: 1048576, i.e. 1 MB): maximum number of bytes returned per partition per cycle
- `max.poll.records` (default: 500): maximum number of records allowed per poll cycle
- `max.poll.interval.ms` (default: 300000, i.e. 5 min): maximum delay between invocations of poll
- `allow.auto.create.topics` (default: true): allows automatic topic creation
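A sketch that tunes a few of these together; the `fetch.min.bytes` value of 1024 is a hypothetical tuning choice (the default is 1), while the other values shown are the defaults:

```java
import java.util.Properties;

public class FetchTuning {
    public static Properties build() {
        Properties props = new Properties();
        // Wait until at least 1 KB is available per fetch (hypothetical
        // tuning; default is 1 byte)...
        props.put("fetch.min.bytes", "1024");
        // ...but never wait longer than 500 ms (default).
        props.put("fetch.max.wait.ms", "500");
        // Cap the data volume per partition and records per poll cycle.
        props.put("max.partition.fetch.bytes", "1048576"); // 1 MB, default
        props.put("max.poll.records", "500");              // default
        // poll() must be called at least this often, or the consumer is
        // considered failed and its partitions are reassigned.
        props.put("max.poll.interval.ms", "300000");       // 5 min, default
        return props;
    }
}
```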
Some useful consumer control
- Consumer position control
  - seek() ~> read from any specific offset
  - seekToBeginning() ~> read messages from the beginning
  - seekToEnd() ~> read messages from the end
- Flow control
  - pause() ~> pause fetching from any partition of a topic
  - resume() ~> resume fetching from any partition of a topic
- Re-balance listeners ~> listen for consumer group re-balancing
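The controls above, sketched with the kafka-clients API (broker, group id, topic name, and the target offset 42 are hypothetical; `seek`/`pause`/`resume` require the partition to already be assigned, hence the initial `poll`):

```java
import java.time.Duration;
import java.util.Arrays;
import java.util.Collection;
import java.util.Properties;
import org.apache.kafka.clients.consumer.ConsumerRebalanceListener;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.TopicPartition;

public class ConsumerControl {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092"); // hypothetical broker
        props.put("group.id", "demo-group");              // hypothetical group
        props.put("key.deserializer",
                  "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("value.deserializer",
                  "org.apache.kafka.common.serialization.StringDeserializer");

        KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);

        // Re-balance listener: called when the group coordinator revokes or
        // assigns partitions during a re-balance.
        consumer.subscribe(Arrays.asList("orders"), new ConsumerRebalanceListener() {
            @Override
            public void onPartitionsRevoked(Collection<TopicPartition> parts) {
                // e.g. commit offsets for the partitions being taken away
            }
            @Override
            public void onPartitionsAssigned(Collection<TopicPartition> parts) {
                // e.g. seek to externally stored positions for new partitions
            }
        });
        consumer.poll(Duration.ofMillis(100)); // join the group, get assignment

        TopicPartition tp = new TopicPartition("orders", 0);
        consumer.seek(tp, 42L);                      // jump to a specific offset
        consumer.seekToBeginning(Arrays.asList(tp)); // read from the beginning
        consumer.seekToEnd(Arrays.asList(tp));       // read from the end

        consumer.pause(Arrays.asList(tp));  // stop fetching from this partition
        consumer.resume(Arrays.asList(tp)); // start fetching again
        consumer.close();
    }
}
```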