How to create Custom serializer in kafka?
Here you have an example to use your own serializer/deserializer for the Kafka message value. For Kafka message key is the same thing.
We want to send a serialized version of MyMessage as Kafka value and deserialize it again into a MyMessage object at consumer side.
Serializing MyMessage in producer side.
You should create a serializer class that implements org.apache.kafka.common.serialization.Serializer
serialize() method do the work, receiving your object and returning a serialized version as bytes array.
public class MyValueSerializer implements Serializer<MyMessage>
{
private boolean isKey;
@Override
public void configure(Map<String, ?> configs, boolean isKey)
{
this.isKey = isKey;
}
@Override
public byte[] serialize(String topic, MyMessage message)
{
if (message == null) {
return null;
}
try {
(serialize your MyMessage object into bytes)
return bytes;
} catch (IOException | RuntimeException e) {
throw new SerializationException("Error serializing value", e);
}
}
@Override
public void close()
{
}
}
final IntegerSerializer keySerializer = new IntegerSerializer();
final MyValueSerializer myValueSerializer = new MyValueSerializer();
final KafkaProducer<Integer, MyMessage> producer = new KafkaProducer<>(props, keySerializer, myValueSerializer);
int messageNo = 1;
int kafkaKey = messageNo;
MyMessage kafkaValue = new MyMessage();
ProducerRecord producerRecord = new ProducerRecord<>(topic, kafkaKey, kafkaValue);
producer.send(producerRecord, new DemoCallBack(logTag, startTime, messageNo, strValue));
Deserializing MyMessage in consumer side.
You should create a deserializer class that implements org.apache.kafka.common.serialization.Deserializer
deserialize() method do the work, receiving serialized value as bytes array and returning your object.
public class MyValueDeserializer implements Deserializer<MyMessage>
{
private boolean isKey;
@Override
public void configure(Map<String, ?> configs, boolean isKey)
{
this.isKey = isKey;
}
@Override
public MyMessage deserialize(String s, byte[] value)
{
if (value == null) {
return null;
}
try {
(deserialize value into your MyMessage object)
MyMessage message = new MyMessage();
return message;
} catch (IOException | RuntimeException e) {
throw new SerializationException("Error deserializing value", e);
}
}
@Override
public void close()
{
}
}
Then use it like this:
final IntegerDeserializer keyDeserializer = new IntegerDeserializer();
final MyValueDeserializer myValueDeserializer = new MyValueDeserializer();
final KafkaConsumer<Integer, MyMessage> consumer = new KafkaConsumer<>(props, keyDeserializer, myValueDeserializer);
ConsumerRecords<Integer, MyMessage> records = consumer.poll(1000);
for (ConsumerRecord<Integer, MyMessage> record : records) {
int kafkaKey = record.key();
MyMessage kafkaValue = record.value();
...
}
How to create custom serializer for kafka-mirror-maker?
You still have to deserialize bytes, so I'm not sure I understand the purpose of overriding only the serializer
If you want to manipulate the message, look at the MessageHandler
interface, and then the --handler
argument. In there, you would need to wrap both a deserailizer and serializer
Example here of renaming the topic - https://github.com/gwenshap/kafka-examples/tree/master/MirrorMakerHandler
Custom serialization in Kafka using CSharp
I just find the answer to this question.
The library to use for kafka in dotnet is provided by confluent.
Kafka .NET Client
There should be a serialization class implementing the interface :
Confluent.Kafka.ISerializer<T>
Normally we should create the producer via ProducerBuilder class :
Confluent.Kafka.ProducerBuilder<TKey, TValue>
There is a method to this class to set serializers for both key and value.
It is a little different than the original documentation for custom serializer in Kafka which instructing to set the name of the custom serializer class in the producer configuration.
Following is the code to set the value serializer:
var config = new ProducerConfig
{
BootstrapServers = "localhost:9092",
ClientId = "thiPC"
};
var producerBuilder = new ProducerBuilder<Null, Customer>(config);
producerBuilder.SetValueSerializer(new CustomerSerializer()); // <------ The Magic Code
var producer = producerBuilder.Build();
The sample code for the Serialization class is like this;
public class CustomerSerializer : Confluent.Kafka.ISerializer<Customer>
{
public byte[] Serialize(Customer data, SerializationContext context)
{ ..........}
}
There is nothing important for the Customer class in my example it is a normal class just to hold the customer properties.
In serialization class, you should return a byte[] in the Serialize method which is obvious!
I hope this would be useful for folks implementing Kafka with dotnet.
Create custom Serializer and Deserializer in kafka using scala
Find below the custom serializer and deserializer for case class User, User(name:String,id:Int). Replace User in code with your case class. It will work.
import java.io.{ObjectInputStream, ByteArrayInputStream}
import java.util
import org.apache.kafka.common.serialization.{Deserializer, Serializer}
class CustomDeserializer extends Deserializer[User]{
override def configure(configs: util.Map[String,_],isKey: Boolean):Unit = {
}
override def deserialize(topic:String,bytes: Array[Byte]) = {
val byteIn = new ByteArrayInputStream(bytes)
val objIn = new ObjectInputStream(byteIn)
val obj = objIn.readObject().asInstanceOf[User]
byteIn.close()
objIn.close()
obj
}
override def close():Unit = {
}
}
import java.io.{ObjectOutputStream, ByteArrayOutputStream}
import java.util
import org.apache.kafka.common.serialization.Serializer
class CustomSerializer extends Serializer[User]{
override def configure(configs: util.Map[String,_],isKey: Boolean):Unit = {
}
override def serialize(topic:String, data:User):Array[Byte] = {
try {
val byteOut = new ByteArrayOutputStream()
val objOut = new ObjectOutputStream(byteOut)
objOut.writeObject(data)
objOut.close()
byteOut.close()
byteOut.toByteArray
}
catch {
case ex:Exception => throw new Exception(ex.getMessage)
}
}
override def close():Unit = {
}
}
How to configure a custom Kafka deserializer and get the consumed JSON data using a KafkaListener
Since you are using Spring Boot, just set the value deserializer class name as a property and Boot will automatically wire it into the container factory for your @KafkaListener
. No need to define your own consumer factory or container factory.
spring.kafka.consumer.value-deserializer=com.acme.CustomDeserializer
https://docs.spring.io/spring-boot/docs/current/reference/html/application-properties.html#application-properties.integration.spring.kafka.consumer.value-deserializer
Related Topics
Eclipse Error: Could Not Find or Load Main Class
Spring Session-Scoped Beans (Controllers) and References to Services, in Terms of Serialization
Maximum Amount of Memory Per Java Process on Windows
Maven "Build Path Specifies Execution Environment J2Se-1.5", Even Though I Changed It to 1.7
How to Pass C Structs Back and Forth to Java Code in Jni
How to Add a Timeout Value When Using Java's Runtime.Exec()
How to Solve the "A Generic Array of T Is Created for a Varargs Parameter" Compiler Warning
How to Enable Http Response Caching in Spring Boot
Chat Client Emoticons Window Java
Converting a Date Object to a Calendar Object
Spring - Injecting a Dependency into a Servletcontextlistener
Java:Does Wait() Release Lock from Synchronized Block
Read Error Response Body in Java
How to Check a Uploaded File Whether It Is an Image or Other File
How to Improve the Performance of G.Drawimage() Method for Resizing Images
How to Parse Number String Containing Commas into an Integer in Java