Spring Kafka - How to reset offset to latest with a group id?
Because I didn't saw any example of this, I'm gonna explain how I did here.
The class of your @KafkaListener
must implement a ConsumerSeekAware
class, which will permit to the listener to control the offset seeking when partitions are attributed. (source : https://docs.spring.io/spring-kafka/reference/htmlsingle/#seek )
public class KafkaMessageListener implements ConsumerSeekAware {
@KafkaListener(topics = "your.topic")
public void listen(byte[] payload) {
// ...
}
@Override
public void registerSeekCallback(ConsumerSeekCallback callback) {
}
@Override
public void onPartitionsAssigned(Map<TopicPartition, Long> assignments, ConsumerSeekCallback callback) {
assignments.forEach((t, o) -> callback.seekToEnd(t.topic(), t.partition()));
}
@Override
public void onIdleContainer(Map<TopicPartition, Long> assignments, ConsumerSeekCallback callback) {
}
}
Here, on a rebalance, we use the given callback to seek the last offset for all the given topics. Thanks to Artem Bilan ( https://stackoverflow.com/users/2756547/artem-bilan ) for guiding me to the answer.
Is there a way to reset offsets of a Kafka consumer group through an API?
Spring for Apache Kafka provides some convenience mechanisms for performing seeks, either during application initialization, or at any time thereafter.
The simplest is to have your listener extend AbstractConsumerSeekAware
or implement ConsumerSeekAware
.
Spring kafka consumer doesn't respect auto-offset-reset = latest
You must have committed the initial offset somehow, perhaps before you finalized this configuration.
factory.getContainerProperties().setAckMode(AbstractMessageListenerContainer.AckMode.MANUAL);
That means you are responsible for committing the offsets.
Use AckMode.BATCH
(the default) or AckMode.RECORD
.
Or, delete the currently committed offset(s) with the kafka-consumer-groups
CLI tool (you can use the same tool to list the current offsets too).
Or use a UUID for the group to get a new group each time.
EDIT
You can also have your listener class implement ConsumerSeekAware
and call callback.seekToEnd(partitions)
in onPartitionsAssigned()
.
Reset topic offset by creating a new consumer group at every application restart
Yes, it would work with SCSt too but, as you say, it's a bit tricky to set a random group id, although you could set it as a System.property
before launching the SpringApplication
.
If you were using spring-kafka directly, it's easy, just implement ConsumerSeekAware
and you can seekToBeginning
when the partitions are assigned.
However, with SCSt, you don't have direct access to the listener.
One workaround would be to manually do the seek before you launch the SpringApplication
by creating a consumer with the same group id. It gets a bit tricky, though if you have multiple instances of your app because your might get different partitions each time.
We'll look again at fixing that issue (I just made a comment on it).
How do I reset Spring Kafka Offset with a rest end point
It is not clear what your problem is, simply do this...
public void seekToOffset(TopicPartition tp, long offset) {
getSeekCallbackFor(topicPartition).seek(tp.topic(), tp.partition(), offset);
}
Kafka Spring Consuming Messages which are not Consumed
auto.offset.rest property will resolve your issue. But you should know how it works in the runtime.
Use Case 1:
A consumer starts and has auto.offset.reset=latest, and the topic partition currently has data for offsets going from some range to other. The consumer group has committed the some offsetfor the topic before. Where will the consumer read from?
Ans : The offsets are already committed for this consumer group and topic partition, so the property auto.offset.reset is ignored
Use Case 2:
A consumer starts and has auto.offset.reset=none, and the topic partition currently has data for offsets going from some range to other. The consumer group has committed some offsets for the topic before. Where will the consumer read from?
Ans: auto.offset.reset=none means that the consumer will crash if the offsets it's recovering from have been deleted from Kafka.
Use Case 3:
A consumer has auto.offset.reset=latest, and the topic partition currently has data for offsets going from some range to other. The consumer group never committed offsets for the topic before. Where will the consumer read from?
Ans: Latest means that data retrievals will start from where the offsets currently end.
Related Topics
How to Return a Custom Object from a Spring Data JPA Group by Query
Replacing Double Backslashes With Single Backslash
Cannot Find Element Using Selenium Webdriver
Handling the Null Value from a Resultset
Ora-00942 Sqlexception With Hibernate (Unable to Find a Table)
Subscript and Superscript a String in Android
How to Count Method Calls by Instance
How to Find Max Date in List<Object>
Object Cannot Be Converted to Integer Error
How to Generate a Unique and Short File Name in Java
Regex, Remove Whitespace and All Other Characters
Several Ports (8005, 8080, 8009) Required by Tomcat Server At Localhost Are Already in Use
Optimizing Multiple If-Else Condition in Java
How to Use a Regex to Search Backwards Effectively
Multiple Queries Executed in Java in Single Statement
Validate an Enum With Springframework Validation Errors