What Determines Kafka Consumer Offset

What determines Kafka consumer offset?

It is a bit more complex than you described.

The auto.offset.reset config kicks in ONLY if your consumer group does not have a valid offset committed somewhere (the two supported offset stores are currently Kafka and ZooKeeper), and it also depends on what sort of consumer you use.

If you use the high-level Java consumer, consider the following scenarios:

  1. You have a consumer in a consumer group group1 that has consumed 5 messages and died. Next time you start this consumer it won't even use that auto.offset.reset config and will continue from the place it died because it will just fetch the stored offset from the offset storage (Kafka or ZK as I mentioned).

  2. You have messages in a topic (like you described) and you start a consumer in a new consumer group group2. There is no offset stored anywhere, so this time the auto.offset.reset config decides whether to start from the beginning of the topic (earliest) or from the end of the topic (latest).

One more thing that affects which offset values earliest and latest correspond to is the log retention policy. Imagine you have a topic with retention configured to 1 hour. You produce 5 messages, and then an hour later you produce 5 more. The latest offset remains the same as in the previous example, but the earliest one can no longer be 0: Kafka has already removed the first 5 messages, so the earliest available offset is 5.
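The retention example can be sketched with a toy model of a single partition. This is an illustration under simplifying assumptions, not Kafka code: the class ToyPartition and its method names are hypothetical, and real brokers delete whole log segments rather than individual messages.

```python
from dataclasses import dataclass, field


@dataclass
class ToyPartition:
    """Hypothetical toy model: offsets only grow, retention trims the front."""
    retention_seconds: float
    log_start_offset: int = 0   # what "earliest" resolves to
    log_end_offset: int = 0     # what "latest" resolves to (next offset to be written)
    timestamps: list = field(default_factory=list)  # timestamps of retained messages

    def produce(self, now: float) -> int:
        """Append one message and return the offset it was assigned."""
        offset = self.log_end_offset
        self.timestamps.append(now)
        self.log_end_offset += 1
        return offset

    def apply_retention(self, now: float) -> None:
        """Drop messages older than the retention window; offsets are never reused."""
        while self.timestamps and now - self.timestamps[0] > self.retention_seconds:
            self.timestamps.pop(0)
            self.log_start_offset += 1


partition = ToyPartition(retention_seconds=3600)
for _ in range(5):
    partition.produce(now=0)        # first batch of 5 messages
for _ in range(5):
    partition.produce(now=3601)     # second batch, just over an hour later
partition.apply_retention(now=3601)

print("earliest:", partition.log_start_offset)
print("latest:", partition.log_end_offset)
```

In this model the first batch falls outside the retention window, so earliest moves forward while latest keeps counting every message ever produced.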

None of the above applies to SimpleConsumer: every time you run it, it decides where to start from using the auto.offset.reset config.

If you use a Kafka version older than 0.9, you have to replace earliest and latest with smallest and largest.

How to determine a Kafka consumer's offset

Take a look at the kafka-consumer-groups tool, which can be used to check the offsets and lag of consumers (the consumer has to be active at the time you run this command).

./kafka-consumer-groups --bootstrap-server 127.0.0.1:9092 --new-consumer --describe --group console-consumer-55936

GROUP                   TOPIC  PARTITION  CURRENT-OFFSET  LOG-END-OFFSET  LAG  OWNER
console-consumer-55936  test   0          6               6               0    consumer-1_/192.168.0.83
console-consumer-55936  test   1          1               1               0    consumer-1_/192.168.0.83
console-consumer-55936  test   2          1               1               0    consumer-1_/192.168.0.83
console-consumer-55936  test   3          1               1               0    consumer-1_/192.168.0.83
console-consumer-55936  test   4          2               2               0    consumer-1_/192.168.0.83
console-consumer-55936  test   5          1               1               0    consumer-1_/192.168.0.83
console-consumer-55936  test   6          1               1               0    consumer-1_/192.168.0.83
console-consumer-55936  test   7          2               2               0    consumer-1_/192.168.0.83
console-consumer-55936  test   8          1               1               0    consumer-1_/192.168.0.83

This should allow you to track whether anything is actually being consumed or not.
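If you want to track this from a script, one possible approach is to parse the tool's text output and recompute the lag as LOG-END-OFFSET minus CURRENT-OFFSET. The sketch below assumes the whitespace-separated column layout shown in the sample above (other Kafka versions may print different columns); the function name parse_describe_output is hypothetical, and rows whose offset columns are not plain numbers are skipped.

```python
SAMPLE_OUTPUT = """\
GROUP TOPIC PARTITION CURRENT-OFFSET LOG-END-OFFSET LAG OWNER
console-consumer-55936 test 0 6 6 0 consumer-1_/192.168.0.83
console-consumer-55936 test 1 1 1 0 consumer-1_/192.168.0.83
console-consumer-55936 test 4 2 2 0 consumer-1_/192.168.0.83
"""


def parse_describe_output(text):
    """Turn describe rows into dicts; the column order is assumed from the sample."""
    rows = []
    for line in text.splitlines():
        fields = line.split()
        # Skip blank lines, the header, and rows without numeric offsets.
        if len(fields) < 5 or not all(f.isdigit() for f in fields[2:5]):
            continue
        current, log_end = int(fields[3]), int(fields[4])
        rows.append({
            "group": fields[0],
            "topic": fields[1],
            "partition": int(fields[2]),
            "current_offset": current,
            "log_end_offset": log_end,
            "lag": log_end - current,  # messages not yet consumed
        })
    return rows


rows = parse_describe_output(SAMPLE_OUTPUT)
total_lag = sum(row["lag"] for row in rows)
print(f"{len(rows)} partitions, total lag {total_lag}")
```

A total lag that stays above zero and keeps growing between runs suggests the group is not keeping up, or not consuming at all.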

What consumer offset will be set if auto.offset.reset=earliest but topic has no messages

Although no data is available in the Kafka topic, your brokers still know the "next" offset within that partition. In your case, the first and last offset of this topic are both 10, even though it does not contain any data.

Therefore, your consumer, which has already committed offset 10, will resume from that committed position when started again, regardless of the consumer configuration auto.offset.reset. (The committed offset is the offset of the next message to read, so it resumes at 10 rather than 11.)

Your example gets more interesting if the topic had offsets up to, say, 15, while the consumer was shut down after committing offset 10. Now imagine that all of those messages were removed from the topic by the retention policy. Only if you start your consumer after that does the consumer configuration auto.offset.reset come into effect, as stated in the documentation:

"What to do when there is no initial offset in Kafka or if the current offset does not exist any more on the server (e.g. because that data has been deleted)"

As long as the Kafka topic is empty there is no offset "set" for the consumer. The consumer just tries to find the next available offset, either based on

  • the last committed offset or,
  • in case the last committed offset does not exist anymore, the configuration given through auto.offset.reset.

Just as an additional note: even though the messages seem to have been cleaned up by the retention policy, you may still see some data in the topic; see "Data still remains in Kafka topic even after retention time/size".

Is Consumer Offset managed at Consumer group level or at the individual consumer inside that consumer group?

Offsets are tracked at the ConsumerGroup level.

Imagine you have 4 consumer threads in one ConsumerGroup consuming from one topic with 4 partitions. If you now stop all 4 threads and restart just a single one with the same group, that one thread will know where all 4 threads left off and continue from there.



"you are saying one offset (basically a shared int/long value) will be shared/updated by all the consumers in a consumer group?"

Yes, this is correct. Remember that a single partition of a topic can be read by only one consumer thread within a group. Two consumer threads of the same ConsumerGroup will never consume a single topic partition at the same time. The offsets of the consumer groups are stored in an internal Kafka topic called __consumer_offsets. In this topic you have key/value pairs, where the key is essentially the concatenation of

  • ConsumerGroup
  • Topic
  • Partition within Topic

and your value is the offset. This internal __consumer_offsets topic is available to all consumers so the information is shared.
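As a rough mental model, the offset store behaves like a map keyed by (group, topic, partition). The sketch below is a toy in-memory stand-in, not the actual __consumer_offsets record format; the class ToyOffsetStore and the group and topic names are hypothetical.

```python
from typing import Dict, Optional, Tuple

OffsetKey = Tuple[str, str, int]  # (consumer group, topic, partition)


class ToyOffsetStore:
    """Hypothetical in-memory stand-in for the __consumer_offsets topic."""

    def __init__(self) -> None:
        self._offsets: Dict[OffsetKey, int] = {}

    def commit(self, group: str, topic: str, partition: int, offset: int) -> None:
        # A later commit for the same key replaces the earlier value.
        self._offsets[(group, topic, partition)] = offset

    def fetch(self, group: str, topic: str, partition: int) -> Optional[int]:
        # None means "no committed offset for this group and partition".
        return self._offsets.get((group, topic, partition))


store = ToyOffsetStore()

# Four consumer threads in group1, one partition each, commit their progress.
for partition, offset in enumerate([6, 1, 1, 2]):
    store.commit("group1", "test", partition, offset)

# A single restarted consumer in group1 can look up all four partitions.
resumed = [store.fetch("group1", "test", p) for p in range(4)]

# A different group has no entries for the same topic.
other = store.fetch("group2", "test", 0)

print(resumed, other)
```

Because the consumer's identity is not part of the key, it does not matter which member of the group committed an offset or which one later reads it.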

kafka consumer offset to earliest

"updating the consumer offset to the earliest one"

That's the default behavior and configuration for auto.offset.reset when your application.id is brand new.

"Note that I'm using KStream to subscribe to the topic."

Then you can't seek at runtime (or at least you shouldn't, assuming you're using stateful processors).

Use the application reset tool, which helps manage all topics used within your topology.

Kafka Consumer configuration - How does auto.offset.reset control the message consumption

It depends.

auto.offset.reset only applies when there is no stored offset for the consumer group.

It applies in the following conditions:

  • the first time a consumer group consumes
  • if a consumer doesn't commit any offsets, the next time it is started
  • if the consumer group's committed offsets have expired (7 days by default with modern brokers)
  • if the message the stored offset points to has been removed due to message retention policies (an attempt to read a message that has been purged triggers the application of the rule)

If a consumer commits an offset, it will start at the last committed offset the next time it is started.
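The rules above can be condensed into a small decision function. This is a sketch of the logic as described here, not the Kafka client's implementation; the function name resolve_start_offset and its parameters are hypothetical, and committed=None stands for "no stored offset" (never committed, or expired).

```python
from typing import Optional


def resolve_start_offset(
    committed: Optional[int],
    log_start: int,
    log_end: int,
    auto_offset_reset: str = "latest",
) -> int:
    """Return the offset a consumer would start from in this simplified model."""
    # A committed offset that still lies within the log always wins.
    if committed is not None and log_start <= committed <= log_end:
        return committed
    # Otherwise the reset policy decides.
    if auto_offset_reset == "earliest":
        return log_start
    if auto_offset_reset == "latest":
        return log_end
    if auto_offset_reset == "none":
        raise LookupError("no valid committed offset and auto.offset.reset=none")
    raise ValueError(f"unknown auto.offset.reset value: {auto_offset_reset}")


# Brand-new group: the policy decides.
print(resolve_start_offset(None, log_start=0, log_end=5, auto_offset_reset="earliest"))
print(resolve_start_offset(None, log_start=0, log_end=5, auto_offset_reset="latest"))

# Restart with a valid committed offset: the policy is ignored.
print(resolve_start_offset(5, log_start=0, log_end=10, auto_offset_reset="earliest"))

# Committed offset points at purged data: the policy applies again.
print(resolve_start_offset(2, log_start=5, log_end=10, auto_offset_reset="earliest"))
```

Here log_start and log_end correspond to what earliest and latest resolve to for the partition at the moment the consumer starts.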

Kafka: Who keeps track of the offset up to which a consumer group has read messages?

The information about consumer groups is stored in the internal Kafka topic __consumer_offsets. Whenever a new group tries to read data from a topic, it checks its offset position in that internal topic, whose cleanup policy is set to compact. Compaction keeps this topic small.
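To see why compaction keeps the topic small, here is a toy illustration of the idea: only the most recent record per key survives. It is a simplification (real compaction runs in the background on older log segments and also handles delete markers), and the function name compact and the sample records are hypothetical.

```python
def compact(records):
    """Keep only the last value seen for each key, in order of last occurrence."""
    latest = {}
    for position, (key, value) in enumerate(records):
        latest[key] = (position, value)  # later records overwrite earlier ones
    ordered = sorted(latest.items(), key=lambda item: item[1][0])
    return [(key, value) for key, (_position, value) in ordered]


# Keys are (group, topic, partition); values are committed offsets.
commits = [
    (("group1", "test", 0), 1),
    (("group1", "test", 0), 2),
    (("group1", "test", 1), 1),
    (("group1", "test", 0), 6),
]

compacted = compact(commits)
print(compacted)
```

However many times a group commits, the compacted view needs only one entry per group, topic, and partition.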

Kafka comes with a command line tool kafka-consumer-groups.sh that helps you understand which information is stored for each consumer group.

More information is given in the Kafka Documentation on offset tracking.


