Kafka: How to Delete Records from a Topic Using Java API

How to delete records from a Kafka topic, not by offset but by a field of the record

The main thing to point out is that you shouldn't treat data in Kafka the same way as data in a database. Kafka was not designed to work that way (e.g.: when I click the X button, the Y records are deleted).

Instead, you should see a topic as a never-ending stream of data. Every record produced to a Kafka topic is consumed and processed independently by the consumer.

Perceiving the topic as a stream gives you a different solution:

You can use a second topic with the filtered results in it!

Streaming Diagram

                               ____ Topic A ____
      -- Produced Messages --> |                |      _______________________
                               |________________| --> |                       |
                                                      | Filtering Application |
                               ____ Topic B ____      |                       |
     <-- Consumed Messages --  |                | <-- |_______________________|
                               |________________|
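As a sketch of such a filtering application, here is a minimal Kafka Streams topology. The topic names "topic-a" and "topic-b" and the "deleted" JSON flag are hypothetical; adapt the predicate to whichever field identifies the records you want to drop:

    import java.util.Properties;

    import org.apache.kafka.common.serialization.Serdes;
    import org.apache.kafka.streams.KafkaStreams;
    import org.apache.kafka.streams.StreamsBuilder;
    import org.apache.kafka.streams.StreamsConfig;
    import org.apache.kafka.streams.kstream.Consumed;
    import org.apache.kafka.streams.kstream.Produced;

    public class FilteringApplication {
        public static void main(String[] args) {
            Properties props = new Properties();
            props.put(StreamsConfig.APPLICATION_ID_CONFIG, "filtering-application");
            props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");

            StreamsBuilder builder = new StreamsBuilder();
            // Copy every record from Topic A to Topic B, dropping the ones
            // whose value marks them as deleted (hypothetical predicate).
            builder.stream("topic-a", Consumed.with(Serdes.String(), Serdes.String()))
                   .filter((key, value) -> value != null && !value.contains("\"deleted\":true"))
                   .to("topic-b", Produced.with(Serdes.String(), Serdes.String()));

            KafkaStreams streams = new KafkaStreams(builder.build(), props);
            streams.start();
            Runtime.getRuntime().addShutdownHook(new Thread(streams::close));
        }
    }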

Delete Messages from a Topic in Apache Kafka

If you are searching for a way to delete messages selectively, the newer AdminClient API (usable from Java code) provides a deleteRecords method:

https://kafka.apache.org/11/javadoc/org/apache/kafka/clients/admin/AdminClient.html
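Note that deleteRecords deletes by offset, not by field: it removes all records in a partition up to, but not including, the given offset. A minimal sketch, where the topic name, partition, and offset are placeholders:

    import java.util.Collections;
    import java.util.Map;
    import java.util.Properties;

    import org.apache.kafka.clients.admin.AdminClient;
    import org.apache.kafka.clients.admin.AdminClientConfig;
    import org.apache.kafka.clients.admin.DeleteRecordsResult;
    import org.apache.kafka.clients.admin.RecordsToDelete;
    import org.apache.kafka.common.TopicPartition;

    public class DeleteRecordsExample {
        public static void main(String[] args) throws Exception {
            Properties props = new Properties();
            props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");

            try (AdminClient admin = AdminClient.create(props)) {
                // Delete everything in partition 0 of "my-topic" before offset 42.
                Map<TopicPartition, RecordsToDelete> toDelete = Collections.singletonMap(
                        new TopicPartition("my-topic", 0),
                        RecordsToDelete.beforeOffset(42L));
                DeleteRecordsResult result = admin.deleteRecords(toDelete);
                result.all().get(); // block until the brokers have applied the deletion
            }
        }
    }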

Delete single records from a Kafka topic

On a compacted topic you can mark a record as deleted by publishing a 'tombstone': a message with the key you want to delete and a null value. See the answers here: Kafka not deleting key with tombstone
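Producing a tombstone is just a regular send with a null value; a minimal sketch (the topic name and key are placeholders):

    import java.util.Properties;

    import org.apache.kafka.clients.producer.KafkaProducer;
    import org.apache.kafka.clients.producer.Producer;
    import org.apache.kafka.clients.producer.ProducerConfig;
    import org.apache.kafka.clients.producer.ProducerRecord;
    import org.apache.kafka.common.serialization.StringSerializer;

    public class TombstoneExample {
        public static void main(String[] args) {
            Properties props = new Properties();
            props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
            props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
            props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());

            try (Producer<String, String> producer = new KafkaProducer<>(props)) {
                // A null value is the tombstone; log compaction will eventually
                // remove all records for this key.
                producer.send(new ProducerRecord<>("compacted-topic", "key-to-delete", null));
            }
        }
    }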

If the topic is not compacted, records can only be removed through retention policies; you cannot target individual offsets for removal.

Is there a way to delete all the data from a topic or delete the topic before every run?

This isn't supported yet, as of this writing. Take a look at the JIRA issue "Add delete topic support".

To delete manually:

  1. Shutdown the cluster
  2. Clean the Kafka log dir (specified by the log.dir attribute in the Kafka config file) as well as the ZooKeeper data
  3. Restart the cluster
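Note that later Kafka releases did add topic deletion: with delete.topic.enable=true set on the brokers, the AdminClient can delete a topic directly. A minimal sketch:

    import java.util.Collections;
    import java.util.Properties;

    import org.apache.kafka.clients.admin.AdminClient;
    import org.apache.kafka.clients.admin.AdminClientConfig;

    public class DeleteTopicExample {
        public static void main(String[] args) throws Exception {
            Properties props = new Properties();
            props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");

            try (AdminClient admin = AdminClient.create(props)) {
                // Requires delete.topic.enable=true on the brokers.
                admin.deleteTopics(Collections.singletonList("MyTopic")).all().get();
            }
        }
    }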

For any given topic, what you can do is:

  1. Stop Kafka
  2. Clean the Kafka log specific to the partition. Kafka stores its log files in the format "logDir/topic-partition", so for a topic named "MyTopic" the log for partition 0 will be stored in /tmp/kafka-logs/MyTopic-0, where /tmp/kafka-logs is the directory specified by the log.dir attribute
  3. Restart Kafka

This is NOT a good or recommended approach, but it should work.
In the Kafka broker config file, the log.retention.hours.per.topic attribute defines the number of hours to keep a log file before deleting it, for a specific topic.
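On newer brokers, an alternative that avoids editing config files is to temporarily lower the topic's retention.ms via the AdminClient and let the broker purge the old segments. A sketch; the 1000 ms value is only an illustration, incrementalAlterConfigs needs Kafka 2.3+, and you should restore the original retention afterwards:

    import java.util.Collections;
    import java.util.Properties;

    import org.apache.kafka.clients.admin.AdminClient;
    import org.apache.kafka.clients.admin.AdminClientConfig;
    import org.apache.kafka.clients.admin.AlterConfigOp;
    import org.apache.kafka.clients.admin.ConfigEntry;
    import org.apache.kafka.common.config.ConfigResource;

    public class ShrinkRetentionExample {
        public static void main(String[] args) throws Exception {
            Properties props = new Properties();
            props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");

            try (AdminClient admin = AdminClient.create(props)) {
                ConfigResource topic = new ConfigResource(ConfigResource.Type.TOPIC, "MyTopic");
                // Keep records for only one second; the broker will then delete
                // the old segments. Restore the original value afterwards.
                AlterConfigOp op = new AlterConfigOp(
                        new ConfigEntry("retention.ms", "1000"), AlterConfigOp.OpType.SET);
                admin.incrementalAlterConfigs(
                        Collections.singletonMap(topic, Collections.singletonList(op))).all().get();
            }
        }
    }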

Also, is there a way for messages to be deleted as soon as the consumer reads them?

From the Kafka documentation:

The Kafka cluster retains all published messages—whether or not they have been consumed—for a configurable period of time. For example if the log retention is set to two days, then for the two days after a message is published it is available for consumption, after which it will be discarded to free up space. Kafka's performance is effectively constant with respect to data size so retaining lots of data is not a problem.

In fact the only metadata retained on a per-consumer basis is the position of the consumer in the log, called the "offset". This offset is controlled by the consumer: normally a consumer will advance its offset linearly as it reads messages, but in fact the position is controlled by the consumer and it can consume messages in any order it likes. For example a consumer can reset to an older offset to reprocess.
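With the modern consumer API, resetting to an older offset looks roughly like this (a sketch; the topic, group id, and seek target are placeholders):

    import java.time.Duration;
    import java.util.Collections;
    import java.util.Properties;

    import org.apache.kafka.clients.consumer.ConsumerConfig;
    import org.apache.kafka.clients.consumer.KafkaConsumer;
    import org.apache.kafka.common.TopicPartition;
    import org.apache.kafka.common.serialization.StringDeserializer;

    public class ReprocessExample {
        public static void main(String[] args) {
            Properties props = new Properties();
            props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
            props.put(ConsumerConfig.GROUP_ID_CONFIG, "reprocess-group");
            props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
            props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());

            try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props)) {
                TopicPartition tp = new TopicPartition("my-topic", 0);
                consumer.assign(Collections.singletonList(tp));
                // Rewind to the earliest retained offset and reprocess from there.
                consumer.seekToBeginning(Collections.singletonList(tp));
                consumer.poll(Duration.ofSeconds(1)).forEach(record ->
                        System.out.println(record.offset() + ": " + record.value()));
            }
        }
    }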

For finding the start offset to read from, the Kafka 0.8 Simple Consumer example says:

Kafka includes two constants to help: kafka.api.OffsetRequest.EarliestTime() finds the beginning of the data in the logs and starts streaming from there, and kafka.api.OffsetRequest.LatestTime() will only stream new messages.

You can also find the example code there for managing the offset at your consumer end.

    import java.util.HashMap;
    import java.util.Map;

    import kafka.api.PartitionOffsetRequestInfo;
    import kafka.common.TopicAndPartition;
    import kafka.javaapi.OffsetResponse;
    import kafka.javaapi.consumer.SimpleConsumer;

    public static long getLastOffset(SimpleConsumer consumer, String topic, int partition,
                                     long whichTime, String clientName) {
        TopicAndPartition topicAndPartition = new TopicAndPartition(topic, partition);
        Map<TopicAndPartition, PartitionOffsetRequestInfo> requestInfo =
                new HashMap<TopicAndPartition, PartitionOffsetRequestInfo>();
        // Request a single offset at (or before) the requested time:
        // EarliestTime() or LatestTime().
        requestInfo.put(topicAndPartition, new PartitionOffsetRequestInfo(whichTime, 1));
        kafka.javaapi.OffsetRequest request = new kafka.javaapi.OffsetRequest(
                requestInfo, kafka.api.OffsetRequest.CurrentVersion(), clientName);
        OffsetResponse response = consumer.getOffsetsBefore(request);

        if (response.hasError()) {
            System.out.println("Error fetching offset data from the broker. Reason: "
                    + response.errorCode(topic, partition));
            return 0;
        }
        long[] offsets = response.offsets(topic, partition);
        return offsets[0];
    }
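
For example, to find the earliest available offset for partition 0 of "MyTopic" (the SimpleConsumer construction is omitted, and "myClient" is a placeholder client id):

    long startOffset = getLastOffset(consumer, "MyTopic", 0,
            kafka.api.OffsetRequest.EarliestTime(), "myClient");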

