Kafka Message Serialization
In Kafka, messages are sent in serialized form and deserialized on the consumer side. Serialization is the process of converting objects or data into a format that can be stored or transmitted. Kafka provides built-in serializers and allows you to customize serialization for your messages.
Built-in Serializers
Kafka provides default serializers for common data types, such as strings and integers. When you use the default serializers, you don't need to implement any serialization logic. Here's an example of how to send a message using the default string serializer:
1import org.apache.kafka.clients.producer.Producer;
2import org.apache.kafka.clients.producer.ProducerRecord;
3import org.apache.kafka.clients.producer.KafkaProducer;
4import org.apache.kafka.common.serialization.StringSerializer;
5
6import java.util.Properties;
7
8public class KafkaProducerExample {
9 public static void main(String[] args) {
10 // Create a new producer
11 KafkaProducer<String, String> producer = new KafkaProducer<>(getProducerProperties());
12
13 // Create a message record
14 ProducerRecord<String, String> record = new ProducerRecord<>("my-topic", "Hello, Kafka!");
15
16 // Send the message
17 producer.send(record);
18
19 // Flush and close the producer
20 producer.flush();
21 producer.close();
22 }
23
24 private static Properties getProducerProperties() {
25 Properties props = new Properties();
26 props.put("bootstrap.servers", "localhost:9092");
27 props.put("key.serializer", StringSerializer.class.getName());
28 props.put("value.serializer", StringSerializer.class.getName());
29 return props;
30 }
31}
Custom Serializers
If you have complex data types or want to customize the serialization format, you can implement your own serializers. Kafka provides an interface, org.apache.kafka.common.serialization.Serializer, that you can implement to define your custom serialization logic. Here's an example of a custom serializer for a Person object:
1import org.apache.kafka.common.serialization.Serializer;
2
3import java.io.ByteArrayOutputStream;
4import java.io.IOException;
5import java.io.ObjectOutputStream;
6import java.util.Map;
7
8public class PersonSerializer implements Serializer<Person> {
9 @Override
10 public void configure(Map<String, ?> configs, boolean isKey) {
11 // Configuration logic
12 }
13
14 @Override
15 public byte[] serialize(String topic, Person data) {
16 try (ByteArrayOutputStream byteStream = new ByteArrayOutputStream();
17 ObjectOutputStream objectStream = new ObjectOutputStream(byteStream)) {
18 objectStream.writeObject(data);
19 return byteStream.toByteArray();
20 } catch (IOException e) {
21 // Error handling
22 return null;
23 }
24 }
25
26 @Override
27 public void close() {
28 // Cleanup logic
29 }
30}
To use a custom serializer, you need to specify it in the producer configuration (a consumer reading these messages needs a matching custom deserializer in its own configuration). Here's an example of how to configure a producer with a custom serializer:
1import org.apache.kafka.clients.producer.Producer;
2import org.apache.kafka.clients.producer.ProducerRecord;
3import org.apache.kafka.clients.producer.KafkaProducer;
4
5import java.util.Properties;
6
7public class KafkaProducerExample {
8 public static void main(String[] args) {
9 // Create a new producer with custom serializer
10 KafkaProducer<String, Person> producer = new KafkaProducer<>(getProducerProperties());
11
12 // Create a person object
13 Person person = new Person("John Doe", 30);
14
15 // Create a message record
16 ProducerRecord<String, Person> record = new ProducerRecord<>("my-topic", person);
17
18 // Send the message
19 producer.send(record);
20
21 // Flush and close the producer
22 producer.flush();
23 producer.close();
24 }
25
26 private static Properties getProducerProperties() {
27 Properties props = new Properties();
28 props.put("bootstrap.servers", "localhost:9092");
29 props.put("key.serializer", StringSerializer.class.getName());
30 props.put("value.serializer", PersonSerializer.class.getName());
31 return props;
32 }
33}
xxxxxxxxxx
}
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.common.serialization.StringSerializer;
import java.util.Properties;
public class KafkaProducerExample {
public static void main(String[] args) {
// Create a new producer
KafkaProducer<String, String> producer = new KafkaProducer<>(getProducerProperties());
// Create a message record
ProducerRecord<String, String> record = new ProducerRecord<>("my-topic", "Hello, Kafka!");
// Send the message
producer.send(record);
// Flush and close the producer
producer.flush();
producer.close();
}
private static Properties getProducerProperties() {
Properties props = new Properties();
props.put("bootstrap.servers", "localhost:9092");
props.put("key.serializer", StringSerializer.class.getName());
props.put("value.serializer", StringSerializer.class.getName());
return props;