How to Create a Topic in Kafka from the IDE Using the API

How do you programmatically create a topic with Kafka 1.1.0?

This slightly modified code worked for me when I ran it stand-alone against a 1.1.0 broker:

import java.util.Collections;
import java.util.Properties;
import java.util.concurrent.ExecutionException;

import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.admin.AdminClientConfig;
import org.apache.kafka.clients.admin.CreateTopicsResult;
import org.apache.kafka.clients.admin.NewTopic;
import org.apache.kafka.common.errors.TopicExistsException;

public class CreateTopicExample {

    public static void main(String[] args) {
        final String ordersTopic = "test-orders";

        Properties props = new Properties();
        props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");

        try (final AdminClient adminClient = AdminClient.create(props)) {
            try {
                // Define the topic: name, number of partitions, replication factor
                NewTopic newTopic = new NewTopic(ordersTopic, 1, (short) 1);

                // Create the topic, which is an async call
                final CreateTopicsResult createTopicsResult = adminClient.createTopics(Collections.singleton(newTopic));

                // Since the call is async, wait for it to complete
                createTopicsResult.values().get(ordersTopic).get();
            } catch (InterruptedException | ExecutionException e) {
                // Ignore the error if the topic already exists
                if (!(e.getCause() instanceof TopicExistsException))
                    throw new RuntimeException(e.getMessage(), e);
            }
        }
    }
}

Since this is pretty similar to your code, and based on the error you're seeing, perhaps you haven't completely sorted out the dependencies on the Kafka libraries? I used the Maven artifact org.apache.kafka:kafka_2.12:1.1.0.

How to create a Topic in Kafka through Java

Edit - Zookeeper is not required in newer versions of Kafka. Please see the answer by @Neeleshkumar Srinivasan Mannur for API version 0.11.0+



Original answer

I fixed it after a lot of research.

ZkClient zkClient = new ZkClient("localhost:2181", 10000, 10000);
AdminUtils.createTopic(zkClient, myTopic, 10, 1, new Properties());

With the above code, ZkClient will create the topic, but Kafka will not be aware of it, because the topic metadata is not serialized in the format Kafka expects. So what we have to do is create the ZkClient object differently.

First, add the following import:

import kafka.utils.ZKStringSerializer$;

and then create the ZkClient object as follows:

ZkClient zkClient = new ZkClient("localhost:2181", 10000, 10000, ZKStringSerializer$.MODULE$);
AdminUtils.createTopic(zkClient, myTopic, 10, 1, new Properties());

Edit 1 (for @ajkret's comment):

The above code won't work for Kafka > 0.9 since the API has changed.
Use the code below for Kafka > 0.9:


import java.util.Properties;

import kafka.admin.AdminUtils;
import kafka.utils.ZKStringSerializer$;
import kafka.utils.ZkUtils;
import org.I0Itec.zkclient.ZkClient;
import org.I0Itec.zkclient.ZkConnection;

public class KafkaTopicCreationInJava {

    public static void main(String[] args) throws Exception {
        ZkClient zkClient = null;
        ZkUtils zkUtils = null;
        try {
            // For multiple ZooKeeper nodes: "192.168.20.1:2181,192.168.20.2:2181"
            String zookeeperHosts = "192.168.20.1:2181";
            int sessionTimeOutInMs = 15 * 1000;    // 15 secs
            int connectionTimeOutInMs = 10 * 1000; // 10 secs

            zkClient = new ZkClient(zookeeperHosts, sessionTimeOutInMs, connectionTimeOutInMs, ZKStringSerializer$.MODULE$);
            zkUtils = new ZkUtils(zkClient, new ZkConnection(zookeeperHosts), false);

            String topicName = "testTopic";
            int noOfPartitions = 2;
            int noOfReplication = 3;
            Properties topicConfiguration = new Properties();

            AdminUtils.createTopic(zkUtils, topicName, noOfPartitions, noOfReplication, topicConfiguration);

        } catch (Exception ex) {
            ex.printStackTrace();
        } finally {
            if (zkClient != null) {
                zkClient.close();
            }
        }
    }
}

How to create topics in Apache Kafka?

When you start your Kafka broker, you can define a set of properties in the config/server.properties file. This file is just a key-value property file. One of those properties is auto.create.topics.enable; if it is set to true (which is the default), Kafka will create topics automatically when you send messages to non-existing topics.
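
As an illustration, here is a minimal producer sketch (the class name, broker address, and topic name are placeholders, and it assumes auto.create.topics.enable=true on the broker). The first send to a topic that does not exist yet makes the broker create it with the default partition count and replication factor:

import java.util.Properties;

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.StringSerializer;

public class AutoCreateTopicExample {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());

        try (KafkaProducer<String, String> producer = new KafkaProducer<>(props)) {
            // "not-yet-existing-topic" is created by the broker on first use,
            // with the broker-wide default partitions and replication factor
            producer.send(new ProducerRecord<>("not-yet-existing-topic", "key", "value"));
            producer.flush();
        }
    }
}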

All of the available config options are defined in the broker configuration documentation. IMHO, a simple rule for creating topics is the following: the number of replicas cannot be more than the number of nodes that you have. The number of topics and partitions is not limited by the number of nodes in your cluster.

For example:

  • You have a 9-node cluster
  • Your topic can have 9 partitions and 9 replicas, or
    18 partitions and 9 replicas, or
    36 partitions and 9 replicas, and so on...

Kafka topic creation best-practice

What is considered best practice when creating topics for Apache Kafka?
Does everyone allow automatic creation of topics, or how do you do it?

It depends on what you're doing. You can definitely use topic auto creation, but then the automatically created topics will have the default broker-wide configuration in terms of partitions and replication factor.
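
If you do rely on auto creation, you can check afterwards what the broker actually assigned. A minimal sketch using the AdminClient (class name, broker address, and topic name are placeholders):

import java.util.Collections;
import java.util.Properties;

import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.admin.AdminClientConfig;
import org.apache.kafka.clients.admin.TopicDescription;

public class DescribeAutoCreatedTopic {
    public static void main(String[] args) throws Exception {
        Properties props = new Properties();
        props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");

        try (AdminClient adminClient = AdminClient.create(props)) {
            // Inspect the partition count and replication factor the broker assigned
            TopicDescription description = adminClient
                    .describeTopics(Collections.singleton("not-yet-existing-topic"))
                    .values().get("not-yet-existing-topic").get();

            System.out.println("Partitions: " + description.partitions().size());
            System.out.println("Replication factor: " + description.partitions().get(0).replicas().size());
        }
    }
}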

For Kafka Streams, a Confluent engineer writes that manually creating topics before starting the application is recommended:

I also want to point out, that it is highly recommended to not
use auto topic create for Streams, but to manually create all
input/output topics before you start your Streams application.

For more details, see
http://docs.confluent.io/current/streams/developer-guide.html#managing-topics-of-a-kafka-streams-application

With regard to:

Do you bundle the topic-creation-step with the starting of the kafka-instance?

Yes. If you have a Java application, you can use AdminClient in the main method of your application before you start your application. If you have some other kind of application, you can run an init script that calls bin/kafka-topics.sh before your application. If you're using Kubernetes, you can use a Kubernetes Init Container. But there are obviously any number of ways you can do this.
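
As a sketch of the AdminClient variant (class name, broker address, topic name, partition count, and replication factor are placeholders): list the existing topics and create any that are missing, before the rest of the application starts.

import java.util.Collections;
import java.util.Properties;
import java.util.Set;

import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.admin.AdminClientConfig;
import org.apache.kafka.clients.admin.NewTopic;

public class EnsureTopicsExist {
    public static void main(String[] args) throws Exception {
        Properties props = new Properties();
        props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");

        try (AdminClient adminClient = AdminClient.create(props)) {
            // Which topics does the cluster already have?
            Set<String> existing = adminClient.listTopics().names().get();

            if (!existing.contains("orders")) {
                // Create the missing topic with explicit partitions / replication factor
                adminClient.createTopics(
                        Collections.singleton(new NewTopic("orders", 3, (short) 1))).all().get();
            }
        }

        // ... start the actual application here ...
    }
}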

This feels a bit "hacky", but maybe it's the only way?

I don't think this is hacky. Pretty normal to have init steps, I think.

Finally, also note that you may want to configure the retention policy on your topics. This can be done with broker-wide defaults or on a per-topic basis: https://stackoverflow.com/a/48504305/741970.
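
For instance, a minimal sketch of setting retention.ms on a per-topic basis at creation time with the AdminClient (the class name, broker address, topic name, and seven-day value are only examples):

import java.util.Collections;
import java.util.Properties;

import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.admin.AdminClientConfig;
import org.apache.kafka.clients.admin.NewTopic;
import org.apache.kafka.common.config.TopicConfig;

public class CreateTopicWithRetention {
    public static void main(String[] args) throws Exception {
        Properties props = new Properties();
        props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");

        try (AdminClient adminClient = AdminClient.create(props)) {
            // Per-topic retention of 7 days (in milliseconds), overriding the broker default
            NewTopic topic = new NewTopic("orders", 3, (short) 1)
                    .configs(Collections.singletonMap(
                            TopicConfig.RETENTION_MS_CONFIG,
                            String.valueOf(7L * 24 * 60 * 60 * 1000)));

            adminClient.createTopics(Collections.singleton(topic)).all().get();
        }
    }
}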

Update

Thanks to Peter S. for pointing out that the officially recommended way to create topics is in the CI pipeline:

The recommended approach is to create topics through a continuous integration pipeline, where topics are defined in source control and created through a build process. This ensures scripts can validate that all topic names conform to the desired conventions before getting created. A helpful tool to manage topics within a Kafka cluster is kafka-dsf.


