How to create a Topic in Kafka through Java
Edit: ZooKeeper is not required in newer versions of Kafka. Please see the answer by @Neeleshkumar Srinivasan Mannur for API version 0.11.0+.
Original answer
I fixed it after a lot of research.
ZkClient zkClient = new ZkClient("localhost:2181", 10000, 10000);
AdminUtils.createTopic(zkClient, myTopic, 10, 1, new Properties());
With the code above, ZkClient creates the topic, but Kafka is not aware of the topic information, because ZkClient's default serializer does not write the plain strings Kafka expects. So we need to create the ZkClient object in the following way.
First, add this import:
import kafka.utils.ZKStringSerializer$;
Then create the ZkClient object like this:
ZkClient zkClient = new ZkClient("localhost:2181", 10000, 10000, ZKStringSerializer$.MODULE$);
AdminUtils.createTopic(zkClient, myTopic, 10, 1, new Properties());
Edit 1: (for @ajkret comment)
The code above won't work for Kafka versions newer than 0.9 because the API has changed.
For those versions, use the code below:
import java.util.Properties;
import kafka.admin.AdminUtils;
import kafka.utils.ZKStringSerializer$;
import kafka.utils.ZkUtils;
import org.I0Itec.zkclient.ZkClient;
import org.I0Itec.zkclient.ZkConnection;
public class KafkaTopicCreationInJava
{
    public static void main(String[] args) throws Exception {
        ZkClient zkClient = null;
        ZkUtils zkUtils = null;
        try {
            // If multiple zookeeper then -> String zookeeperHosts = "192.168.20.1:2181,192.168.20.2:2181";
            String zookeeperHosts = "192.168.20.1:2181";
            int sessionTimeOutInMs = 15 * 1000; // 15 secs
            int connectionTimeOutInMs = 10 * 1000; // 10 secs
            zkClient = new ZkClient(zookeeperHosts, sessionTimeOutInMs, connectionTimeOutInMs, ZKStringSerializer$.MODULE$);
            zkUtils = new ZkUtils(zkClient, new ZkConnection(zookeeperHosts), false);
            String topicName = "testTopic";
            int noOfPartitions = 2;
            int noOfReplication = 3;
            Properties topicConfiguration = new Properties();
            AdminUtils.createTopic(zkUtils, topicName, noOfPartitions, noOfReplication, topicConfiguration);
        } catch (Exception ex) {
            ex.printStackTrace();
        } finally {
            if (zkClient != null) {
                zkClient.close();
            }
        }
    }
}
Kafka 0.9 - How to create a topic through java api
For Kafka, the cluster determines whether and how you're able to create topics. If you want to be able to create topics on the fly, the easiest way is to enable auto.create.topics.enable on your cluster. Then, when you send a message to a topic that doesn't exist, the cluster creates the topic with the cluster's default number of partitions and replication factor. If you don't have or don't want this feature enabled, there are no methods that I know of in the Kafka clients library for creating topics.
If you're determined, you can look into the internals of the kafka-topics.sh, which is where you'll find how Kafka creates topics with the core api.
Edit
Kafka now offers an Admin API that allows you to create topics programmatically (among other things). See the official API docs (Kafka version 1.0.0).
How to programmatically create a Kafka 2.3.0 topic in Java
How about this one?
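import java.util.Properties;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.admin.AdminClientConfig;
import org.apache.kafka.clients.admin.CreateTopicsResult;
import org.apache.kafka.clients.admin.NewTopic;
Properties props = new Properties();
props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
AdminClient adminClient = AdminClient.create(props);
CreateTopicsResult res = adminClient.createTopics(
    Stream.of("foo", "bar", "baz").map(
        name -> new NewTopic(name, 3, (short) 1) // 3 partitions, replication factor 1
    ).collect(Collectors.toList())
);
res.all().get(); // block until every topic has been created
adminClient.close();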
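To see what the stream expression produces without a broker, here is a minimal sketch of just the mapping step. TopicSpec is a hypothetical stand-in for Kafka's NewTopic (same three constructor arguments), used only so the snippet compiles without kafka-clients on the classpath; the class and method names are made up for illustration.

```java
import java.util.List;
import java.util.stream.Collectors;
import java.util.stream.Stream;

public class TopicSpecs {

    // Hypothetical stand-in for org.apache.kafka.clients.admin.NewTopic.
    static final class TopicSpec {
        final String name;
        final int partitions;
        final short replicationFactor;

        TopicSpec(String name, int partitions, short replicationFactor) {
            this.name = name;
            this.partitions = partitions;
            this.replicationFactor = replicationFactor;
        }
    }

    // Builds one spec per topic name: each stream element becomes the
    // name of its own spec, with 3 partitions and replication factor 1.
    static List<TopicSpec> specsFor(String... names) {
        return Stream.of(names)
                .map(name -> new TopicSpec(name, 3, (short) 1))
                .collect(Collectors.toList());
    }

    public static void main(String[] args) {
        for (TopicSpec spec : specsFor("foo", "bar", "baz")) {
            System.out.println(spec.name + " partitions=" + spec.partitions
                    + " replicationFactor=" + spec.replicationFactor);
        }
    }
}
```

With the real client, the resulting list is what gets passed to createTopics.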
Disable auto topic creation from Spring Kafka Consumer
The configuration below in application.yml works perfectly for Consumers based on Spring for Apache Kafka:
spring:
  kafka:
    consumer:
      properties:
        allow.auto.create.topics: false
Here is a reference project for a simple Spring Kafka Consumer.
However, in my case I was also using the non-blocking retries provided by Spring Kafka in the form of the @RetryableTopic annotation.
In this case, to turn off auto topic creation from the Consumer, along with the property change mentioned above, we also need to set the autoCreateTopics attribute of the @RetryableTopic annotation to "false", like so:
@RetryableTopic(
    attempts = "4",
    backoff = @Backoff(delay = 1000),
    fixedDelayTopicStrategy = FixedDelayStrategy.SINGLE_TOPIC,
    autoCreateTopics = "false"
)
Its default value is "true".
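With autoCreateTopics set to "false", the retry and dead-letter topics have to exist before the listener starts. The sketch below lists the names that would need to be created for the SINGLE_TOPIC strategy. It assumes the suffixes -retry and -dlt, which to my knowledge are the Spring Kafka defaults; if retryTopicSuffix or dltTopicSuffix is set on the annotation, pass those values instead. The class and method names are made up for illustration.

```java
import java.util.List;

public class RetryTopicNames {

    // Assumed default suffixes; override them if the annotation sets others.
    static final String RETRY_SUFFIX = "-retry";
    static final String DLT_SUFFIX = "-dlt";

    // Names to pre-create for one main topic when a single retry topic is used:
    // the main topic itself, its retry topic, and its dead-letter topic.
    static List<String> topicsToPreCreate(String mainTopic, String retrySuffix, String dltSuffix) {
        return List.of(mainTopic, mainTopic + retrySuffix, mainTopic + dltSuffix);
    }

    public static void main(String[] args) {
        // prints [orders, orders-retry, orders-dlt]
        System.out.println(topicsToPreCreate("orders", RETRY_SUFFIX, DLT_SUFFIX));
    }
}
```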
Here is a reference project for Spring Kafka Consumer with non-blocking retries.
Big thanks to tzolov for pointing me to the right direction.
How to AutoCreate Kafka Topic according to incoming topic pattern through kafkalistener service in springboot Kafka?
Add a rebalance listener, or extend AbstractConsumerSeekAware (or just implement ConsumerSeekAware).
public class LibraryEventsConsumer extends AbstractConsumerSeekAware {
Then, in onPartitionsAssigned(), use an AdminClient to check if the DLT topic exists and, if not, create it.
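The "check, then create" decision can be kept separate from the Kafka calls. A minimal sketch, assuming the set of existing topic names was fetched beforehand (for example with AdminClient.listTopics().names().get()) and that the DLT is named by appending a suffix to the source topic. The .DLT suffix used in main is an assumption, so substitute whatever your recoverer is configured with. DltTopicCheck and missingDltTopics are illustrative names.

```java
import java.util.Collection;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Set;

public class DltTopicCheck {

    // Returns the DLT names for the assigned topics that are not in existingTopics,
    // in assignment order and without duplicates.
    static Set<String> missingDltTopics(Collection<String> assignedTopics,
                                        Set<String> existingTopics,
                                        String dltSuffix) {
        Set<String> missing = new LinkedHashSet<>();
        for (String topic : assignedTopics) {
            String dlt = topic + dltSuffix;
            if (!existingTopics.contains(dlt)) {
                missing.add(dlt);
            }
        }
        return missing;
    }

    public static void main(String[] args) {
        // Several partitions of the same topic may be assigned, hence the duplicate.
        List<String> assigned = List.of("library-events", "library-events", "audit");
        Set<String> existing = Set.of("library-events", "audit", "audit.DLT");
        // prints [library-events.DLT]
        System.out.println(missingDltTopics(assigned, existing, ".DLT"));
    }
}
```

Each returned name would then be wrapped in a NewTopic and passed to AdminClient.createTopics(...).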