Build your intuition. Fill in the missing part by typing it in.
To receive messages from a Kafka topic using Java code, you need to use the Kafka __ API. Here's an example of how to consume messages:
TEXT/X-JAVA
1import org.apache.kafka.clients.consumer.Consumer;
2import org.apache.kafka.clients.consumer.ConsumerRecords;
3import org.apache.kafka.clients.consumer.KafkaConsumer;
4import org.apache.kafka.common.serialization.StringDeserializer;
5
6import java.time.Duration;
7import java.util.Collections;
8import java.util.Properties;
9
10public class KafkaMessageConsumer {
11 public static void main(String[] args) {
12 // Replace with your Java logic here
13 Consumer<String, String> consumer = createConsumer();
14 String topic = "my-topic";
15
16 // Subscribe to the topic
17 consumer.subscribe(Collections.singleton(topic));
18
19 // Start consuming messages
20 while (true) {
21 ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
22
23 for (ConsumerRecord<String, String> record : records) {
24 String key = record.key();
25 String value = record.value();
26
27 // Process the message
28 System.out.println("Received message - Key: " + key + ", Value: " + value);
29 }
30 }
31 }
32
33 private static Consumer<String, String> createConsumer() {
34 // Set up consumer configuration
35 Properties properties = new Properties();
36 properties.put("bootstrap.servers", "localhost:9092");
37 properties.put("key.deserializer", StringDeserializer.class.getName());
38 properties.put("value.deserializer", StringDeserializer.class.getName());
39 properties.put("group.id", "my-consumer-group");
40
41 // Create a new Kafka consumer
42 return new KafkaConsumer<>(properties);
43 }
44}
Write the missing line below.