Kafka Consumer in Java Not Consuming Messages

Spring Boot Kafka Consumers does not consume in the order the messages are produced

Your topic has 3 partitions. There will be no order guarantee unless you use exactly one partition. More specifically, data is only ordered within a partition; each consumer is consuming data that is ordered within its assigned partitions.

To see this, print the partition alongside each received message:

@KafkaListener(topics = "${topic.name.consumer}", groupId = "group-1")
public void consumeLinks(
        @Payload String word,
        @Header(KafkaHeaders.RECEIVED_PARTITION_ID) int partition) {
    // Printing the partition shows that ordering holds within each partition only
    System.out.println(
            "Received Message: " + word
            + " from partition: " + partition);
}

And if you use one partition, only one consumer in that consumer group can be active; any additional consumers in the group will sit idle.
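The per-partition ordering described above can be illustrated without a broker. The following is a minimal, stand-alone simulation, not Kafka client code: `PartitionOrderDemo`, `partitionFor`, and `produce` are hypothetical names, and the hash used here is a simple stand-in (Kafka's default partitioner uses a different hash). It assumes messages are produced with a key, in which case records sharing a key go to the same partition and keep their relative order even on a 3-partition topic.

```java
import java.util.ArrayList;
import java.util.List;

// Stand-alone simulation (no Kafka client needed) of per-partition ordering.
class PartitionOrderDemo {

    // Hypothetical stand-in for the producer's partitioner: same key, same partition.
    // Kafka's real default partitioner does not use String.hashCode().
    static int partitionFor(String key, int numPartitions) {
        return Math.floorMod(key.hashCode(), numPartitions);
    }

    // Appends each "key:value" message to the log of the partition chosen for its key.
    static List<List<String>> produce(List<String> messages, int numPartitions) {
        List<List<String>> partitions = new ArrayList<>();
        for (int p = 0; p < numPartitions; p++) {
            partitions.add(new ArrayList<>());
        }
        for (String message : messages) {
            String key = message.substring(0, message.indexOf(':'));
            partitions.get(partitionFor(key, numPartitions)).add(message);
        }
        return partitions;
    }

    public static void main(String[] args) {
        List<String> sent = List.of("a:1", "b:1", "c:1", "a:2", "b:2", "a:3");
        List<List<String>> logs = produce(sent, 3);
        // Each partition log is ordered, but there is no single order across partitions.
        for (int p = 0; p < logs.size(); p++) {
            System.out.println("partition " + p + " -> " + logs.get(p));
        }
    }
}
```

Within each simulated partition the values for a given key appear in the order they were sent, while the interleaving between partitions depends on how consumers read them. With a single partition, all messages end up in one ordered log.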

Kafka consumer doesn't consume messages from existing topic

The issue is with the brokers and the topic replication factor. I used your docker-compose file to deploy Kafka, connected to the broker to check the logs, and found this message:

ERROR [KafkaApi-1] Number of alive brokers '1' does not meet the required replication factor '3' for the offsets topic (configured via 'offsets.topic.replication.factor'). This error can be ignored if the cluster is starting up and not all brokers are up yet. (kafka.server.KafkaApis)

To solve this problem I had to add `KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1` to the broker configuration. My broker service config now looks like this:

broker:
  image: confluentinc/cp-kafka
  hostname: broker
  depends_on:
    - zookeeper
  ports:
    - "3002:3002"
  networks:
    - bridge_network
  environment:
    KAFKA_BROKER_ID: 1
    KAFKA_ZOOKEEPER_CONNECT: 'zookeeper:3001'
    KAFKA_ADVERTISED_LISTENERS: 'PLAINTEXT://localhost:3002'
    KAFKA_AUTO_CREATE_TOPICS_ENABLE: 'true'
    KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1

After restarting the broker I was able to produce/consume messages.

Kafka consumer missing messages while consuming messages in loop

Update:
I was able to figure out the issue myself. The messages had already been fetched into records, but while looping over them I had the following condition:

if (i >= msgbtch)
{
    pollFlag = false; // Assigning flag value to end the poll at 5000 messages
    break;
}

The loop was breaking before all the messages had been placed in the list, so not every message in records was inserted. I removed the break condition and it now works fine.
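The difference between the two loops can be sketched without a broker. This is a stand-alone simulation under stated assumptions, not the original poster's code: each inner list stands in for one batch returned by a poll, and `BatchDrainDemo`, `collectWithBreak`, and `collectWholeBatches` are hypothetical names. The first method breaks out mid-batch once the limit is reached; the second stores the whole batch and only then decides whether to poll again.

```java
import java.util.ArrayList;
import java.util.List;

// Stand-alone simulation: each inner list stands in for one batch returned by poll().
class BatchDrainDemo {

    // Buggy variant: leaves the batch as soon as the limit is reached,
    // so the remaining records of the already-fetched batch are dropped.
    static List<String> collectWithBreak(List<List<String>> polls, int limit) {
        List<String> collected = new ArrayList<>();
        boolean pollFlag = true;
        for (int p = 0; p < polls.size() && pollFlag; p++) {
            for (String record : polls.get(p)) {
                if (collected.size() >= limit) {
                    pollFlag = false;
                    break; // records left in this batch are never stored
                }
                collected.add(record);
            }
        }
        return collected;
    }

    // Fixed variant: always stores the whole batch, then decides whether to poll again.
    static List<String> collectWholeBatches(List<List<String>> polls, int limit) {
        List<String> collected = new ArrayList<>();
        boolean pollFlag = true;
        for (int p = 0; p < polls.size() && pollFlag; p++) {
            collected.addAll(polls.get(p)); // keep every record of the fetched batch
            if (collected.size() >= limit) {
                pollFlag = false; // stop polling only after the batch is fully stored
            }
        }
        return collected;
    }

    public static void main(String[] args) {
        List<List<String>> polls = List.of(
                List.of("m1", "m2", "m3"),
                List.of("m4", "m5", "m6"));
        System.out.println("with break:    " + collectWithBreak(polls, 5));
        System.out.println("whole batches: " + collectWholeBatches(polls, 5));
    }
}
```

With a limit of 5 and two batches of three records, the first variant loses `m6` while the second keeps all six. The trade-off is that the second variant can overshoot the limit by up to one batch, which is usually preferable to silently skipping records the consumer has already fetched.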

Intermittently consuming kafka messages in a springboot application

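You need to manage the consuming process manually. This is possible if you disable the autoStartup feature of @KafkaListener, so that the listener container does not start on its own and you can start and stop it yourself.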

There are two discussions about this. See below:

Is there any example of Spring Schedule that reads Kafka topic?

Spring Boot Job scheduler with Kafka consumer

@KafkaListener is not consuming messages - issue with deserialization

You can deserialize the record from Kafka into a POJO. For versions before 2.2, use a MessageConverter.

Starting with version 2.2, you can explicitly configure the deserializer to use the supplied target type and ignore type information in headers by using one of the overloaded constructors that have a boolean useHeadersIfPresent (which is true by default):

@Bean
public ConsumerFactory<String, PageViewEvent> priceEventConsumerFactory() {
    Map<String, Object> props = new HashMap<>();
    props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
    props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
    props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, JsonDeserializer.class);
    props.put(ConsumerConfig.GROUP_ID_CONFIG, "json");
    props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
    // Passing false makes the deserializer ignore type headers and use PageViewEvent
    return new DefaultKafkaConsumerFactory<>(props, new StringDeserializer(),
            new JsonDeserializer<>(PageViewEvent.class, false));
}

Or by using a MessageConverter:

@Bean
public ConcurrentKafkaListenerContainerFactory<String, Bytes> kafkaListenerContainerFactory() {
    ConcurrentKafkaListenerContainerFactory<String, Bytes> factory =
            new ConcurrentKafkaListenerContainerFactory<>();
    factory.setConsumerFactory(consumerFactory());
    // Converts the raw JSON payload to the listener method's parameter type
    factory.setMessageConverter(new StringJsonMessageConverter());
    return factory;
}

