Spring Kafka - How to Reset Offset to Latest With a Group Id

Because I couldn't find any example of this, I'll explain how I did it here.

The class containing your @KafkaListener must implement the ConsumerSeekAware interface, which lets the listener control offset seeking when partitions are assigned. (source : https://docs.spring.io/spring-kafka/reference/htmlsingle/#seek )

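import java.util.Map;

import org.apache.kafka.common.TopicPartition;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.kafka.listener.ConsumerSeekAware;

public class KafkaMessageListener implements ConsumerSeekAware {

    @KafkaListener(topics = "your.topic")
    public void listen(byte[] payload) {
        // ...
    }

    @Override
    public void registerSeekCallback(ConsumerSeekCallback callback) {
        // Not needed here: we only seek when partitions are assigned.
    }

    @Override
    public void onPartitionsAssigned(Map<TopicPartition, Long> assignments, ConsumerSeekCallback callback) {
        // Move every newly assigned partition to its end, skipping older records.
        assignments.forEach((t, o) -> callback.seekToEnd(t.topic(), t.partition()));
    }

    @Override
    public void onIdleContainer(Map<TopicPartition, Long> assignments, ConsumerSeekCallback callback) {
        // No action when the container is idle.
    }
}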

Here, whenever partitions are assigned (at startup and on every rebalance), we use the given callback to seek to the end of each assigned partition. Thanks to Artem Bilan ( https://stackoverflow.com/users/2756547/artem-bilan ) for guiding me to the answer.

Is there a way to reset offsets of a Kafka consumer group through an API?

Spring for Apache Kafka provides some convenience mechanisms for performing seeks, either during application initialization, or at any time thereafter.

The simplest is to have your listener extend AbstractConsumerSeekAware or implement ConsumerSeekAware.

Spring kafka consumer doesn't respect auto-offset-reset = latest

You must have committed the initial offset somehow, perhaps before you finalized this configuration.

factory.getContainerProperties().setAckMode(AbstractMessageListenerContainer.AckMode.MANUAL);

That means you are responsible for committing the offsets.

Use AckMode.BATCH (the default) or AckMode.RECORD.

Or, delete the currently committed offset(s) with the kafka-consumer-groups CLI tool (you can use the same tool to list the current offsets too).

Or use a UUID for the group to get a new group each time.
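Here is a minimal sketch of the UUID approach, assuming you build the consumer configuration by hand as a java.util.Properties object. The class name, the "my-app" prefix and the broker address are placeholders I made up; the keys are the standard Kafka consumer property names. A brand-new group has no committed offsets, so auto.offset.reset=latest applies on every start.

```java
import java.util.Properties;
import java.util.UUID;

// Hypothetical helper: builds consumer properties with a unique group id per start.
final class FreshGroupConfig {

    // prefix is an illustrative name chosen by the caller, e.g. "my-app".
    static Properties consumerProperties(String bootstrapServers, String prefix) {
        Properties props = new Properties();
        props.setProperty("bootstrap.servers", bootstrapServers);
        // A random suffix means no offsets were ever committed for this group...
        props.setProperty("group.id", prefix + "-" + UUID.randomUUID());
        // ...so this setting decides where the consumer starts reading.
        props.setProperty("auto.offset.reset", "latest");
        return props;
    }

    public static void main(String[] args) {
        Properties props = consumerProperties("localhost:9092", "my-app");
        System.out.println(props.getProperty("group.id"));
    }
}
```

Keep in mind that every start leaves an unused group behind on the broker, and that a random group id also means several instances of the same application no longer share partitions.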

EDIT

You can also have your listener class implement ConsumerSeekAware and call callback.seekToEnd(partitions) in onPartitionsAssigned().

Reset topic offset by creating a new consumer group at every application restart

Yes, it would work with Spring Cloud Stream (SCSt) too but, as you say, it's a bit tricky to set a random group id, although you could set it as a system property (System.setProperty) before launching the SpringApplication.
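As a rough sketch of the system property idea: the snippet below assumes a binding called "input" (a placeholder, use your own binding name) and sets its group through the spring.cloud.stream.bindings.<binding>.group property before the application context would be created. The class name and the MyApplication reference in the comment are hypothetical.

```java
import java.util.UUID;

// Hypothetical launcher: picks a random consumer group before Spring starts.
final class RandomGroupLauncher {

    // "input" is a placeholder binding name, not something from the question.
    static final String GROUP_PROPERTY = "spring.cloud.stream.bindings.input.group";

    static String assignRandomGroup(String prefix) {
        String group = prefix + "-" + UUID.randomUUID();
        // System properties are visible to the Spring Environment at startup.
        System.setProperty(GROUP_PROPERTY, group);
        return group;
    }

    public static void main(String[] args) {
        String group = assignRandomGroup("my-app");
        System.out.println("Using consumer group " + group);
        // The real application would now call something like:
        // SpringApplication.run(MyApplication.class, args);
    }
}
```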

If you were using spring-kafka directly, it would be easy: just implement ConsumerSeekAware and call seekToBeginning when the partitions are assigned.

However, with SCSt, you don't have direct access to the listener.

One workaround would be to manually do the seek before you launch the SpringApplication by creating a consumer with the same group id. It gets a bit tricky, though, if you have multiple instances of your app, because you might get different partitions each time.

We'll look again at fixing that issue (I just made a comment on it).

How do I reset Spring Kafka Offset with a rest end point

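It is not clear what your problem is; in a listener that extends AbstractConsumerSeekAware, simply do this...

public void seekToOffset(TopicPartition tp, long offset) {
    // getSeekCallbackFor() is inherited from AbstractConsumerSeekAware.
    getSeekCallbackFor(tp).seek(tp.topic(), tp.partition(), offset);
}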

Kafka Spring Consuming Messages which are not Consumed

The auto.offset.reset property will resolve your issue, but you should know how it behaves at runtime.

Use Case 1:

A consumer starts and has auto.offset.reset=latest, and the topic partition currently holds data for some range of offsets. The consumer group has already committed an offset for the topic. Where will the consumer read from?

Ans: The offsets are already committed for this consumer group and topic partition, so the auto.offset.reset property is ignored and the consumer resumes from the committed offset.

Use Case 2:

A consumer starts and has auto.offset.reset=none, and the topic partition currently holds data for some range of offsets. The consumer group has already committed some offsets for the topic. Where will the consumer read from?

Ans: The consumer resumes from the committed offsets. auto.offset.reset=none only matters if they are no longer valid: the consumer then throws an exception instead of resetting, for example when the records at those offsets have been deleted from Kafka.

Use Case 3:

A consumer has auto.offset.reset=latest, and the topic partition currently holds data for some range of offsets. The consumer group has never committed offsets for the topic. Where will the consumer read from?

Ans: Latest means the consumer starts at the current end of the partition and only reads records produced after that point.
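The three use cases can be summarised in a small model. This is only a sketch of the decision rule as described above, not Kafka's actual code; the class, enum and method names are made up for illustration, and logEnd stands for the offset the next produced record would get.

```java
import java.util.OptionalLong;

// Illustrative model of where a consumer starts reading; not Kafka's implementation.
final class StartOffsetModel {

    enum ResetPolicy { EARLIEST, LATEST, NONE }

    static long startOffset(OptionalLong committed, long logStart, long logEnd, ResetPolicy policy) {
        if (committed.isPresent()) {
            long offset = committed.getAsLong();
            if (offset >= logStart && offset <= logEnd) {
                // A valid committed offset wins; auto.offset.reset is ignored.
                return offset;
            }
        }
        // No committed offset, or it points at data that no longer exists.
        switch (policy) {
            case EARLIEST:
                return logStart;
            case LATEST:
                return logEnd;
            default:
                throw new IllegalStateException("no valid offset and auto.offset.reset=none");
        }
    }

    public static void main(String[] args) {
        // Use case 1: committed offset 42, partition holds offsets 10..99.
        System.out.println(startOffset(OptionalLong.of(42), 10, 100, ResetPolicy.LATEST)); // prints 42
        // Use case 3: nothing committed, so "latest" starts at the end.
        System.out.println(startOffset(OptionalLong.empty(), 10, 100, ResetPolicy.LATEST)); // prints 100
    }
}
```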


