Writing Custom Kafka Serializer

How to create Custom serializer in kafka?

Here you have an example to use your own serializer/deserializer for the Kafka message value. For Kafka message key is the same thing.

We want to send a serialized version of MyMessage as Kafka value and deserialize it again into a MyMessage object at consumer side.

Serializing MyMessage in producer side.

You should create a serializer class that implements org.apache.kafka.common.serialization.Serializer

serialize() method do the work, receiving your object and returning a serialized version as bytes array.

public class MyValueSerializer implements Serializer<MyMessage>
{
private boolean isKey;

@Override
public void configure(Map<String, ?> configs, boolean isKey)
{
this.isKey = isKey;
}

@Override
public byte[] serialize(String topic, MyMessage message)
{
if (message == null) {
return null;
}

try {

(serialize your MyMessage object into bytes)

return bytes;

} catch (IOException | RuntimeException e) {
throw new SerializationException("Error serializing value", e);
}
}

@Override
public void close()
{

}
}

final IntegerSerializer keySerializer = new IntegerSerializer();
final MyValueSerializer myValueSerializer = new MyValueSerializer();
final KafkaProducer<Integer, MyMessage> producer = new KafkaProducer<>(props, keySerializer, myValueSerializer);

int messageNo = 1;
int kafkaKey = messageNo;
MyMessage kafkaValue = new MyMessage();
ProducerRecord producerRecord = new ProducerRecord<>(topic, kafkaKey, kafkaValue);
producer.send(producerRecord, new DemoCallBack(logTag, startTime, messageNo, strValue));

Deserializing MyMessage in consumer side.

You should create a deserializer class that implements org.apache.kafka.common.serialization.Deserializer

deserialize() method do the work, receiving serialized value as bytes array and returning your object.

public class MyValueDeserializer implements Deserializer<MyMessage>
{
private boolean isKey;

@Override
public void configure(Map<String, ?> configs, boolean isKey)
{
this.isKey = isKey;
}

@Override
public MyMessage deserialize(String s, byte[] value)
{
if (value == null) {
return null;
}

try {

(deserialize value into your MyMessage object)

MyMessage message = new MyMessage();
return message;

} catch (IOException | RuntimeException e) {
throw new SerializationException("Error deserializing value", e);
}
}

@Override
public void close()
{

}
}

Then use it like this:

final IntegerDeserializer keyDeserializer = new IntegerDeserializer();
final MyValueDeserializer myValueDeserializer = new MyValueDeserializer();
final KafkaConsumer<Integer, MyMessage> consumer = new KafkaConsumer<>(props, keyDeserializer, myValueDeserializer);

ConsumerRecords<Integer, MyMessage> records = consumer.poll(1000);
for (ConsumerRecord<Integer, MyMessage> record : records) {

int kafkaKey = record.key();
MyMessage kafkaValue = record.value();

...
}

How to create custom serializer for kafka-mirror-maker?

You still have to deserialize bytes, so I'm not sure I understand the purpose of overriding only the serializer

If you want to manipulate the message, look at the MessageHandler interface, and then the --handler argument. In there, you would need to wrap both a deserailizer and serializer

Example here of renaming the topic - https://github.com/gwenshap/kafka-examples/tree/master/MirrorMakerHandler

Custom serialization in Kafka using CSharp

I just find the answer to this question.
The library to use for kafka in dotnet is provided by confluent.

Kafka .NET Client

There should be a serialization class implementing the interface :

Confluent.Kafka.ISerializer<T>

Normally we should create the producer via ProducerBuilder class :

Confluent.Kafka.ProducerBuilder<TKey, TValue>

There is a method to this class to set serializers for both key and value.
It is a little different than the original documentation for custom serializer in Kafka which instructing to set the name of the custom serializer class in the producer configuration.

Following is the code to set the value serializer:

var config = new ProducerConfig
{
BootstrapServers = "localhost:9092",
ClientId = "thiPC"
};

var producerBuilder = new ProducerBuilder<Null, Customer>(config);
producerBuilder.SetValueSerializer(new CustomerSerializer()); // <------ The Magic Code
var producer = producerBuilder.Build();

The sample code for the Serialization class is like this;

public class CustomerSerializer : Confluent.Kafka.ISerializer<Customer>
{
public byte[] Serialize(Customer data, SerializationContext context)
{ ..........}
}

There is nothing important for the Customer class in my example it is a normal class just to hold the customer properties.

In serialization class, you should return a byte[] in the Serialize method which is obvious!

I hope this would be useful for folks implementing Kafka with dotnet.

Create custom Serializer and Deserializer in kafka using scala

Find below the custom serializer and deserializer for case class User, User(name:String,id:Int). Replace User in code with your case class. It will work.

import java.io.{ObjectInputStream, ByteArrayInputStream}
import java.util

import org.apache.kafka.common.serialization.{Deserializer, Serializer}

class CustomDeserializer extends Deserializer[User]{

override def configure(configs: util.Map[String,_],isKey: Boolean):Unit = {

}
override def deserialize(topic:String,bytes: Array[Byte]) = {
val byteIn = new ByteArrayInputStream(bytes)
val objIn = new ObjectInputStream(byteIn)
val obj = objIn.readObject().asInstanceOf[User]
byteIn.close()
objIn.close()
obj
}
override def close():Unit = {

}

}

import java.io.{ObjectOutputStream, ByteArrayOutputStream}
import java.util
import org.apache.kafka.common.serialization.Serializer

class CustomSerializer extends Serializer[User]{

override def configure(configs: util.Map[String,_],isKey: Boolean):Unit = {

}

override def serialize(topic:String, data:User):Array[Byte] = {
try {
val byteOut = new ByteArrayOutputStream()
val objOut = new ObjectOutputStream(byteOut)
objOut.writeObject(data)
objOut.close()
byteOut.close()
byteOut.toByteArray
}
catch {
case ex:Exception => throw new Exception(ex.getMessage)
}
}

override def close():Unit = {

}

}

How to configure a custom Kafka deserializer and get the consumed JSON data using a KafkaListener

Since you are using Spring Boot, just set the value deserializer class name as a property and Boot will automatically wire it into the container factory for your @KafkaListener. No need to define your own consumer factory or container factory.

spring.kafka.consumer.value-deserializer=com.acme.CustomDeserializer

https://docs.spring.io/spring-boot/docs/current/reference/html/application-properties.html#application-properties.integration.spring.kafka.consumer.value-deserializer



Related Topics



Leave a reply



Submit