KafkaAdminClient
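# I. Overview
Topics can be managed from the command line with the kafka-topics.sh script.
However, if we want to integrate topic management into an internal company system, building a platform that combines management, monitoring, operations, and alerting, we need to do it programmatically through an API.
KafkaAdminClient provides a number of methods for this: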
- Create topics: CreateTopicsResult createTopics(Collection<NewTopic> newTopics)
- Delete topics: DeleteTopicsResult deleteTopics(Collection<String> topics)
- List all available topics: ListTopicsResult listTopics()
- Describe topics: DescribeTopicsResult describeTopics(Collection<String> topicNames)
- Describe configuration: DescribeConfigsResult describeConfigs(Collection<ConfigResource> resources)
- Alter configuration: AlterConfigsResult alterConfigs(Map<ConfigResource, Config> configs)
- Add partitions: CreatePartitionsResult createPartitions(Map<String, NewPartitions> newPartitions)
# II. Topic Operations
# 1. Creating a Topic
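import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.admin.AdminClientConfig;
import org.apache.kafka.clients.admin.CreateTopicsResult;
import org.apache.kafka.clients.admin.NewTopic;

import java.util.Collections;
import java.util.Properties;
import java.util.concurrent.ExecutionException;

public class KafkaAdminClientCreateDemo {
    public static final String BROKER_LIST = "localhost:9092";
    public static final String TOPIC = "quickstart-events-test";

    public static Properties initConfig() {
        Properties props = new Properties();
        props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, BROKER_LIST);
        props.put(AdminClientConfig.REQUEST_TIMEOUT_MS_CONFIG, 30000);
        return props;
    }

    public static void main(String[] args) {
        // Configuration
        Properties props = initConfig();
        // try-with-resources closes the client even if the request fails
        try (AdminClient client = AdminClient.create(props)) {
            // Topic with 4 partitions and a replication factor of 1
            NewTopic newTopic = new NewTopic(TOPIC, 4, (short) 1);
            // Call the API and wait for the result
            CreateTopicsResult result = client.createTopics(Collections.singleton(newTopic));
            result.all().get();
        } catch (InterruptedException | ExecutionException e) {
            e.printStackTrace();
        }
    }
}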
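- Create a new Admin with the given configuration.
- Define a new topic with the specified number of partitions and replication factor.
- Call the create API.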
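When the request fails, result.all().get() throws an ExecutionException, and the actual reason (for example a TopicExistsException when the topic is already there) is available from getCause(). The sketch below shows that unwrapping pattern using only the JDK: a CompletableFuture stands in for the KafkaFuture returned by all(), and IllegalStateException stands in for the Kafka exception. Both substitutions, and the names FutureErrorSketch and describeOutcome, are assumptions made so the example runs without a broker.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;

class FutureErrorSketch {

    // Waits for the future and turns the outcome into a short status string.
    static String describeOutcome(Future<Void> future) {
        try {
            future.get();
            return "ok";
        } catch (ExecutionException e) {
            // The real failure is the cause, not the ExecutionException wrapper.
            Throwable cause = e.getCause();
            return "failed: " + cause.getClass().getSimpleName() + ": " + cause.getMessage();
        } catch (InterruptedException e) {
            // Restore the interrupt flag so callers can still observe it.
            Thread.currentThread().interrupt();
            return "interrupted";
        }
    }

    public static void main(String[] args) {
        // Stand-ins for a successful and a failed admin request.
        CompletableFuture<Void> success = CompletableFuture.completedFuture(null);
        CompletableFuture<Void> failure = new CompletableFuture<>();
        failure.completeExceptionally(new IllegalStateException("topic already exists"));

        System.out.println(describeOutcome(success)); // prints "ok"
        System.out.println(describeOutcome(failure)); // prints "failed: IllegalStateException: topic already exists"
    }
}
```

In a real program the same try/catch shape would wrap the get() call on the Kafka result, with the branch on the cause deciding whether the failure can be ignored, retried, or must be reported.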
The member variables of NewTopic are as follows:
private final String name; // topic name
private final int numPartitions; // number of partitions
private final short replicationFactor; // replication factor
private final Map<Integer, List<Integer>> replicasAssignments; // replica assignment plan
private Map<String, String> configs = null; // topic-level configuration
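The replicasAssignments field maps each partition index to the ordered list of broker IDs that host its replicas, with the first ID in each list being the preferred leader. As a minimal sketch of building such a map by hand, the helper below spreads replicas round-robin over a list of broker IDs. The class name ReplicaAssignmentSketch, the method roundRobin, and the broker IDs 0, 1, 2 are hypothetical and chosen for illustration; this is not the algorithm Kafka itself uses when it assigns replicas automatically.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

class ReplicaAssignmentSketch {

    // Partition p gets replicationFactor consecutive brokers, starting at index p % brokerIds.size().
    static Map<Integer, List<Integer>> roundRobin(List<Integer> brokerIds, int numPartitions, int replicationFactor) {
        if (replicationFactor > brokerIds.size()) {
            throw new IllegalArgumentException("replication factor exceeds broker count");
        }
        Map<Integer, List<Integer>> assignment = new LinkedHashMap<>();
        for (int partition = 0; partition < numPartitions; partition++) {
            List<Integer> replicas = new ArrayList<>();
            for (int r = 0; r < replicationFactor; r++) {
                replicas.add(brokerIds.get((partition + r) % brokerIds.size()));
            }
            assignment.put(partition, replicas);
        }
        return assignment;
    }

    public static void main(String[] args) {
        // 4 partitions, 2 replicas each, spread over 3 hypothetical brokers.
        Map<Integer, List<Integer>> assignment = roundRobin(List.of(0, 1, 2), 4, 2);
        System.out.println(assignment); // prints {0=[0, 1], 1=[1, 2], 2=[2, 0], 3=[0, 1]}
    }
}
```

A map like this can be passed to the NewTopic(String name, Map<Integer, List<Integer>> replicasAssignments) constructor instead of giving a partition count and replication factor.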
# 2. Describing a Topic
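import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.admin.AdminClientConfig;
import org.apache.kafka.clients.admin.DescribeTopicsResult;
import org.apache.kafka.clients.admin.TopicDescription;

import java.util.Collections;
import java.util.Properties;
import java.util.concurrent.ExecutionException;

public class KafkaAdminClientDescDemo {
    public static final String BROKER_LIST = "localhost:9092";
    public static final String TOPIC = "quickstart-events-test";

    public static Properties initConfig() {
        Properties props = new Properties();
        props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, BROKER_LIST);
        props.put(AdminClientConfig.REQUEST_TIMEOUT_MS_CONFIG, 30000);
        return props;
    }

    public static void main(String[] args) throws ExecutionException, InterruptedException {
        Properties props = initConfig();
        // try-with-resources closes the client, which the original demo left open
        try (AdminClient client = AdminClient.create(props)) {
            DescribeTopicsResult result = client.describeTopics(Collections.singleton(TOPIC));
            // Wait for the description of this one topic
            TopicDescription topicDescription = result.topicNameValues().get(TOPIC).get();
            System.out.println(topicDescription.topicId() + ":" + topicDescription.name());
        }
    }
}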
// output
// dN6pIT45Sm-Y9ArYWdX6Kw:quickstart-events-test
# 3. Listing All Available Topics
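import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.admin.AdminClientConfig;
import org.apache.kafka.clients.admin.ListTopicsResult;
import org.apache.kafka.common.KafkaFuture;

import java.util.Properties;
import java.util.Set;
import java.util.concurrent.ExecutionException;

public class KafkaAdminClientAllTopicDemo {
    public static final String BROKER_LIST = "localhost:9092";

    public static Properties initConfig() {
        Properties props = new Properties();
        props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, BROKER_LIST);
        props.put(AdminClientConfig.REQUEST_TIMEOUT_MS_CONFIG, 30000);
        return props;
    }

    public static void main(String[] args) throws ExecutionException, InterruptedException {
        Properties props = initConfig();
        // try-with-resources closes the client, which the original demo left open
        try (AdminClient client = AdminClient.create(props)) {
            ListTopicsResult result = client.listTopics();
            // names() resolves to the set of topic names
            KafkaFuture<Set<String>> names = result.names();
            Set<String> topicNames = names.get();
            topicNames.forEach(System.out::println);
        }
    }
}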
// output
// quickstart-events
// quickstart-events-test
# 4. Deleting a Topic
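import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.admin.AdminClientConfig;
import org.apache.kafka.clients.admin.DeleteTopicsResult;

import java.util.Collections;
import java.util.Properties;
import java.util.concurrent.ExecutionException;

public class KafkaAdminClientDeleteDemo {
    public static final String BROKER_LIST = "localhost:9092";
    public static final String TOPIC = "quickstart-events-test";

    public static Properties initConfig() {
        Properties props = new Properties();
        props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, BROKER_LIST);
        props.put(AdminClientConfig.REQUEST_TIMEOUT_MS_CONFIG, 30000);
        return props;
    }

    public static void main(String[] args) {
        Properties props = initConfig();
        // try-with-resources closes the client even if the request fails
        try (AdminClient client = AdminClient.create(props)) {
            // Request deletion and wait for the result
            DeleteTopicsResult result = client.deleteTopics(Collections.singleton(TOPIC));
            result.all().get();
        } catch (InterruptedException | ExecutionException e) {
            e.printStackTrace();
        }
    }
}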
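The overview also lists createPartitions, which none of the demos above exercise. A minimal sketch follows, assuming the same local broker and topic as the other demos and a hypothetical target of 6 partitions; the class name KafkaAdminClientPartitionsDemo and the helper buildRequest are illustrative. Kafka only allows the partition count of a topic to grow, so the target must be larger than the current count.

```java
import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.admin.AdminClientConfig;
import org.apache.kafka.clients.admin.NewPartitions;

import java.util.Collections;
import java.util.Map;
import java.util.Properties;
import java.util.concurrent.ExecutionException;

class KafkaAdminClientPartitionsDemo {
    static final String BROKER_LIST = "localhost:9092";   // assumed, as in the demos above
    static final String TOPIC = "quickstart-events-test";
    static final int TARGET_PARTITIONS = 6;               // hypothetical target total

    // Builds the request map: topic name -> desired total partition count.
    static Map<String, NewPartitions> buildRequest(String topic, int totalCount) {
        return Collections.singletonMap(topic, NewPartitions.increaseTo(totalCount));
    }

    public static void main(String[] args) throws ExecutionException, InterruptedException {
        Properties props = new Properties();
        props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, BROKER_LIST);
        try (AdminClient client = AdminClient.create(props)) {
            // Submit the request and wait until the broker has applied it.
            client.createPartitions(buildRequest(TOPIC, TARGET_PARTITIONS)).all().get();
        }
    }
}
```

Note that increaseTo takes the new total, not the number of partitions to add, so a topic with 4 partitions ends up with 6 here.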
# III. Configuration Operations
# 1. Describing Configuration
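import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.admin.AdminClientConfig;
import org.apache.kafka.clients.admin.Config;
import org.apache.kafka.clients.admin.DescribeConfigsResult;
import org.apache.kafka.common.config.ConfigResource;

import java.util.Collections;
import java.util.Properties;
import java.util.concurrent.ExecutionException;

public class KafkaAdminClientDescTopicDemo {
    public static final String BROKER_LIST = "localhost:9092";
    public static final String TOPIC = "quickstart-events";

    public static Properties initConfig() {
        Properties props = new Properties();
        props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, BROKER_LIST);
        props.put(AdminClientConfig.REQUEST_TIMEOUT_MS_CONFIG, 30000);
        return props;
    }

    public static void main(String[] args) throws ExecutionException, InterruptedException {
        Properties props = initConfig();
        // try-with-resources closes the client even if get() throws
        try (AdminClient client = AdminClient.create(props)) {
            // Resource to query: the resource type plus the resource name
            ConfigResource resource = new ConfigResource(ConfigResource.Type.TOPIC, TOPIC);
            DescribeConfigsResult result = client.describeConfigs(Collections.singleton(resource));
            Config config = result.all().get().get(resource);
            System.out.println(config);
        }
    }
}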
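- Create a new Admin with the given configuration.
- Create a ConfigResource from the resource type and the resource name.
- Call the API.
The ConfigResource constructor is as follows: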
/**
 * Create an instance of this class with the provided parameters.
 *
 * @param type a non-null resource type
 * @param name a non-null resource name
 */
public ConfigResource(Type type, String name) {
    Objects.requireNonNull(type, "type should not be null");
    Objects.requireNonNull(name, "name should not be null");
    this.type = type;
    this.name = name;
}
The class also defines an enum of resource types:
public enum Type {
    BROKER_LOGGER((byte) 8), BROKER((byte) 4), TOPIC((byte) 2), UNKNOWN((byte) 0);

    private static final Map<Byte, Type> TYPES = Collections.unmodifiableMap(
        Arrays.stream(values()).collect(Collectors.toMap(Type::id, Function.identity()))
    );

    private final byte id;

    Type(final byte id) {
        this.id = id;
    }

    public byte id() {
        return id;
    }

    public static Type forId(final byte id) {
        return TYPES.getOrDefault(id, UNKNOWN);
    }
}
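# 2. Altering Configuration
// Fragment: assumes client is an open AdminClient and topic holds the topic name, as in the demos above.
ConfigResource resource = new ConfigResource(ConfigResource.Type.TOPIC, topic);
// Set cleanup.policy to compact
ConfigEntry entry = new ConfigEntry("cleanup.policy", "compact");
Config config = new Config(Collections.singleton(entry));
Map<ConfigResource, Config> configs = new HashMap<>();
configs.put(resource, config);
// Submit the change and wait for it to complete
AlterConfigsResult result = client.alterConfigs(configs);
result.all().get();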
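alterConfigs has been deprecated since Kafka 2.3: it replaces the resource's whole set of overrides, so any topic setting left out of the request falls back to its default. The sketch below makes the same cleanup.policy change with incrementalAlterConfigs, which touches only the listed entries. The broker address and topic are the ones assumed throughout this article, and the class name KafkaAdminClientIncrementalAlterDemo and the buildRequest helper are illustrative.

```java
import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.admin.AdminClientConfig;
import org.apache.kafka.clients.admin.AlterConfigOp;
import org.apache.kafka.clients.admin.ConfigEntry;
import org.apache.kafka.common.config.ConfigResource;

import java.util.Collection;
import java.util.Collections;
import java.util.Map;
import java.util.Properties;
import java.util.concurrent.ExecutionException;

class KafkaAdminClientIncrementalAlterDemo {
    static final String BROKER_LIST = "localhost:9092"; // assumed, as in the demos above
    static final String TOPIC = "quickstart-events";

    // Builds a request that SETs a single config key on one topic and leaves all other keys alone.
    static Map<ConfigResource, Collection<AlterConfigOp>> buildRequest(String topic, String key, String value) {
        ConfigResource resource = new ConfigResource(ConfigResource.Type.TOPIC, topic);
        AlterConfigOp op = new AlterConfigOp(new ConfigEntry(key, value), AlterConfigOp.OpType.SET);
        Collection<AlterConfigOp> ops = Collections.singletonList(op);
        return Collections.singletonMap(resource, ops);
    }

    public static void main(String[] args) throws ExecutionException, InterruptedException {
        Properties props = new Properties();
        props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, BROKER_LIST);
        try (AdminClient client = AdminClient.create(props)) {
            // Submit the change and wait for it to complete.
            client.incrementalAlterConfigs(buildRequest(TOPIC, "cleanup.policy", "compact")).all().get();
        }
    }
}
```

Using OpType.DELETE in place of SET would instead remove the override and return that key to its default.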
Last updated: 2024/06/29, 15:13:44