Mark As Completed Discussion

Build your intuition. Fill in the missing part by typing it in.

To receive messages from a Kafka topic using Java code, you need to use the Kafka __ API. Here's an example of how to consume messages:

TEXT/X-JAVA
import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.serialization.StringDeserializer;

import java.time.Duration;
import java.util.Collections;
import java.util.Properties;

10public class KafkaMessageConsumer {
11    public static void main(String[] args) {
12        // Replace with your Java logic here
13        Consumer<String, String> consumer = createConsumer();
14        String topic = "my-topic";
15
16        // Subscribe to the topic
17        consumer.subscribe(Collections.singleton(topic));
18
19        // Start consuming messages
20        while (true) {
21            ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
22
23            for (ConsumerRecord<String, String> record : records) {
24                String key = record.key();
25                String value = record.value();
26
27                // Process the message
28                System.out.println("Received message - Key: " + key + ", Value: " + value);
29            }
30        }
31    }
32
33    private static Consumer<String, String> createConsumer() {
34        // Set up consumer configuration
35        Properties properties = new Properties();
36        properties.put("bootstrap.servers", "localhost:9092");
37        properties.put("key.deserializer", StringDeserializer.class.getName());
38        properties.put("value.deserializer", StringDeserializer.class.getName());
39        properties.put("group.id", "my-consumer-group");
40
41        // Create a new Kafka consumer
42        return new KafkaConsumer<>(properties);
43    }
44}

Write the missing line below.