学习总结录 学习总结录
首页
归档
分类
标签
  • Java基础
  • Java集合
  • MySQL
  • Redis
  • JVM
  • 多线程
  • 计算机网络
  • 操作系统
  • Spring
  • Kafka
  • Elasticsearch
  • Python
  • 面试专题
  • 案例实践
  • 工具使用
  • 项目搭建
  • 服务治理
  • ORM框架
  • 分布式组件
  • MiniSpring
  • 设计模式
  • 算法思想
  • 编码规范
友链
关于
GitHub (opens new window)
首页
归档
分类
标签
  • Java基础
  • Java集合
  • MySQL
  • Redis
  • JVM
  • 多线程
  • 计算机网络
  • 操作系统
  • Spring
  • Kafka
  • Elasticsearch
  • Python
  • 面试专题
  • 案例实践
  • 工具使用
  • 项目搭建
  • 服务治理
  • ORM框架
  • 分布式组件
  • MiniSpring
  • 设计模式
  • 算法思想
  • 编码规范
友链
关于
GitHub (opens new window)
  • Java基础

  • Java集合

  • MySQL

  • Redis

  • JVM

  • 多线程

  • 计算机网络

  • Spring

  • Kafka

    • 生产者客户端开发
    • 消费者客户端开发
    • 主题与分区管理
    • 配置管理
    • KafkaAdminClient
      • 一、概述
      • 二、主题操作
        • 1、创建主题
        • 2、查看主题
        • 3、查看所有可用主题
        • 4、删除主题
      • 三、配置操作
        • 1、查看配置
        • 2、修改配置
      • 参考
    • 消费管理
    • Kafka Streams
    • 日志存储
    • 可靠性研究
    • 深入服务端
    • 深入客户端
    • 集群参数配置
    • 生产者消息分区机制原理
    • 如何确保消息不丢失
    • 如何确保消息不重复消费
    • 消费积压如何处理
    • 生产者是如何管理TCP连接
    • 消费者重平衡问题
    • 位移提交问题
    • 消费者是如何管理TCP连接
    • 副本机制深入
    • 消费组消费进度如何监控
    • 高水位和Leader Epoch
  • Elasticsearch

  • Python

  • 面试专题

  • 知识库
  • Kafka
旭日
2023-04-17
目录

KafkaAdminClient

# 一、概述

对于主题的管理,我们可以使用kafka-topics.sh脚本命令进行管理。

但是如果我们希望将主题管理类的功能集成到公司内部的系统中,打造集管理、监控、运维、告警为一体的生态平台,那么就需要以程序调用 API 的方式去实现。

KafkaAdminClient提供了多种方法:

  • 创建主题:CreateTopicsResult createTopics(Collection newTopics)。
  • 删除主题:DeleteTopicsResult deleteTopics(Collection topics)。
  • 列出所有可用的主题:ListTopicsResult listTopics()。
  • 查看主题的信息:DescribeTopicsResult describeTopics(Collection topicNames)。
  • 查询配置信息:DescribeConfigsResult describeConfigs(Collection resources)。
  • 修改配置信息:AlterConfigsResult alterConfigs(Map<ConfigResource, Config> configs)。
  • 增加分区:CreatePartitionsResult createPartitions(Map<String, NewPartitions> newPartitions)。

# 二、主题操作

# 1、创建主题

public class KafkaAdminClientCreateDemo {
    public static final String BROKER_LIST = "localhost:9092";
    public static final String TOPIC = "quickstart-events-test";

    public static Properties initConfig(){
        Properties props = new Properties();
        props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, BROKER_LIST);
        props.put(AdminClientConfig.REQUEST_TIMEOUT_MS_CONFIG, 30000);
        return props;
    }

    public static void main(String[] args) {
        // 配置
        Properties props = initConfig();
        AdminClient client = AdminClient.create(props);
        // 创建主题
        NewTopic newTopic = new NewTopic(TOPIC, 4, (short) 1);
        // 调用API
        CreateTopicsResult result = client.createTopics(Collections.singleton(newTopic));
        try {
            result.all().get();
        } catch (InterruptedException | ExecutionException e) {
            e.printStackTrace();
        }
        client.close();
    }
}
  • 用给定的配置创建一个新的Admin
  • 创建具有指定复制因子和分区数的新主题。
  • 调用创建API

其中NewTopic 中的成员变量如下所示:

private final String name;	//主题名称
private final int numPartitions; 	//分区数
private final short replicationFactor; 	//副本因子
private final Map<Integer, List<Integer>> replicasAssignments; 	//分配方案
private Map<String, String> configs = null; 	//配置

# 2、查看主题

public class KafkaAdminClientDescDemo {
    public static final String BROKER_LIST = "localhost:9092";
    public static final String TOPIC = "quickstart-events-test";

    public static Properties initConfig(){
        Properties props = new Properties();
        props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, BROKER_LIST);
        props.put(AdminClientConfig.REQUEST_TIMEOUT_MS_CONFIG, 30000);
        return props;
    }

    public static void main(String[] args) throws ExecutionException, InterruptedException {
        Properties props = initConfig();
        AdminClient client = AdminClient.create(props);
        DescribeTopicsResult result = client.describeTopics(Collections.singleton(TOPIC));
        TopicDescription topicDescription = result.topicNameValues().get(TOPIC).get();
        System.out.println(topicDescription.topicId() + ":" + topicDescription.name());
    }
}

// output
// dN6pIT45Sm-Y9ArYWdX6Kw:quickstart-events-test

# 3、查看所有可用主题

public class KafkaAdminClientAllTopicDemo {
    public static final String BROKER_LIST = "localhost:9092";
    public static final String TOPIC = "quickstart-events-test";

    public static Properties initConfig(){
        Properties props = new Properties();
        props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, BROKER_LIST);
        props.put(AdminClientConfig.REQUEST_TIMEOUT_MS_CONFIG, 30000);
        return props;
    }

    public static void main(String[] args) throws ExecutionException, InterruptedException {
        Properties props = initConfig();
        AdminClient client = AdminClient.create(props);
        ListTopicsResult result = client.listTopics();
        KafkaFuture<Set<String>> names = result.names();
        Set<String> strings = names.get();
        strings.forEach(System.out::println);
    }
}

// output
// quickstart-events
// quickstart-events-test

# 4、删除主题

public class KafkaAdminClientDeleteDemo {
    public static final String BROKER_LIST = "localhost:9092";
    public static final String TOPIC = "quickstart-events-test";

    public static Properties initConfig(){
        Properties props = new Properties();
        props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, BROKER_LIST);
        props.put(AdminClientConfig.REQUEST_TIMEOUT_MS_CONFIG, 30000);
        return props;
    }

    public static void main(String[] args) {
        Properties props = initConfig();
        AdminClient client = AdminClient.create(props);
        DeleteTopicsResult result = client.deleteTopics(Collections.singleton(TOPIC));
        try {
            result.all().get();
        } catch (InterruptedException | ExecutionException e) {
            e.printStackTrace();
        }
        client.close();
    }
}

# 三、配置操作

# 1、查看配置

public class KafkaAdminClientDescTopicDemo {
    public static final String BROKER_LIST = "localhost:9092";
    public static final String TOPIC = "quickstart-events";

    public static Properties initConfig() {
        Properties props = new Properties();
        props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, BROKER_LIST);
        props.put(AdminClientConfig.REQUEST_TIMEOUT_MS_CONFIG, 30000);
        return props;
    }

    public static void main(String[] args) throws ExecutionException, InterruptedException {
        Properties props = initConfig();
        AdminClient client = AdminClient.create(props);
        // 配置(操作对象类型和操作对象名称)
        ConfigResource resource = new ConfigResource(ConfigResource.Type.TOPIC, TOPIC);
        DescribeConfigsResult result = client.describeConfigs(Collections.singleton(resource));
        Config config = result.all().get().get(resource);
        System.out.println(config);
        client.close();
    }
}
  • 用给定的配置创建一个新的Admin
  • 创建一个配置(操作对象类型和操作对象名称)
  • 调用API

其中ConfigResource的构造函数如下:

    /**
     * Create an instance of this class with the provided parameters.
     *
     * @param type a non-null resource type
     * @param name a non-null resource name
     */
    public ConfigResource(Type type, String name) {
        Objects.requireNonNull(type, "type should not be null");
        Objects.requireNonNull(name, "name should not be null");
        this.type = type;
        this.name = name;
    }

这个类中有一个枚举:

    public enum Type {
        BROKER_LOGGER((byte) 8), BROKER((byte) 4), TOPIC((byte) 2), UNKNOWN((byte) 0);

        private static final Map<Byte, Type> TYPES = Collections.unmodifiableMap(
            Arrays.stream(values()).collect(Collectors.toMap(Type::id, Function.identity()))
        );

        private final byte id;

        Type(final byte id) {
            this.id = id;
        }

        public byte id() {
            return id;
        }

        public static Type forId(final byte id) {
            return TYPES.getOrDefault(id, UNKNOWN);
        }
    }

# 2、修改配置

ConfigResource resource = new ConfigResource(ConfigResource.Type.TOPIC, topic);
ConfigEntry entry = new ConfigEntry("cleanup.policy", "compact");
Config config = new Config(Collections.singleton(entry));
Map<ConfigResource, Config> configs = new HashMap<>();
configs.put(resource, config);
AlterConfigsResult result = client.alterConfigs(configs);
result.all().get();

# 参考

官方文档 (opens new window)

图解Kafka之实战指南 (opens new window)

#消息队列
上次更新: 2024/06/29, 15:13:44
配置管理
消费管理

← 配置管理 消费管理→

最近更新
01
基础概念
10-31
02
Pytorch
10-30
03
Numpy
10-30
更多文章>
Theme by Vdoing | Copyright © 2021-2024 旭日 | 蜀ICP备2021000788号-1
  • 跟随系统
  • 浅色模式
  • 深色模式
  • 阅读模式