Kafka Consumers
Kafka consumers are responsible for reading messages from Kafka topics. They allow applications to subscribe to specific topics and consume data from them. Let's explore how to work with Kafka consumers and their configurations.
Creating a Kafka Consumer
To create a Kafka consumer in Java, you can use the KafkaConsumer
class from the Kafka client library. The following code snippet demonstrates how to create a Kafka consumer and consume messages from a topic:
1// Import the required classes
2import org.apache.kafka.clients.consumer.KafkaConsumer;
3import org.apache.kafka.clients.consumer.ConsumerRecords;
4import org.apache.kafka.clients.consumer.ConsumerRecord;
5
6import java.util.Properties;
7import java.util.Arrays;
8
9public class KafkaConsumerExample {
10 public static void main(String[] args) {
11 // Create properties for the Kafka consumer
12 Properties properties = new Properties();
13 properties.put("bootstrap.servers", "localhost:9092");
14 properties.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
15 properties.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
16
17 // Create a KafkaConsumer instance
18 KafkaConsumer<String, String> consumer = new KafkaConsumer<>(properties);
19
20 // Subscribe to the topic
21 consumer.subscribe(Arrays.asList("my-topic"));
22
23 // Start consuming messages
24 while (true) {
25 ConsumerRecords<String, String> records = consumer.poll(100);
26 for (ConsumerRecord<String, String> record : records) {
27 System.out.println("Received message: " + record.value());
28 }
29 }
30 }
31}
This code creates a Kafka consumer that subscribes to a topic named my-topic
and continuously polls for new messages. The messages are deserialized using the StringDeserializer
.
Kafka Consumer Configurations
Kafka consumers can be configured with various properties to customize their behavior. Some commonly used configurations include:
bootstrap.servers
: A list of host and port pairs that the consumer will use to establish an initial connection to the Kafka cluster.key.deserializer
: The class used to deserialize the key object from bytes.value.deserializer
: The class used to deserialize the value object from bytes.
You can set these configurations by creating a Properties
object and passing it to the KafkaConsumer
constructor, as shown in the previous code snippet.
Consuming Messages
To consume messages with a Kafka consumer, you need to subscribe to one or more topics using the subscribe()
method. The consumer will then continuously poll for new messages using the poll()
method. The received messages can be processed within the for
loop.
1ConsumerRecords<String, String> records = consumer.poll(100);
2for (ConsumerRecord<String, String> record : records) {
3 // Process the record
4}
Make sure to handle any exceptions that may occur during the consuming process. In a production environment, you might need to modify the code based on your specific requirements.
xxxxxxxxxx
class Main {
public static void main(String[] args) {
// replace with your Java logic here
for(int i = 1; i <= 100; i++) {
if(i % 3 == 0 && i % 5 == 0) {
System.out.println("FizzBuzz");
} else if(i % 3 == 0) {
System.out.println("Fizz");
} else if(i % 5 == 0) {
System.out.println("Buzz");
} else {
System.out.println(i);
}
}
}
}