
Producing and Consuming Messages

Producing and consuming messages is at the heart of building real-time streaming applications with Kafka. This section walks through a minimal Java producer and a minimal Java consumer.

Producing Messages

To produce messages, you write a producer application that sends records to a Kafka topic. The following Java snippet shows a minimal producer built on the Kafka client API:

TEXT/X-JAVA
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerRecord;

import java.util.Properties;

public class KafkaProducerExample {
    public static void main(String[] args) {
        String topic = "my-topic";
        String message = "Hello, Kafka!";

        Producer<String, String> producer = new KafkaProducer<>(getProducerConfig());
        // send() is asynchronous; this record carries a value but no key
        producer.send(new ProducerRecord<>(topic, message));
        // close() blocks until previously sent records have completed
        producer.close();
    }

    private static Properties getProducerConfig() {
        // Set Kafka producer properties
        Properties props = new Properties();
        // Broker address used for the initial connection to the cluster
        props.put("bootstrap.servers", "localhost:9092");
        // Keys and values are plain strings, so use the string serializer for both
        props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        // Add additional properties if required

        return props;
    }
}

In this code snippet, we create a Kafka producer, set the necessary configuration properties, and send a message to the "my-topic" Kafka topic.
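The "additional properties" placeholder in the producer configuration could be filled in along the following lines. This is a minimal sketch, assuming you want stronger delivery guarantees than the defaults used in the tutorial. The class name `ProducerConfigSketch`, the `build` method, and the chosen values are illustrative assumptions and not part of the original example; `acks`, `enable.idempotence`, and `linger.ms` are standard Kafka producer settings. The sketch uses only `java.util.Properties`, so it runs without the Kafka client on the classpath.

```java
import java.util.Properties;

// Hypothetical helper: the class and method names are illustrative only.
class ProducerConfigSketch {

    // Builds producer settings for string keys and string values.
    static Properties build(String bootstrapServers) {
        Properties props = new Properties();
        props.put("bootstrap.servers", bootstrapServers);
        props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");

        // Assumed extras, not present in the original snippet:
        props.put("acks", "all");                // wait for all in-sync replicas to acknowledge
        props.put("enable.idempotence", "true"); // avoid duplicate records when the producer retries
        props.put("linger.ms", "5");             // wait up to 5 ms so records can be batched
        return props;
    }

    public static void main(String[] args) {
        Properties props = build("localhost:9092");
        // Print the settings in a stable, sorted order
        props.stringPropertyNames().stream().sorted()
             .forEach(name -> System.out.println(name + " = " + props.getProperty(name)));
    }
}
```

In a real application, the returned `Properties` object would be passed to `new KafkaProducer<>(...)` in place of `getProducerConfig()`. Whether these values suit a given workload depends on its latency and durability requirements.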

Consuming Messages

To consume messages, you write a consumer application that reads records from one or more Kafka topics. The following Java snippet shows a minimal consumer built on the Kafka client API:

TEXT/X-JAVA
import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;

import java.time.Duration;
import java.util.Collections;
import java.util.Properties;

public class KafkaConsumerExample {
    public static void main(String[] args) {
        String topic = "my-topic";

        Consumer<String, String> consumer = new KafkaConsumer<>(getConsumerConfig());
        consumer.subscribe(Collections.singletonList(topic));

        // Runs until the process is stopped
        while (true) {
            // Wait up to 100 ms for new records; poll(long) is deprecated in favor of poll(Duration)
            ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));

            // Process the received messages
            for (ConsumerRecord<String, String> record : records) {
                System.out.println("Received message: " + record.value());
            }
        }
    }

    private static Properties getConsumerConfig() {
        // Set Kafka consumer properties
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092");
        // Consumers sharing a group ID split the topic's partitions among themselves
        props.put("group.id", "my-consumer-group");
        props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        // Add additional properties if required

        return props;
    }
}

In this code snippet, we create a Kafka consumer, set the necessary configuration properties (including the consumer group ID), and subscribe to the "my-topic" Kafka topic. The consumer continuously polls for new messages and processes them as they arrive.
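The consumer configuration can be extended in the same way as the producer's. The following is a minimal sketch, assuming a new consumer group should start from the oldest available records and that the application wants to commit offsets itself. The class name `ConsumerConfigSketch`, the `build` method, and the chosen values are illustrative assumptions and not part of the original example; `auto.offset.reset` and `enable.auto.commit` are standard Kafka consumer settings. The sketch uses only `java.util.Properties`, so it runs without the Kafka client on the classpath.

```java
import java.util.Properties;

// Hypothetical helper: the class and method names are illustrative only.
class ConsumerConfigSketch {

    // Builds consumer settings for string keys and string values.
    static Properties build(String bootstrapServers, String groupId) {
        Properties props = new Properties();
        props.put("bootstrap.servers", bootstrapServers);
        props.put("group.id", groupId);
        props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");

        // Assumed extras, not present in the original snippet:
        props.put("auto.offset.reset", "earliest"); // with no committed offset, start from the oldest record
        props.put("enable.auto.commit", "false");   // the application commits offsets explicitly
        return props;
    }

    public static void main(String[] args) {
        Properties props = build("localhost:9092", "my-consumer-group");
        // Print the settings in a stable, sorted order
        props.stringPropertyNames().stream().sorted()
             .forEach(name -> System.out.println(name + " = " + props.getProperty(name)));
    }
}
```

With `enable.auto.commit` set to `false`, the polling loop has to commit offsets on its own, for example by calling `consumer.commitSync()` after a batch of records has been processed. Otherwise the group resumes from its last committed position after a restart and reprocesses records.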

With producing and consuming covered, you can start building real-time streaming applications with Kafka and use it to handle high volumes of data.
