How to Use Apache Kafka with Java?
We need to go over some fundamental ideas before diving into the code. Each topic in Kafka is divided into a set of logs called partitions. Producers write to the tail of these logs, and consumers read the logs at their own pace. Kafka scales topic consumption by distributing partitions among a consumer group, which is a set of consumers that share a common group identifier.
A single topic with three partitions and a consumer group with two members are depicted in the diagram below.
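As a rough illustration of how three partitions might be divided between two group members, the sketch below hands out partitions in round-robin order. The class name, method name, and consumer names are hypothetical, and the logic is a simplification for illustration rather than Kafka's actual assignment strategy.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

class PartitionAssignmentSketch {

    // Gives each partition to exactly one consumer, cycling through the group members.
    static Map<String, List<Integer>> assign(List<String> consumers, int partitionCount) {
        Map<String, List<Integer>> assignment = new LinkedHashMap<>();
        for (String consumer : consumers) {
            assignment.put(consumer, new ArrayList<>());
        }
        for (int partition = 0; partition < partitionCount; partition++) {
            String owner = consumers.get(partition % consumers.size());
            assignment.get(owner).add(partition);
        }
        return assignment;
    }

    public static void main(String[] args) {
        // One topic with three partitions, one group with two members (names are made up).
        System.out.println(assign(Arrays.asList("consumer-1", "consumer-2"), 3));
    }
}
```

With three partitions and two consumers, this prints `{consumer-1=[0, 2], consumer-2=[1]}`: every partition has exactly one owner within the group, and one member reads two partitions.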

The old consumer relied on Zookeeper for group management, whereas the new consumer uses a group coordination protocol built into Kafka itself. For each group, one of the brokers is selected as the group coordinator.
When a group is first created, the consumers typically begin reading from either the earliest or the latest offset in each partition. The messages in each partition log are then read sequentially.
As the consumer makes progress, it commits the offsets of the messages it has successfully processed.

When a partition gets reassigned to another consumer in the group, the initial position is set to the last committed offset. If the consumer in the example above suddenly crashed, the group member that took over the partition would continue consuming from offset 1.
The log end offset is the offset of the last message written to the log. The high watermark is the offset of the last message that was successfully copied to all of the log's replicas.
From the consumer's perspective, the main thing to know is that it can only read up to the high watermark. This prevents the consumer from reading unreplicated data that could later be lost.
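The relationship between the two offsets can be sketched with a toy model. It follows the definitions given in the text above and is not Kafka's internal bookkeeping; the class, the method names, and the replica offsets are all made up for illustration.

```java
import java.util.Arrays;

class HighWatermarkSketch {

    // The last offset that every replica holds is the minimum of the replicas' last offsets.
    static long highWatermark(long[] lastOffsetPerReplica) {
        return Arrays.stream(lastOffsetPerReplica).min().orElse(-1L);
    }

    // Under the definition used in the text, a consumer may read up to the high watermark.
    static boolean isReadable(long offset, long highWatermark) {
        return offset >= 0 && offset <= highWatermark;
    }

    public static void main(String[] args) {
        // Hypothetical state: the leader has written up to offset 9, two followers lag behind.
        long[] lastOffsetPerReplica = {9L, 7L, 8L};
        long logEndOffset = lastOffsetPerReplica[0];
        long hw = highWatermark(lastOffsetPerReplica);

        System.out.println("log end offset = " + logEndOffset + ", high watermark = " + hw);
        System.out.println("offset 7 readable: " + isReadable(7, hw));
        System.out.println("offset 8 readable: " + isReadable(8, hw));
    }
}
```

In this made-up state the high watermark is 7, so offset 7 is readable while offsets 8 and 9 stay hidden from consumers until every replica has them.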
Why Do You Need a Kafka Java Consumer?
- Kafka consumers let applications read data efficiently and at their own pace. The main purpose of Kafka's Java consumer is to read records from the appropriate Kafka broker using the consumer and connection properties you supply.
- The Java Consumer API handles the complexities of lag handling, concurrent consumption by multiple applications, delivery semantics, and more.
Step 1: Installing Kafka
First, you need to download and install the Kafka environment. Make sure Java 8 or later is installed on your local computer. Add the Java installation to your system path so that the operating system can locate the Java tools.
The Kafka package is available on the official website. With the Kafka files installed and configured, you can now start the Kafka server and the Zookeeper instance.
Step 2: Set up the Kafka Environment
- Begin by launching the Zookeeper instance. Zookeeper acts as the centralized coordination service for Kafka, maintaining metadata about the Kafka servers, producers, consumers, and so on.
- To start the Zookeeper instance, open a new command prompt and run the following command.
- To start the Kafka server, open another command prompt and run the following command.
- Once both commands have run, the Zookeeper instance and the Kafka server will be up and running.
Step 3: Creating a Kafka Topic
- After installing the Kafka client library, you must create a topic, which stores and organizes the messages received from producers.
- Consumers can then retrieve data or messages by subscribing to the relevant topic.
- Topics can be created with the Kafka topic CLI, which is included with the Kafka package by default. It is accessible through the "sn-kafka" Docker container.
- After running the aforementioned command, you will be placed inside the Docker container, where you can use the Kafka CLI tools to create Kafka topics.
- To create a new Kafka topic, use the command below.
- After you run the command, a new Kafka topic named java topic is created with a single partition and a replication factor of one.
- You can send and receive messages across the Kafka servers using this topic.
Creating a Kafka Producer
- The Kafka producer writes data to topics, which are spread across different partitions.
- The user does not need to specify the broker or the partition, because the producer determines which broker and which partition each record should be sent to.
- Instead, the producer sends the data to the cluster according to the message key and the acknowledgment setting.
- Without a key, data may be sent to any partition. With a key, data is sent to a specific partition, which allows messages that share the key to be delivered in order.

A producer specifies the level of acknowledgment (acks) it requires before a message is considered successfully written to a Kafka topic.
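As a minimal sketch of where the acknowledgment level is configured, the snippet below builds producer properties with plain `java.util.Properties`. The property names (`bootstrap.servers`, `key.serializer`, `value.serializer`, `acks`) are standard Kafka producer settings; the class name, helper method, and the broker address `localhost:9092` are assumptions for illustration.

```java
import java.util.Properties;

class ProducerPropertiesSketch {

    // Builds producer settings; acks is typically "0", "1", or "all".
    static Properties producerProperties(String bootstrapServers, String acks) {
        Properties props = new Properties();
        props.setProperty("bootstrap.servers", bootstrapServers);
        props.setProperty("key.serializer",
                "org.apache.kafka.common.serialization.StringSerializer");
        props.setProperty("value.serializer",
                "org.apache.kafka.common.serialization.StringSerializer");
        // "all" asks the leader to wait for the in-sync replicas before acknowledging.
        props.setProperty("acks", acks);
        return props;
    }

    public static void main(String[] args) {
        // Assumed local broker address; adjust to your own cluster.
        Properties props = producerProperties("localhost:9092", "all");
        System.out.println("acks = " + props.getProperty("acks"));
    }
}
```

An object like this would be passed to a `KafkaProducer` constructor. Setting `acks` to `0` means the producer does not wait for any acknowledgment, `1` waits for the partition leader only, and `all` waits for the in-sync replicas as well.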
Message Keys
- Each event message can have a key and a value.
- If the producer does not provide a key (key=null), messages are spread evenly across the partitions of a topic. This means that messages are distributed in a round-robin fashion (partition p0, then p1, then p2, and so on, then back to p0).
- If a key is sent (key != null), all messages with the same key are always sent to and stored in the same Kafka partition. A key can be anything that identifies a message: a string, a numeric value, a binary value, etc.
- Kafka message keys are typically used when messages that share the same field must be kept in order.
- When tracking vehicles in a fleet, for example, we want the data from trucks to be in order at the individual truck level. In that case, we can use the truck id as the key. In the example below, the data from the truck with the id truck id 123 will always go to partition p0.
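The idea that a key always maps to the same partition can be sketched as a hash of the key taken modulo the partition count. Kafka's default partitioner hashes the serialized key bytes with murmur2; the version below swaps in `String.hashCode()` as a stand-in, so the partition numbers it produces will not match a real cluster. The class name, method name, and truck ids are hypothetical.

```java
class KeyPartitionSketch {

    // Stand-in for Kafka's murmur2-based partitioner: same key, same partition.
    static int partitionFor(String key, int numPartitions) {
        // floorMod keeps the result non-negative even for negative hash codes.
        return Math.floorMod(key.hashCode(), numPartitions);
    }

    public static void main(String[] args) {
        int numPartitions = 3;
        String[] keys = {"truck_id_123", "truck_id_234", "truck_id_123"};
        for (String key : keys) {
            System.out.println(key + " -> partition " + partitionFor(key, numPartitions));
        }
    }
}
```

Both records keyed with the same truck id land on the same partition, which is what preserves per-truck ordering. Note that the mapping depends on the partition count, so changing the number of partitions changes where keys go.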

Steps
- By default, the Kafka distribution includes a command-line interface for producing messages to Kafka topics. This command-line interface takes the form of a shell script (.sh).
- The CLI tool found inside the Docker container can also be used to start a console producer.
- Run the command below to start a console producer.
- Running the above command starts the Kafka console producer.
Implement a Kafka Consumer in Java
The steps to create a consumer are as follows:
- Create a logger.
- Define the consumer properties.
- Create a consumer.
- Subscribe the consumer to a particular topic.
- Poll for new data.
Logger Implementation
The logger records messages while the application is running. To create a Logger object in a Kafka consumer written in Java, import the Logger and LoggerFactory classes from the org.slf4j package.
Creating Consumer Properties
Apache Kafka offers a variety of properties that can be set when building a consumer.
The consumer configuration is shown below:
key.deserializer: Deserializer class for the key that implements the org.apache.kafka.common.serialization.Deserializer interface.
- Type: class
- Default:
- Valid Values:
- Importance: high
value.deserializer: Deserializer class for the value that implements the org.apache.kafka.common.serialization.Deserializer interface.
- Type: class
- Default:
- Valid Values:
- Importance: high
bootstrap.servers: A list of host/port pairs used to establish the initial connection to the Kafka cluster. The client will make use of all servers regardless of which ones are listed here for bootstrapping; this list only affects the initial hosts used to discover the full set of servers.
- Type: list
- Default: ""
- Valid Values: non-null string
- Importance: high
group.id: A unique string that identifies the consumer group this consumer belongs to. This property is required if the consumer uses either the group management functionality via subscribe(topic) or the Kafka-based offset management strategy.
- Type: string
- Default: null
- Valid Values:
- Importance: high
auto.offset.reset: What to do when there is no initial offset in Kafka or if the current offset no longer exists on the server (for example, because the data has been deleted):
- earliest: automatically reset the offset to the earliest offset.
- latest: automatically reset the offset to the latest offset.
- none: throw an exception to the consumer if no previous offset is found for the consumer's group.
- anything else: throw an exception to the consumer.
- Type: string
- Default: latest
- Valid Values: [latest, earliest, none]
- Importance: medium
Auto Offset Reset
Because Auto Offset Reset is set to "earliest", we will read all historical data in our topic on the first run of the application.
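A minimal sketch of such a configuration, using plain `java.util.Properties` and the standard consumer property names, might look like the following. The class name, the helper method, the broker address `localhost:9092`, and the group id `my-java-group` are assumptions for illustration.

```java
import java.util.Properties;

class ConsumerPropertiesSketch {

    // Builds the five consumer settings discussed above for String keys and values.
    static Properties consumerProperties(String bootstrapServers, String groupId) {
        Properties props = new Properties();
        props.setProperty("bootstrap.servers", bootstrapServers);
        props.setProperty("group.id", groupId);
        props.setProperty("key.deserializer",
                "org.apache.kafka.common.serialization.StringDeserializer");
        props.setProperty("value.deserializer",
                "org.apache.kafka.common.serialization.StringDeserializer");
        // Start from the beginning of the topic when the group has no committed offset.
        props.setProperty("auto.offset.reset", "earliest");
        return props;
    }

    public static void main(String[] args) {
        // Assumed broker address and group id; adjust to your own setup.
        Properties props = consumerProperties("localhost:9092", "my-java-group");
        for (String name : props.stringPropertyNames()) {
            System.out.println(name + " = " + props.getProperty(name));
        }
    }
}
```

The same keys are also available as constants on `ConsumerConfig` in the Kafka client library (for example `ConsumerConfig.GROUP_ID_CONFIG`), which avoids typos in the property names.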
KafkaConsumer for Creating the Consumer
Create the consumer by building a KafkaConsumer object from the consumer properties.
Subscribing the Consumer
A consumer can subscribe to topics through several subscribe APIs. Arrays.asList() is used here because it lets the user subscribe to one topic or to several.
The topic name can be supplied directly or through a string variable. Multiple topics are separated by commas inside Arrays.asList().
Polling for New Data
The consumer reads data from Kafka by polling. The poll method returns the records that the consumer has not yet fetched from the partitions it is subscribed to.
The argument to poll sets the timeout. For example, poll(Duration.ofMillis(100)) waits up to 100 milliseconds for data and returns an empty set of records if none arrives (also called long polling).
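Putting the steps together, a consumer along these lines could look like the sketch below. It assumes the `kafka-clients` library (version 2.0 or later, for `poll(Duration)`) and an SLF4J binding are on the classpath, and that a broker is reachable. The class name, broker address, group id, and topic name `java_topic` are placeholders, and the loop is bounded only to keep the demo short.

```java
import java.time.Duration;
import java.util.Arrays;
import java.util.Properties;

import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

class ConsumerDemo {

    // Step 1: create a logger.
    private static final Logger logger = LoggerFactory.getLogger(ConsumerDemo.class);

    public static void main(String[] args) {
        // Assumed values; replace with your own broker, group, and topic.
        String bootstrapServers = "localhost:9092";
        String groupId = "my-java-group";
        String topic = "java_topic";

        // Step 2: define the consumer properties.
        Properties props = new Properties();
        props.setProperty(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
        props.setProperty(ConsumerConfig.GROUP_ID_CONFIG, groupId);
        props.setProperty(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,
                StringDeserializer.class.getName());
        props.setProperty(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,
                StringDeserializer.class.getName());
        props.setProperty(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");

        // Step 3: create the consumer; try-with-resources closes it on exit.
        try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props)) {
            // Step 4: subscribe to one or more topics.
            consumer.subscribe(Arrays.asList(topic));

            // Step 5: poll for new data. A real service would usually loop until shutdown.
            for (int i = 0; i < 100; i++) {
                ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
                for (ConsumerRecord<String, String> consumerRecord : records) {
                    logger.info("key={} value={} partition={} offset={}",
                            consumerRecord.key(), consumerRecord.value(),
                            consumerRecord.partition(), consumerRecord.offset());
                }
            }
        }
    }
}
```

Each call to `poll` returns whatever records arrived within the timeout, possibly none, so the inner loop simply does nothing on an empty result.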
Reading Data in Consumer Group
- Several consumers can read data at the same time.
- This is accomplished through a consumer group. One or more consumers in the consumer group will be able to read data from Kafka.
- To read the messages again from the beginning, either reset the group or change the group id, then restart the application. The messages will then be shown from the start.
Conclusion
- A consumer is a client that reads or consumes data from the Kafka cluster via a topic. A consumer also knows which broker the data should be read from.
- The consumer reads the data within each partition sequentially. This means that the consumer does not read data from offset 1 before reading data from offset 0. A consumer can also read data from several brokers at once.
- This article also covered the implementation of a Kafka consumer in Java. The steps to create a consumer are as follows:
- Create a logger.
- Define the consumer properties.
- Create a consumer.
- Subscribe the consumer to a particular topic.
- Poll for new data.