Consuming Messages
To receive messages from a Kafka topic using Java code, you need to use the Kafka Consumer API. Here's an example of how to consume messages:
TEXT/X-JAVA
import java.time.Duration;
import java.util.Collections;
import java.util.Properties;

import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.serialization.StringDeserializer;

10public class KafkaMessageConsumer {
11 public static void main(String[] args) {
12 // Replace with your Java logic here
13 Consumer<String, String> consumer = createConsumer();
14 String topic = "my-topic";
15
16 // Subscribe to the topic
17 consumer.subscribe(Collections.singleton(topic));
18
19 // Start consuming messages
20 while (true) {
21 ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
22
23 for (ConsumerRecord<String, String> record : records) {
24 String key = record.key();
25 String value = record.value();
26
27 // Process the message
28 System.out.println("Received message - Key: " + key + ", Value: " + value);
29 }
30 }
31 }
32
33 private static Consumer<String, String> createConsumer() {
34 // Set up consumer configuration
35 Properties properties = new Properties();
36 properties.put("bootstrap.servers", "localhost:9092");
37 properties.put("key.deserializer", StringDeserializer.class.getName());
38 properties.put("value.deserializer", StringDeserializer.class.getName());
39 properties.put("group.id", "my-consumer-group");
40
41 // Create a new Kafka consumer
42 return new KafkaConsumer<>(properties);
43 }
44}
xxxxxxxxxx
35
}
class Main {
public static void main(String[] args) {
// Replace with your Java logic here
Consumer<String, String> consumer = createConsumer();
String topic = "my-topic";
// Subscribe to the topic
consumer.subscribe(Collections.singleton(topic));
// Start consuming messages
while (true) {
ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
for (ConsumerRecord<String, String> record : records) {
String key = record.key();
String value = record.value();
// Process the message
System.out.println("Received message - Key: " + key + ", Value: " + value);
}
}
}
private static Consumer<String, String> createConsumer() {
// Set up consumer configuration
Properties properties = new Properties();
properties.put("bootstrap.servers", "localhost:9092");
properties.put("key.deserializer", StringDeserializer.class.getName());
properties.put("value.deserializer", StringDeserializer.class.getName());
OUTPUT
:001 > Cmd/Ctrl-Enter to run, Cmd/Ctrl-/ to comment