Spring Boot Kafka consumers do not consume in the order the messages are produced
Your topic has 3 partitions. There will be no order guarantee unless you use exactly one partition. More specifically, data is only ordered within a partition; each consumer is consuming data that is ordered within its assigned partitions.
To show this, log the partition each record came from:
// In spring-kafka 3.0 and later the header constant is named KafkaHeaders.RECEIVED_PARTITION
@KafkaListener(topics = "${topic.name.consumer}", groupId = "group-1")
public void consumeLinks(
        @Payload String word,
        @Header(KafkaHeaders.RECEIVED_PARTITION_ID) int partition) {
    System.out.println(
            "Received Message: " + word
            + " from partition: " + partition);
}
And if you use one partition, only one consumer in that consumer group can be active; any additional consumers in the group stay idle.
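The effect can be reproduced without a broker. The following is a toy model only, not the Kafka client: the class and method names are made up for illustration, and the round-robin distribution is a simplification rather than the real partitioner. It shows that each partition keeps its own order while the combined stream does not.

```java
import java.util.ArrayList;
import java.util.List;

// Toy model (hypothetical names, no Kafka dependency): order holds per partition only.
class PartitionOrderDemo {

    // Spreads messages over partitions round-robin.
    // This is a simplification chosen for the demo, not the real partitioner.
    static List<List<Integer>> distribute(List<Integer> messages, int partitionCount) {
        List<List<Integer>> partitions = new ArrayList<>();
        for (int p = 0; p < partitionCount; p++) {
            partitions.add(new ArrayList<>());
        }
        for (int i = 0; i < messages.size(); i++) {
            partitions.get(i % partitionCount).add(messages.get(i));
        }
        return partitions;
    }

    // Reads each partition to its end before moving on to the next one.
    // This is just one of many interleavings a consumer could observe.
    static List<Integer> consumePartitionByPartition(List<List<Integer>> partitions) {
        List<Integer> consumed = new ArrayList<>();
        for (List<Integer> partition : partitions) {
            consumed.addAll(partition);
        }
        return consumed;
    }

    public static void main(String[] args) {
        List<Integer> produced = List.of(1, 2, 3, 4, 5, 6);
        List<List<Integer>> partitions = distribute(produced, 3);
        System.out.println("Per partition: " + partitions);
        System.out.println("As consumed:   " + consumePartitionByPartition(partitions));
    }
}
```

With three partitions, every inner list is still ascending, but the consumed sequence is no longer 1 to 6 in order. Passing 1 as the partition count gives back the produced order.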
Kafka consumer doesn't consume messages from existing topic
The issue is with the brokers and the topic replication factor. I used your docker-compose file to deploy Kafka and checked the broker logs, which contained this message:
ERROR [KafkaApi-1] Number of alive brokers '1' does not meet the required replication factor '3' for the offsets topic (configured via 'offsets.topic.replication.factor'). This error can be ignored if the cluster is starting up and not all brokers are up yet. (kafka.server.KafkaApis)
To solve this problem I had to add `KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1` to the broker configuration. So my broker service config looks like this:
broker:
  image: confluentinc/cp-kafka
  hostname: broker
  depends_on:
    - zookeeper
  ports:
    - "3002:3002"
  networks:
    - bridge_network
  environment:
    KAFKA_BROKER_ID: 1
    KAFKA_ZOOKEEPER_CONNECT: 'zookeeper:3001'
    KAFKA_ADVERTISED_LISTENERS: 'PLAINTEXT://localhost:3002'
    KAFKA_AUTO_CREATE_TOPICS_ENABLE: 'true'
    KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
After restarting the broker I was able to produce/consume messages.
Kafka consumer missing messages while consuming messages in loop
Update:
I was able to figure out the issue myself. The messages had already been fetched into records, but while looping over them I had the following condition:
if (i >= msgbtch)
{
    pollFlag = false; // Assigning flag value to end the poll at 5000 messages
    break;
}
The loop was breaking before all the messages in records had been placed in the list, so the remaining ones were never inserted. I removed the break and it now works fine.
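The likely reason the skipped messages never came back is that the consumer's position had already moved past everything returned by that poll. Below is a minimal sketch of the corrected loop shape using only the JDK. The iterator of lists is a hypothetical stand-in for repeated consumer.poll() calls, and the class and method names are invented for the example; only pollFlag, msgbtch, and records come from the code above.

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;

// Sketch with hypothetical names: each element of "polls" stands in for one consumer.poll() result.
class BatchDrainDemo {

    static List<String> collect(Iterator<List<String>> polls, int msgbtch) {
        List<String> buffer = new ArrayList<>();
        boolean pollFlag = true;
        while (pollFlag && polls.hasNext()) {
            List<String> records = polls.next();
            for (String record : records) {
                buffer.add(record); // no break: every record already fetched is kept
            }
            // Check the threshold only after the whole fetched batch has been stored.
            if (buffer.size() >= msgbtch) {
                pollFlag = false;
            }
        }
        return buffer;
    }

    public static void main(String[] args) {
        List<List<String>> polls = List.of(
                List.of("a", "b", "c"),
                List.of("d", "e", "f"),
                List.of("g"));
        System.out.println(collect(polls.iterator(), 4));
    }
}
```

The trade-off is that the list can end up larger than msgbtch, because the current batch is always drained completely before polling stops.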
Intermittently consuming kafka messages in a springboot application
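You need to manage the consuming process manually. This is possible if you disable the autoStartup feature of KafkaListener. The listener container then stays stopped until you start it yourself, for example through the KafkaListenerEndpointRegistry.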
There are two discussions about this. See below:
Is there any example of Spring Schedule that reads Kafka topic?
Spring Boot Job scheduler with Kafka consumer
@KafkaListener is not consuming messages - issue with deserialization
You can deserialize the record from Kafka into a POJO. For versions before 2.2, use a MessageConverter.
Starting with version 2.2, you can explicitly configure the deserializer to use the supplied target type and ignore type information in headers by using one of the overloaded constructors that have a boolean useHeadersIfPresent parameter (true by default):
@Bean
public ConsumerFactory<String, PageViewEvent> priceEventConsumerFactory() {
    Map<String, Object> props = new HashMap<>();
    props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
    props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
    props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, JsonDeserializer.class);
    props.put(ConsumerConfig.GROUP_ID_CONFIG, "json");
    props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
    return new DefaultKafkaConsumerFactory<>(
            props,
            new StringDeserializer(),
            new JsonDeserializer<>(PageViewEvent.class, false));
}
Or by using a MessageConverter:
@Bean
public ConcurrentKafkaListenerContainerFactory<String, Bytes> kafkaListenerContainerFactory() {
    ConcurrentKafkaListenerContainerFactory<String, Bytes> factory =
            new ConcurrentKafkaListenerContainerFactory<>();
    // consumerFactory() is assumed to be defined elsewhere in the configuration
    factory.setConsumerFactory(consumerFactory());
    factory.setMessageConverter(new StringJsonMessageConverter());
    return factory;
}