
Kafka Message Serialization

In Kafka, producers serialize each message's key and value into byte arrays before sending, and consumers deserialize those bytes back into objects. Serialization is the process of converting objects or data into a format that can be stored or transmitted. Kafka ships with built-in serializers for common types and lets you plug in your own.

Built-in Serializers

Kafka includes serializers for common data types, such as strings, integers, longs, and byte arrays. With these you don't need to write any serialization logic; you only name the serializer class in the producer configuration. Here's an example of how to send a message using the built-in string serializer:

TEXT/X-JAVA
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.StringSerializer;

import java.util.Properties;

public class KafkaProducerExample {
  public static void main(String[] args) {
    // Create a new producer
    KafkaProducer<String, String> producer = new KafkaProducer<>(getProducerProperties());

    // Create a message record (topic and value, no key)
    ProducerRecord<String, String> record = new ProducerRecord<>("my-topic", "Hello, Kafka!");

    // Send the message
    producer.send(record);

    // Flush and close the producer
    producer.flush();
    producer.close();
  }

  private static Properties getProducerProperties() {
    Properties props = new Properties();
    props.put("bootstrap.servers", "localhost:9092");
    props.put("key.serializer", StringSerializer.class.getName());
    props.put("value.serializer", StringSerializer.class.getName());
    return props;
  }
}
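
To make concrete what the built-in serializers hand to the broker, the sketch below reproduces their byte layouts with the Java standard library only: UTF-8 for strings (the default encoding of StringSerializer) and four big-endian bytes for integers (the layout of IntegerSerializer). The class and method names are hypothetical and no Kafka client is involved; it is an illustration of the wire format, not Kafka's own code.

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.util.Arrays;

// Stdlib-only illustration; class and method names are hypothetical.
final class WireFormatSketch {

  // Same bytes StringSerializer produces with its default UTF-8 encoding.
  static byte[] stringToBytes(String value) {
    return value == null ? null : value.getBytes(StandardCharsets.UTF_8);
  }

  // Same layout as IntegerSerializer: 4 bytes, most significant byte first.
  static byte[] intToBytes(int value) {
    return ByteBuffer.allocate(Integer.BYTES).putInt(value).array();
  }

  // Inverse operations, as the consumer-side deserializers would perform them.
  static String bytesToString(byte[] bytes) {
    return bytes == null ? null : new String(bytes, StandardCharsets.UTF_8);
  }

  static int bytesToInt(byte[] bytes) {
    return ByteBuffer.wrap(bytes).getInt();
  }

  public static void main(String[] args) {
    byte[] text = stringToBytes("Hello, Kafka!");
    byte[] number = intToBytes(42);
    System.out.println(text.length + " bytes -> " + bytesToString(text));
    System.out.println(Arrays.toString(number) + " -> " + bytesToInt(number));
  }
}
```

Because the broker only ever sees these byte arrays, the consumer must be configured with the deserializer that matches the serializer the producer used.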

Custom Serializers

If you have complex data types or want to customize the serialization format, you can implement your own serializers. Kafka provides an interface, org.apache.kafka.common.serialization.Serializer, that you can implement to define your custom serialization logic. Here's an example of a custom serializer for a Person object; it relies on Java's built-in object serialization, so Person must implement java.io.Serializable:

TEXT/X-JAVA
import org.apache.kafka.common.errors.SerializationException;
import org.apache.kafka.common.serialization.Serializer;

import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectOutputStream;
import java.util.Map;

public class PersonSerializer implements Serializer<Person> {
  @Override
  public void configure(Map<String, ?> configs, boolean isKey) {
    // No configuration needed for this serializer
  }

  @Override
  public byte[] serialize(String topic, Person data) {
    // Kafka convention: a null value is sent as a null payload
    if (data == null) {
      return null;
    }
    // Uses Java object serialization, so Person must implement java.io.Serializable
    try (ByteArrayOutputStream byteStream = new ByteArrayOutputStream();
         ObjectOutputStream objectStream = new ObjectOutputStream(byteStream)) {
      objectStream.writeObject(data);
      // Push any buffered bytes into byteStream before reading it
      objectStream.flush();
      return byteStream.toByteArray();
    } catch (IOException e) {
      // Fail the send instead of silently publishing a null payload
      throw new SerializationException("Failed to serialize Person", e);
    }
  }

  @Override
  public void close() {
    // No resources to release
  }
}
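
The serializer above refers to a Person class that the text never shows, and a consumer needs the inverse operation to read the value back. The following is a minimal sketch of both, under stated assumptions: the shape of Person (a name and an age, matching the constructor call used later) is a guess, and PersonCodec is a hypothetical helper kept free of Kafka imports so it runs standalone. In a real consumer, the body of fromBytes would sit inside the deserialize(String topic, byte[] data) method of a class implementing org.apache.kafka.common.serialization.Deserializer<Person>.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;
import java.io.UncheckedIOException;

// Hypothetical helper: mirrors the serializer and its inverse without Kafka imports.
final class PersonCodec {

  static byte[] toBytes(Person person) {
    ByteArrayOutputStream byteStream = new ByteArrayOutputStream();
    // Closing the object stream flushes everything into byteStream.
    try (ObjectOutputStream objectStream = new ObjectOutputStream(byteStream)) {
      objectStream.writeObject(person);
    } catch (IOException e) {
      throw new UncheckedIOException(e);
    }
    return byteStream.toByteArray();
  }

  static Person fromBytes(byte[] bytes) {
    try (ObjectInputStream objectStream =
             new ObjectInputStream(new ByteArrayInputStream(bytes))) {
      return (Person) objectStream.readObject();
    } catch (IOException e) {
      throw new UncheckedIOException(e);
    } catch (ClassNotFoundException e) {
      throw new IllegalStateException("Person class not on the classpath", e);
    }
  }

  public static void main(String[] args) {
    Person copy = fromBytes(toBytes(new Person("John Doe", 30)));
    System.out.println(copy.getName() + ", " + copy.getAge());
  }
}

// Assumed shape of Person; the original text does not show this class.
final class Person implements Serializable {
  private static final long serialVersionUID = 1L;
  private final String name;
  private final int age;

  Person(String name, int age) {
    this.name = name;
    this.age = age;
  }

  String getName() { return name; }

  int getAge() { return age; }
}
```

Java object serialization keeps the example short, but it ties the payload to this exact Java class and should not be used to deserialize bytes from untrusted producers. Formats such as JSON or Avro are common alternatives when consumers are written in other languages.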

To use a custom serializer, specify its class in the producer configuration; a consumer reading the topic needs a matching deserializer in its own configuration. Here's an example of how to configure a producer with a custom serializer:

TEXT/X-JAVA
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.StringSerializer;

import java.util.Properties;

public class KafkaProducerExample {
  public static void main(String[] args) {
    // Create a new producer with custom serializer
    KafkaProducer<String, Person> producer = new KafkaProducer<>(getProducerProperties());

    // Create a person object
    Person person = new Person("John Doe", 30);

    // Create a message record (topic and value, no key)
    ProducerRecord<String, Person> record = new ProducerRecord<>("my-topic", person);

    // Send the message
    producer.send(record);

    // Flush and close the producer
    producer.flush();
    producer.close();
  }

  private static Properties getProducerProperties() {
    Properties props = new Properties();
    props.put("bootstrap.servers", "localhost:9092");
    // Keys use the built-in string serializer; values use the custom one
    props.put("key.serializer", StringSerializer.class.getName());
    props.put("value.serializer", PersonSerializer.class.getName());
    return props;
  }
}
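
The consumer side mirrors this configuration with deserializer settings. The sketch below builds such a configuration using only java.util.Properties and string class names, so it runs without the Kafka client on the classpath. The group id "person-readers" and the PersonDeserializer class are assumptions for illustration; the original text defines neither.

```java
import java.util.Properties;

// Hypothetical names: the group id and PersonDeserializer are assumptions.
final class ConsumerConfigSketch {

  static Properties consumerProperties() {
    Properties props = new Properties();
    props.put("bootstrap.servers", "localhost:9092");
    props.put("group.id", "person-readers"); // assumed consumer group id
    // Keys were written with StringSerializer, so read them with its counterpart.
    props.put("key.deserializer",
        "org.apache.kafka.common.serialization.StringDeserializer");
    // Assumed class: a Deserializer<Person> that reverses PersonSerializer.
    props.put("value.deserializer", "PersonDeserializer");
    return props;
  }

  public static void main(String[] args) {
    consumerProperties().forEach((key, value) -> System.out.println(key + " = " + value));
  }
}
```

These properties would be passed to a KafkaConsumer<String, Person> constructor in the same way the producer properties are passed to KafkaProducer above.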