Producing and Consuming Messages
In Kafka, producing and consuming messages is at the heart of building real-time streaming applications. Let's walk through how to do both using the Java client.
Producing Messages
To produce messages, you write a Kafka producer application that sends records to a Kafka topic. Here's a Java snippet that demonstrates a minimal producer using the Kafka client API:
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerRecord;

import java.util.Properties;

public class KafkaProducerExample {
    public static void main(String[] args) {
        String topic = "my-topic";
        String message = "Hello, Kafka!";

        // Create the producer, send a single record, and release resources
        Producer<String, String> producer = new KafkaProducer<>(getProducerConfig());
        producer.send(new ProducerRecord<>(topic, message));
        producer.close();
    }

    private static Properties getProducerConfig() {
        // Set Kafka producer properties
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092");
        props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        // Add additional properties if required

        return props;
    }
}
In this code snippet, we create a Kafka producer, set the necessary configuration properties, and send a message to the "my-topic" Kafka topic.
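The record above has no key, so the producer spreads records across partitions. When a record does carry a key, Kafka's default partitioner routes it by hashing the key, so all records with the same key land on the same partition and stay ordered. Here's a simplified stand-in for that idea (the real client hashes the key bytes with murmur2; `String.hashCode()` below is an illustrative substitute, so the exact partition numbers differ from a real cluster):

```java
public class PartitionSketch {
    // Simplified stand-in for Kafka's default partitioner:
    // hash the key, then take it modulo the partition count.
    static int partitionFor(String key, int numPartitions) {
        // Mask off the sign bit so the result is non-negative.
        return (key.hashCode() & 0x7fffffff) % numPartitions;
    }

    public static void main(String[] args) {
        int partitions = 6;
        // The same key always maps to the same partition,
        // which is what preserves per-key ordering.
        System.out.println(partitionFor("user-42", partitions)
                == partitionFor("user-42", partitions)); // prints true
    }
}
```

This is why choosing a good key matters: it determines both ordering guarantees and how evenly load spreads across partitions.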
Consuming Messages
To consume messages, you write a Kafka consumer application that reads records from one or more Kafka topics. Here's a Java snippet that demonstrates a minimal consumer using the Kafka client API:
import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;

import java.time.Duration;
import java.util.Collections;
import java.util.Properties;

public class KafkaConsumerExample {
    public static void main(String[] args) {
        String topic = "my-topic";

        Consumer<String, String> consumer = new KafkaConsumer<>(getConsumerConfig());
        consumer.subscribe(Collections.singletonList(topic));

        while (true) {
            // Block for up to 100 ms waiting for new records
            ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));

            // Process the received messages
            for (ConsumerRecord<String, String> record : records) {
                System.out.println("Received message: " + record.value());
            }
        }
    }

    private static Properties getConsumerConfig() {
        // Set Kafka consumer properties
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092");
        props.put("group.id", "my-consumer-group");
        props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        // Add additional properties if required

        return props;
    }
}
In this code snippet, we create a Kafka consumer, set the necessary configuration properties (including the consumer group ID), and subscribe to the "my-topic" Kafka topic. The consumer continuously polls for new messages and processes them as they arrive.
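Under the hood, the consumer group makes this work by tracking a committed offset per partition: each poll hands back only records past the last position, so restarts and rebalances resume where the group left off. To make that mechanic concrete, here's a small in-memory model of a single partition and its offset. This is an illustrative sketch with hypothetical names, not the Kafka API:

```java
import java.util.ArrayList;
import java.util.List;

public class OffsetSketch {
    // An in-memory stand-in for a single topic partition.
    static final List<String> log = new ArrayList<>();
    // The consumer's committed offset: index of the next record to read.
    static int committedOffset = 0;

    // Hypothetical poll(): returns every record past the committed
    // offset and then advances it, mimicking auto-commit behavior.
    static List<String> poll() {
        List<String> records = new ArrayList<>(log.subList(committedOffset, log.size()));
        committedOffset = log.size();
        return records;
    }

    public static void main(String[] args) {
        log.add("Hello, Kafka!");
        log.add("Second message");
        System.out.println(poll()); // prints [Hello, Kafka!, Second message]
        System.out.println(poll()); // prints [] -- nothing new since last poll
    }
}
```

In real Kafka, the offset is committed back to the broker (automatically by default, or manually via `commitSync()`), which is what lets another consumer in the same group take over a partition without reprocessing everything.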
With producing and consuming in hand, you are ready to start building real-time streaming applications with Kafka and handle high volumes of data.