How to pass topics dynamically to a Kafka listener?
You cannot "dynamically pass topics to a Kafka listener"; you have to programmatically create a listener container instead.
Use Kafka connection to dynamically Subscribe/Unsubscribe the Kafka Topics using Spring Boot
2.3.x has been out of support for a long time now
https://spring.io/projects/spring-kafka#support
The last 2.3.x version was 2.3.14 last July.
The admin is used to check whether the topic exists; with that old version, this is controlled by the missingTopicsFatal property in ContainerProperties, which is true in that version.
With modern versions (since 2.3.4), it is false, so an AdminClient will not be created when you start the container.
But you really need to upgrade to a supported version (2.8.5 advised - 2.7.x goes out of OSS support soon).
There is no support for reusing a consumer with different topics; a new consumer will be created each time you start the container.
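Creating the container in code could look roughly like the sketch below. It assumes spring-kafka 2.2 or later (for createContainer(String...)) and a ConcurrentKafkaListenerContainerFactory<String, String> such as the one Spring Boot auto-configures. The class name DynamicTopicListener, the group id parameter, and the printing listener are hypothetical and only there for illustration.

```java
import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;
import org.springframework.kafka.listener.ConcurrentMessageListenerContainer;
import org.springframework.kafka.listener.MessageListener;

/** Hypothetical helper: builds one unregistered container per call. */
final class DynamicTopicListener {

    private DynamicTopicListener() {
    }

    static ConcurrentMessageListenerContainer<String, String> create(
            ConcurrentKafkaListenerContainerFactory<String, String> factory,
            String groupId,
            String... topics) {
        // The factory applies its own settings (consumer factory, concurrency, ...).
        ConcurrentMessageListenerContainer<String, String> container =
                factory.createContainer(topics);
        container.getContainerProperties().setGroupId(groupId);
        // Illustrative listener: print topic and value of every record.
        container.getContainerProperties().setMessageListener(
                (MessageListener<String, String>) record ->
                        System.out.println(record.topic() + ": " + record.value()));
        // Not started yet: the caller decides when to start() and stop() it.
        return container;
    }
}
```

The caller keeps the returned reference, calls start() on it, and calls stop() before creating a replacement for a different set of topics. Each start creates a new consumer, as noted above.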
Spring Kafka with Dynamic @KafkaListener
https://docs.spring.io/spring-framework/docs/current/javadoc-api/org/springframework/context/support/GenericApplicationContext.html#registerBean-java.lang.Class-java.util.function.Supplier-org.springframework.beans.factory.config.BeanDefinitionCustomizer...-
Create your object and register it as a bean, providing it via the Supplier argument of the method linked above. Spring will run the bean post processors necessary to set everything up.
Kafka consumer picking up topics dynamically
No; the only way to do that is to use a topic pattern. As new topics that match the pattern are added, they are added to the subscription at the next metadata refresh, which by default happens every 5 minutes (consumer property metadata.max.age.ms).
You can, however, add new listener container(s) for the new topic(s) at runtime.
Another option would be to load the @KafkaListener bean in a child application context and re-create the context each time the topic(s) change.
EDIT
See the javadocs for KafkaConsumer.subscribe(Pattern pattern)
...
/**
* Subscribe to all topics matching specified pattern to get dynamically assigned partitions.
* The pattern matching will be done periodically against topics existing at the time of check.
* <p>
...
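To get a feel for which topics a pattern subscription would pick up, the check can be reproduced with plain java.util.regex. This is only a sketch: the orders.* naming scheme and the class name TopicPatternSketch are made up, and it assumes a topic must match the pattern in full. The second helper shows the consumer property that governs how often the periodic check runs.

```java
import java.util.Collection;
import java.util.List;
import java.util.Properties;
import java.util.regex.Pattern;
import java.util.stream.Collectors;

final class TopicPatternSketch {

    private TopicPatternSketch() {
    }

    /** Returns, in sorted order, the topic names that fully match the pattern. */
    static List<String> matching(Pattern pattern, Collection<String> topics) {
        return topics.stream()
                .filter(topic -> pattern.matcher(topic).matches())
                .sorted()
                .collect(Collectors.toList());
    }

    /** Consumer properties with a metadata refresh interval other than the 5 minute default. */
    static Properties withMetadataRefresh(long millis) {
        Properties props = new Properties();
        props.setProperty("metadata.max.age.ms", Long.toString(millis));
        return props;
    }

    public static void main(String[] args) {
        Pattern pattern = Pattern.compile("orders\\..*"); // hypothetical naming scheme
        List<String> topics = List.of("payments.eu", "orders.us", "orders.eu");
        System.out.println(matching(pattern, topics)); // prints [orders.eu, orders.us]
        // Merge these into the consumer configuration to notice new topics sooner.
        Properties props = withMetadataRefresh(30_000);
        System.out.println(props.getProperty("metadata.max.age.ms"));
    }
}
```

Lowering metadata.max.age.ms makes new matching topics show up sooner at the cost of more frequent metadata requests.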
Change listening topic during runtime in Spring Kafka
To my knowledge, going through the source code, you can't change the topic at runtime. So you'll want to stop the current container and recreate a new one.
I'd advise against using the registry in this case and would manage the container yourself instead, because it seems you can't remove containers from the registry and will end up with a memory leak.
You can autowire a KafkaListenerContainerFactory yourself. This factory requires an endpoint. I must admit that setting up the endpoint seemed a bit painful to me if you just want to change the topic and have a callback called, because all available implementations use metaprogramming with bean and method references.
The following snippet should get you started, although it might need some more tweaking.
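import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.boot.context.event.ApplicationStartedEvent;
import org.springframework.context.event.EventListener;
import org.springframework.kafka.annotation.EnableKafka;
import org.springframework.kafka.config.KafkaListenerContainerFactory;
import org.springframework.kafka.config.KafkaListenerEndpoint;
import org.springframework.kafka.config.MethodKafkaListenerEndpoint;
import org.springframework.messaging.handler.annotation.support.DefaultMessageHandlerMethodFactory;

@SpringBootApplication
@EnableKafka
public class KafkaDemoApplication {

    private KafkaListenerContainerFactory<?> factory;

    public static void main(String[] args) {
        SpringApplication.run(KafkaDemoApplication.class, args);
    }

    @Autowired
    public void setFactory(KafkaListenerContainerFactory<?> factory) {
        this.factory = factory;
    }

    @EventListener(classes = {ApplicationStartedEvent.class})
    public void onStarted() throws InterruptedException, NoSuchMethodException {
        // Listen on the first topic for two seconds, then stop that container.
        var listenerContainer = factory.createListenerContainer(getEndpoint("my_topic_3"));
        listenerContainer.start();
        Thread.sleep(2000);
        listenerContainer.stop();
        // Switching topics means creating and starting a fresh container.
        listenerContainer = factory.createListenerContainer(getEndpoint("my_topic_4"));
        listenerContainer.start();
        Thread.sleep(2000);
        listenerContainer.stop();
    }

    // Assumes a consumer group id is configured (e.g. spring.kafka.consumer.group-id);
    // otherwise set one on the endpoint with setGroupId(...).
    private KafkaListenerEndpoint getEndpoint(String topic) throws NoSuchMethodException {
        var handlerFactory = new DefaultMessageHandlerMethodFactory();
        handlerFactory.afterPropertiesSet(); // registers the default argument resolvers
        var listenerEndpoint = new MethodKafkaListenerEndpoint<String, String>();
        listenerEndpoint.setMessageHandlerMethodFactory(handlerFactory);
        listenerEndpoint.setBean(this);
        listenerEndpoint.setMethod(getClass().getMethod("onMessage", String.class, String.class));
        listenerEndpoint.setTopics(topic);
        return listenerEndpoint;
    }

    // Callback invoked for each record; the parameter mapping may need tweaking.
    public void onMessage(String key, String value) {
        System.out.println(key + ":" + value);
    }
}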
As a side note, you can implement KafkaListenerConfigurer, if you want access to the registry, because it's not autowireable. But again, don't use it, if you want to kill your containers, because you can't remove the references, as far as I know.
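Managing the container yourself mostly means holding a single reference and replacing it when the topic changes, so stopped containers become unreachable and can be garbage collected. The sketch below illustrates only that bookkeeping in plain Java. TopicSwitcher is a hypothetical class, and the type parameter C stands in for whatever container type you use (for example Spring Kafka's MessageListenerContainer).

```java
import java.util.function.Consumer;
import java.util.function.Function;

/** Hypothetical helper that keeps at most one live container at a time. */
final class TopicSwitcher<C> {

    private final Function<String, C> create; // builds a container for a topic
    private final Consumer<C> start;          // starts a container
    private final Consumer<C> stop;           // stops a container

    private C current;
    private String currentTopic;

    TopicSwitcher(Function<String, C> create, Consumer<C> start, Consumer<C> stop) {
        this.create = create;
        this.start = start;
        this.stop = stop;
    }

    /** Stops the running container, if any, and starts a new one for the topic. */
    synchronized void switchTo(String topic) {
        if (topic.equals(currentTopic)) {
            return; // already listening on this topic
        }
        if (current != null) {
            stop.accept(current);
        }
        // Overwriting the only reference lets the old container be collected.
        current = create.apply(topic);
        currentTopic = topic;
        start.accept(current);
    }

    /** Stops the running container, if any, and forgets it. */
    synchronized void shutdown() {
        if (current != null) {
            stop.accept(current);
            current = null;
            currentTopic = null;
        }
    }

    synchronized String currentTopic() {
        return currentTopic;
    }
}
```

With Spring Kafka, the create function would wrap the factory.createListenerContainer(...) call from the snippet above, and start and stop would map to the container's own start() and stop() methods.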