How to Create a Topic in Kafka Through Java

Edit: ZooKeeper is not required in newer versions of Kafka. Please see the answer by @Neeleshkumar Srinivasan Mannur for API version 0.11.0+.



Original answer

I fixed it after a lot of research.

ZkClient zkClient = new ZkClient("localhost:2181", 10000, 10000);
AdminUtils.createTopic(zkClient, myTopic, 10, 1, new Properties());

With the code above, ZkClient does create the topic in ZooKeeper, but Kafka is not aware of it, because the topic data is not written with the string serializer that Kafka expects. To fix this, the ZkClient object has to be created with Kafka's serializer.

First, add the following import:

import kafka.utils.ZKStringSerializer$;

Then create the ZkClient object like this:

ZkClient zkClient = new ZkClient("localhost:2181", 10000, 10000, ZKStringSerializer$.MODULE$);
AdminUtils.createTopic(zkClient, myTopic, 10, 1, new Properties());
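Why the serializer matters, as far as can be told from the ZkClient and Kafka sources: without an explicit serializer, ZkClient writes node data using Java object serialization, whereas ZKStringSerializer writes plain UTF-8 strings, which is what Kafka reads back. The standalone sketch below uses no Kafka or ZkClient classes; it only compares the two encodings for a sample string. The class name SerializerComparison and the sample JSON payload are made up for illustration.

```java
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectOutputStream;
import java.io.UncheckedIOException;
import java.nio.charset.StandardCharsets;
import java.util.Arrays;

// Hypothetical demo class, not part of Kafka or ZkClient.
final class SerializerComparison {

    private SerializerComparison() {}

    /** Encodes the string with plain Java object serialization. */
    static byte[] javaSerialized(String data) {
        ByteArrayOutputStream buffer = new ByteArrayOutputStream();
        try (ObjectOutputStream out = new ObjectOutputStream(buffer)) {
            out.writeObject(data);
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return buffer.toByteArray();
    }

    /** Encodes the string as raw UTF-8 bytes. */
    static byte[] utf8(String data) {
        return data.getBytes(StandardCharsets.UTF_8);
    }

    public static void main(String[] args) {
        // Illustrative payload only; the real node content is written by Kafka.
        String sample = "{\"version\":1,\"partitions\":{\"0\":[0]}}";
        System.out.println(Arrays.equals(javaSerialized(sample), utf8(sample)));
    }
}
```

Running main prints false: the Java-serialized form carries a stream header and a type marker in front of the text, so a reader that expects a bare UTF-8 JSON string cannot parse it.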

Edit 1 (in response to @ajkret's comment):

The code above won't work for Kafka > 0.9 because the API has changed.
Use the code below for Kafka > 0.9:


import java.util.Properties;
import kafka.admin.AdminUtils;
import kafka.utils.ZKStringSerializer$;
import kafka.utils.ZkUtils;
import org.I0Itec.zkclient.ZkClient;
import org.I0Itec.zkclient.ZkConnection;

public class KafkaTopicCreationInJava
{
    public static void main(String[] args) throws Exception {
        ZkClient zkClient = null;
        ZkUtils zkUtils = null;
        try {
            // For multiple ZooKeeper hosts use e.g. "192.168.20.1:2181,192.168.20.2:2181"
            String zookeeperHosts = "192.168.20.1:2181";
            int sessionTimeOutInMs = 15 * 1000; // 15 secs
            int connectionTimeOutInMs = 10 * 1000; // 10 secs

            zkClient = new ZkClient(zookeeperHosts, sessionTimeOutInMs, connectionTimeOutInMs, ZKStringSerializer$.MODULE$);
            zkUtils = new ZkUtils(zkClient, new ZkConnection(zookeeperHosts), false);

            String topicName = "testTopic";
            int noOfPartitions = 2;
            int noOfReplication = 3;
            Properties topicConfiguration = new Properties();

            AdminUtils.createTopic(zkUtils, topicName, noOfPartitions, noOfReplication, topicConfiguration);

        } catch (Exception ex) {
            ex.printStackTrace();
        } finally {
            if (zkClient != null) {
                zkClient.close();
            }
        }
    }
}

Kafka 0.9 - How to create a topic through java api

For Kafka, the cluster determines how/if you're able to create topics. If you want to be able to create topics on the fly, the easiest way is to enable auto.create.topics.enable on your cluster. Then, when you send a message to a topic that doesn't exist, the cluster creates the topic with the cluster's default number of partitions and replication factor (the broker settings num.partitions and default.replication.factor). If you don't have or don't want this feature enabled, I don't know of any method in the Kafka clients library that creates topics.

If you're determined, you can look into the internals of the kafka-topics.sh, which is where you'll find how Kafka creates topics with the core api.

Edit

Kafka now offers an Admin API, which allows you to create topics programmatically (among other things). See the official API docs (Kafka version 1.0.0).

How to programmatically create a Kafka 2.3.0 topic in Java

How about this one?

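import java.util.Properties;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.admin.AdminClientConfig;
import org.apache.kafka.clients.admin.CreateTopicsResult;
import org.apache.kafka.clients.admin.NewTopic;

Properties props = new Properties();
props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");

AdminClient adminClient = AdminClient.create(props);

// One NewTopic per name, each with 3 partitions and replication factor 1
CreateTopicsResult res = adminClient.createTopics(
    Stream.of("foo", "bar", "baz")
        .map(name -> new NewTopic(name, 3, (short) 1))
        .collect(Collectors.toList())
);

// Wait for the creation request to complete
res.all().get();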

Disable auto topic creation from Spring Kafka Consumer

The below configuration in application.yml works perfectly for Consumers based on Spring for Apache Kafka:

spring:
  kafka:
    consumer:
      properties:
        allow.auto.create.topics: false

Here is a reference project for a simple Spring Kafka Consumer.

However, in my case I was also using the non-blocking retries that Spring Kafka provides through the @RetryableTopic annotation.

In that case, to turn off auto topic creation from the consumer, the property change above is not enough: we also need to set the autoCreateTopics attribute to "false" in the @RetryableTopic annotation, like so:

@RetryableTopic(
    attempts = "4",
    backoff = @Backoff(delay = 1000),
    fixedDelayTopicStrategy = FixedDelayStrategy.SINGLE_TOPIC,
    autoCreateTopics = "false"
)

Its default value is "true".

Here is a reference project for Spring Kafka Consumer with non-blocking retries.

Big thanks to tzolov for pointing me in the right direction.

How to AutoCreate Kafka Topic according to incoming topic pattern through kafkalistener service in springboot Kafka?

Add a rebalance listener, or extend AbstractConsumerSeekAware (or just implement ConsumerSeekAware).

public class LibraryEventsConsumer extends AbstractConsumerSeekAware {

Then, in onPartitionsAssigned() use an AdminClient to check if the DLT topic exists and, if not, create it.
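The "check if it exists and, if not, create it" step can be split into a small piece of decision logic plus the AdminClient calls around it. The sketch below covers only the decision logic and uses plain Java, so it runs without a broker. The class name MissingDltTopics and the idea of passing the DLT suffix as a parameter are assumptions for illustration; use whatever suffix your dead-letter setup actually publishes to.

```java
import java.util.Collection;
import java.util.LinkedHashSet;
import java.util.Set;

// Hypothetical helper, not part of Spring Kafka or the Kafka clients library.
final class MissingDltTopics {

    private MissingDltTopics() {}

    /**
     * Returns the dead-letter topic names that do not exist yet.
     *
     * @param assignedTopics topic names of the partitions just assigned
     * @param existingTopics topic names currently present in the cluster
     * @param dltSuffix      suffix appended to a topic name to form its DLT name (assumption)
     */
    static Set<String> find(Collection<String> assignedTopics,
                            Set<String> existingTopics,
                            String dltSuffix) {
        Set<String> missing = new LinkedHashSet<>();
        for (String topic : assignedTopics) {
            if (topic.endsWith(dltSuffix)) {
                continue; // skip topics that are already dead-letter topics
            }
            String dltName = topic + dltSuffix;
            if (!existingTopics.contains(dltName)) {
                missing.add(dltName); // the set removes duplicates from multiple partitions
            }
        }
        return missing;
    }
}
```

Inside onPartitionsAssigned(), the assigned topic names would come from calling topic() on each TopicPartition, and the existing names from adminClient.listTopics().names().get(). Each name returned by find() can then be wrapped in a NewTopic and passed to adminClient.createTopics(), as in the AdminClient answer earlier on this page.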


