Kafka Consumers

Kafka consumers are responsible for reading messages from Kafka topics. They allow applications to subscribe to specific topics and consume data from them. Let's explore how to work with Kafka consumers and their configurations.

Creating a Kafka Consumer

To create a Kafka consumer in Java, you can use the KafkaConsumer class from the Kafka client library. The following code snippet demonstrates how to create a Kafka consumer and consume messages from a topic:

TEXT/X-JAVA
// Import the required classes
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;

import java.time.Duration;
import java.util.Arrays;
import java.util.Properties;

public class KafkaConsumerExample {
  public static void main(String[] args) {
    // Create properties for the Kafka consumer
    Properties properties = new Properties();
    properties.put("bootstrap.servers", "localhost:9092");
    // subscribe() requires a consumer group; the group name here is only an example
    properties.put("group.id", "my-consumer-group");
    properties.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
    properties.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");

    // Create a KafkaConsumer instance; try-with-resources closes it if the loop exits
    try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(properties)) {
      // Subscribe to the topic
      consumer.subscribe(Arrays.asList("my-topic"));

      // Start consuming messages
      while (true) {
        // Wait up to 100 ms for new records; poll(long) is deprecated in favor of poll(Duration)
        ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
        for (ConsumerRecord<String, String> record : records) {
          System.out.println("Received message: " + record.value());
        }
      }
    }
  }
}

This code creates a Kafka consumer that subscribes to a topic named my-topic and polls for new messages in an endless loop, with each poll() call waiting up to 100 milliseconds for records to arrive. Both message keys and values are deserialized using StringDeserializer.

Kafka Consumer Configurations

Kafka consumers can be configured with various properties to customize their behavior. Some commonly used configurations include:

  • bootstrap.servers: A comma-separated list of host:port pairs that the consumer uses to establish its initial connection to the Kafka cluster.
  • group.id: The name of the consumer group the consumer belongs to. It is required when the consumer uses subscribe().
  • key.deserializer: The class used to deserialize the message key from bytes.
  • value.deserializer: The class used to deserialize the message value from bytes.

You can set these configurations by creating a Properties object and passing it to the KafkaConsumer constructor, as shown in the previous code snippet.
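The configuration step can also be sketched on its own. The example below is a minimal sketch that uses only java.util.Properties, so it runs without the Kafka client library. The class name ConsumerSettings, the method build, and the group name example-group are assumptions made for illustration. It also sets group.id, which Kafka requires when a consumer uses subscribe().

```java
import java.util.Properties;

// Hypothetical helper class; the name and method are illustrative only.
final class ConsumerSettings {

    // Builds the Properties object that would be passed to new KafkaConsumer<>(...).
    static Properties build(String bootstrapServers, String groupId) {
        Properties props = new Properties();
        props.setProperty("bootstrap.servers", bootstrapServers);
        // Required when the consumer uses subscribe(); the value is an example.
        props.setProperty("group.id", groupId);
        props.setProperty("key.deserializer",
                "org.apache.kafka.common.serialization.StringDeserializer");
        props.setProperty("value.deserializer",
                "org.apache.kafka.common.serialization.StringDeserializer");
        return props;
    }

    public static void main(String[] args) {
        Properties props = build("localhost:9092", "example-group");
        // Print each setting so the configuration can be inspected.
        props.forEach((key, value) -> System.out.println(key + " = " + value));
    }
}
```

Keeping the settings in one method like this makes it easier to reuse the same configuration across several consumers, though whether that fits depends on your application.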

Consuming Messages

To consume messages with a Kafka consumer, you first subscribe to one or more topics using the subscribe() method. Your application then calls the poll() method in a loop. Each call returns a batch of records, which may be empty if nothing arrived within the timeout, and you process the records inside a for loop.

TEXT/X-JAVA
// Requires: import java.time.Duration;
ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
for (ConsumerRecord<String, String> record : records) {
  // Process the record
}

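Make sure to handle exceptions that may occur while consuming. For example, catch failures in your record-processing logic so that a single bad message does not stop the loop, and close the consumer when the application shuts down. In a production environment, you will likely need to adapt this code to your specific requirements.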
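One way to keep a single bad message from stopping the loop is to catch exceptions per record. The following is a minimal sketch of that idea, not a definitive implementation. The class SafeRecordProcessor and the method processAll are hypothetical, and plain strings stand in for record.value() so the sketch runs without a broker or the Kafka client library.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.function.Consumer;

// Hypothetical helper; strings stand in for the values of Kafka records.
// Note: Consumer here is java.util.function.Consumer, not a Kafka class.
final class SafeRecordProcessor {

    // Applies the handler to each value. A failure is logged and recorded,
    // and processing continues with the next value.
    static List<String> processAll(Iterable<String> values, Consumer<String> handler) {
        List<String> failed = new ArrayList<>();
        for (String value : values) {
            try {
                handler.accept(value);
            } catch (RuntimeException e) {
                System.err.println("Failed to process '" + value + "': " + e.getMessage());
                failed.add(value);
            }
        }
        return failed;
    }

    public static void main(String[] args) {
        // "two" is not a number, so parsing it throws NumberFormatException.
        List<String> failed = processAll(Arrays.asList("1", "two", "3"),
                value -> System.out.println("Parsed " + Integer.parseInt(value)));
        System.out.println("Failed values: " + failed);
    }
}
```

In a real consumer loop, the same try/catch would wrap the body of the for loop over ConsumerRecords. What to do with the failed values (log them, retry them, or forward them elsewhere) is a design choice that depends on your requirements.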
