学习总结录 学习总结录
首页
归档
分类
标签
  • Java基础
  • Java集合
  • MySQL
  • Redis
  • JVM
  • 多线程
  • 计算机网络
  • 操作系统
  • Spring
  • Kafka
  • Elasticsearch
  • Python
  • 面试专题
  • 案例实践
  • 工具使用
  • 项目搭建
  • 服务治理
  • ORM框架
  • 分布式组件
  • MiniSpring
  • 设计模式
  • 算法思想
  • 编码规范
友链
关于
GitHub (opens new window)
首页
归档
分类
标签
  • Java基础
  • Java集合
  • MySQL
  • Redis
  • JVM
  • 多线程
  • 计算机网络
  • 操作系统
  • Spring
  • Kafka
  • Elasticsearch
  • Python
  • 面试专题
  • 案例实践
  • 工具使用
  • 项目搭建
  • 服务治理
  • ORM框架
  • 分布式组件
  • MiniSpring
  • 设计模式
  • 算法思想
  • 编码规范
友链
关于
GitHub (opens new window)
  • Java基础

  • Java集合

  • MySQL

  • Redis

  • JVM

  • 多线程

  • 计算机网络

  • Spring

  • Kafka

    • 生产者客户端开发
    • 消费者客户端开发
    • 主题与分区管理
    • 配置管理
    • KafkaAdminClient
    • 消费管理
    • Kafka Streams
    • 日志存储
    • 可靠性研究
    • 深入服务端
    • 深入客户端
    • 集群参数配置
    • 生产者消息分区机制原理
    • 如何确保消息不丢失
    • 如何确保消息不重复消费
    • 消费积压如何处理
    • 生产者是如何管理TCP连接
    • 消费者重平衡问题
    • 位移提交问题
    • 消费者是如何管理TCP连接
    • 副本机制深入
    • 消费组消费进度如何监控
      • 一、消费监控意义
      • 二、消费监控方式
        • 1、Kafka 自带命令
        • 2、Kafka Java Consumer API
        • 3、Kafka JMX 监控指标
      • 参考
    • 高水位和Leader Epoch
  • Elasticsearch

  • Python

  • 面试专题

  • 知识库
  • Kafka
旭日
2023-10-13
目录

消费组消费进度如何监控

# 一、消费监控意义

对于Kakfa消费者来说,最重要的事情就是监控他的消费进度,或者说是监控它们消费的滞后程度。这个滞后程度有个专门的名称:消费者 Lag 或 Consumer Lag,而所谓滞后程度就是指消费者当前落后生产者的速度。

比如生产者生产了100W的数据,消费者消费了80W的数据,那么消费者滞后了 20 万条消息,即 Lag 等于 20 万。

但是通常来说,lag这个指标维度是在分区上的,有时候我们需要统计主题级别,就需要把某一个主题下所有分区的lag进行相加。

一个正常工作的消费者,它的 Lag 值应该很小,甚至是接近于 0 的,这表示该消费者能够及时地消费生产者生产出来的消息,滞后程度很小。反之,如果一个消费者 Lag 值很大,通常就表明它无法跟上生产者的速度,最终 Lag 会越来越大,从而拖慢下游消息的处理速度。

# 二、消费监控方式

# 1、Kafka 自带命令

kafka-consumer-groups脚本是 Kafka 为我们提供的最直接的监控消费者消费进度的工具。

$ bin/kafka-consumer-groups.sh --bootstrap-server <Kafka broker连接信息> --describe --group <group名称>

Kafka 连接信息就是 < 主机名:端口 > 对,而 group 名称就是你的消费者程序中设置的 group.id 值

img

  • LOG-END-OFFSET:每个分区当前最新生产的消息的位移值
  • CURRENT-OFFSET:该消费者组当前最新消费消息的位移值
  • LAG 值:前两者的差值,也就是消费滞后的数量

# 2、Kafka Java Consumer API

如何利用 Consumer 端 API 监控给定消费者组的 Lag 值:

public static Map<TopicPartition, Long> lagOf(String groupID, String bootstrapServers) throws TimeoutException {
        Properties props = new Properties();
        props.put(CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
        try (AdminClient client = AdminClient.create(props)) {
            ListConsumerGroupOffsetsResult result = client.listConsumerGroupOffsets(groupID);
            try {
                Map<TopicPartition, OffsetAndMetadata> consumedOffsets = result.partitionsToOffsetAndMetadata().get(10, TimeUnit.SECONDS);
                props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false); // 禁止自动提交位移
                props.put(ConsumerConfig.GROUP_ID_CONFIG, groupID);
                props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
                props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
                try (final KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props)) {
                    Map<TopicPartition, Long> endOffsets = consumer.endOffsets(consumedOffsets.keySet());
                    return endOffsets.entrySet().stream().collect(Collectors.toMap(entry -> entry.getKey(),
                            entry -> entry.getValue() - consumedOffsets.get(entry.getKey()).offset()));
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                // 处理中断异常
                // ...
                return Collections.emptyMap();
            } catch (ExecutionException e) {
                // 处理ExecutionException
                // ...
                return Collections.emptyMap();
            } catch (TimeoutException e) {
                throw new TimeoutException("Timed out when getting lag for consumer group " + groupID);
            }
        }
    }

这段代码只适用于 Kafka 2.0.0 及以上的版本

# 3、Kafka JMX 监控指标

在很多监控场景中,我们往往是有借助现有的框架,比如Grafana,这时候前两种方式就不太适用了。

当前,Kafka 消费者提供了一个名为 kafka.consumer:type=consumer-fetch-manager-metrics,client-id=“{client-id}”的 JMX 指标:

  • records-lag-max:消费者在测试窗口时间内曾经达到的最大的 Lag 值
  • records-lead-min:消费者在测试窗口时间内曾经达到的最小 Lead 值

这里的 Lead 值是指消费者最新消费消息的位移与分区当前第一条消息位移的差值。很显然,Lag 和 Lead 是一体的两个方面:Lag 越大的话,Lead 就越小,反之也是同理。

双监控

  • lag的增大,意味着这个阶段,消费者的速度是赶不上生产者的速度,但是如果只是一段时间还是能接受的。
  • Lead的减少,意味着消费者最新消费消息马上就是分区第一条消息,如果是快接近于 0 了,就一定要小心了,这可能预示着消费者端要丢消息了

我们知道 Kafka 的消息是有留存时间设置的,默认是 1 周,也就是说 Kafka 默认删除 1 周前的数据。倘若你的消费者程序足够慢,慢到它要消费的数据快被 Kafka 删除了,这时你就必须立即处理,否则一定会出现消息被删除,从而导致消费者程序重新调整位移值的情形。

Lag 值从 100 万增加到 200 万这件事情,远不如 Lead 值从 200 减少到 100 这件事来得重要。在实际生产环境中,请你一定要同时监控 Lag 值和 Lead 值

# 参考

官方文档 (opens new window)

图解Kafka之实战指南 (opens new window)

Kafka 核心技术与实战 (opens new window)

#消息队列
上次更新: 2024/06/29, 15:13:44
副本机制深入
高水位和Leader Epoch

← 副本机制深入 高水位和Leader Epoch→

最近更新
01
基础概念
10-31
02
Pytorch
10-30
03
Numpy
10-30
更多文章>
Theme by Vdoing | Copyright © 2021-2024 旭日 | 蜀ICP备2021000788号-1
  • 跟随系统
  • 浅色模式
  • 深色模式
  • 阅读模式