Kafka Consumer Groups

In Kafka, consumer groups provide high throughput and fault tolerance when consuming messages from a topic. A consumer group is a set of Kafka consumer instances that share a group id and work together to consume messages from one or more topics. Within a group, each partition of a topic is assigned to exactly one consumer at a time, so no two members of the same group read the same partition. If a consumer fails, its partitions are reassigned to the remaining members.
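
The fault-tolerance behavior can be illustrated with a small simulation that needs no Kafka dependency. This is only a sketch under simplifying assumptions: the class name, the reassign helper, and the consumer names are hypothetical, and handing orphaned partitions to the survivors in turn is a stand-in for Kafka's real rebalance protocol, which is coordinated by the broker.

```java
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Simplified illustration only: NOT Kafka's actual rebalance logic.
class FailoverSketch {

  // Returns a new partition -> owner map in which every partition owned by
  // `failed` is handed to one of the surviving consumers, in turn.
  // Assumes `survivors` is non-empty.
  static Map<Integer, String> reassign(Map<Integer, String> owners,
                                       String failed,
                                       List<String> survivors) {
    Map<Integer, String> result = new TreeMap<>(owners);
    int next = 0;
    for (Map.Entry<Integer, String> entry : result.entrySet()) {
      if (entry.getValue().equals(failed)) {
        entry.setValue(survivors.get(next % survivors.size()));
        next++;
      }
    }
    return result;
  }

  public static void main(String[] args) {
    // Six partitions, each owned by exactly one member of the group.
    Map<Integer, String> owners = new TreeMap<>(Map.of(
        0, "consumer-a", 1, "consumer-b", 2, "consumer-c",
        3, "consumer-a", 4, "consumer-b", 5, "consumer-c"));

    // consumer-b fails; its partitions (1 and 4) move to the survivors.
    Map<Integer, String> after =
        reassign(owners, "consumer-b", List.of("consumer-a", "consumer-c"));
    System.out.println(after);
  }
}
```

After the simulated failure, every partition still has exactly one owner, which is the property the group relies on to keep consuming without duplicating work inside the group.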

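Consumer groups provide scalability and parallelism by distributing the partitions of a topic across the consumers in the group. Each consumer reads from its own subset of partitions, so the group as a whole processes partitions in parallel. Because a partition has at most one consumer per group, the partition count caps the useful parallelism: any consumers beyond that number sit idle. The workload is balanced only to the extent that messages are spread evenly across the partitions.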
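
To make the distribution concrete, here is a minimal sketch that deals partitions out to consumers in turn. It is an illustration only: the class name, the assign helper, and the consumer names are hypothetical, and Kafka's own assignment strategies are more involved than this simple round-robin.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Simplified illustration only: NOT one of Kafka's assignor implementations.
class PartitionAssignmentSketch {

  // Deals partitions 0..partitionCount-1 to the consumers in turn.
  // Assumes `consumers` is non-empty.
  static Map<String, List<Integer>> assign(List<String> consumers, int partitionCount) {
    Map<String, List<Integer>> assignment = new LinkedHashMap<>();
    for (String consumer : consumers) {
      assignment.put(consumer, new ArrayList<>());
    }
    for (int partition = 0; partition < partitionCount; partition++) {
      String owner = consumers.get(partition % consumers.size());
      assignment.get(owner).add(partition);
    }
    return assignment;
  }

  public static void main(String[] args) {
    List<String> group = List.of("consumer-a", "consumer-b", "consumer-c");

    // Six partitions: each consumer gets two.
    System.out.println(assign(group, 6));
    // prints {consumer-a=[0, 3], consumer-b=[1, 4], consumer-c=[2, 5]}

    // Two partitions: the third consumer has nothing to read.
    System.out.println(assign(group, 2));
    // prints {consumer-a=[0], consumer-b=[1], consumer-c=[]}
  }
}
```

The second call shows why adding consumers beyond the number of partitions does not increase throughput: the extra member receives an empty assignment.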

To use consumer groups in Kafka, set a group id (the group.id property) when creating a consumer. Consumers that share the same group id coordinate with each other and divide the partitions of the subscribed topics among themselves. Each consumer in the group is assigned zero or more partitions to consume from, depending on how many partitions and consumers there are.
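
As a rough sketch of what sharing a group id looks like in configuration, the snippet below builds the settings for two consumer instances using only java.util.Properties. The helper name, the worker-1 and worker-2 client ids, and the broker address are assumptions for illustration. A real KafkaConsumer would also need key and value deserializers.

```java
import java.util.Properties;

class GroupConfigSketch {

  // Hypothetical helper: builds the settings for one member of a group.
  static Properties consumerProperties(String groupId, String clientId) {
    Properties props = new Properties();
    props.put("bootstrap.servers", "localhost:9092"); // assumed local broker
    props.put("group.id", groupId);   // same value for every member of the group
    props.put("client.id", clientId); // optional label to tell instances apart
    return props;
  }

  public static void main(String[] args) {
    Properties first = consumerProperties("my-consumer-group", "worker-1");
    Properties second = consumerProperties("my-consumer-group", "worker-2");

    // Both instances would join the same group because group.id matches.
    boolean sameGroup =
        first.getProperty("group.id").equals(second.getProperty("group.id"));
    System.out.println("Same group: " + sameGroup); // prints Same group: true
  }
}
```

Starting a second instance with the same group id is what lets the group split the partitions. An instance started with a different group id would instead form its own group.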

Here's an example of how to create a Kafka consumer with a group id and subscribe to a topic:

```java
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.serialization.StringDeserializer;

import java.time.Duration;
import java.util.Collections;
import java.util.Properties;

public class KafkaConsumerExample {
  public static void main(String[] args) {
    // Create a new consumer that joins the group "my-consumer-group"
    KafkaConsumer<String, String> consumer = new KafkaConsumer<>(getConsumerProperties("my-consumer-group"));

    // Subscribe to a topic; the group decides which partitions this instance reads
    consumer.subscribe(Collections.singleton("my-topic"));

    // Poll for messages until the process is stopped
    while (true) {
      ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));

      for (ConsumerRecord<String, String> record : records) {
        // Process the message
        System.out.println("Received message: " + record.value());
      }
    }
  }

  // Builds the consumer configuration, including the group id
  private static Properties getConsumerProperties(String groupId) {
    Properties props = new Properties();
    props.put("bootstrap.servers", "localhost:9092");
    props.put("group.id", groupId);
    props.put("key.deserializer", StringDeserializer.class.getName());
    props.put("value.deserializer", StringDeserializer.class.getName());
    return props;
  }
}
```