消费组消费进度如何监控
# 一、消费监控意义
对于Kakfa消费者来说,最重要的事情就是监控他的消费进度,或者说是监控它们消费的滞后程度。这个滞后程度有个专门的名称:消费者 Lag 或 Consumer Lag,而所谓滞后程度就是指消费者当前落后生产者的速度。
比如生产者生产了100W的数据,消费者消费了80W的数据,那么消费者滞后了 20 万条消息,即 Lag 等于 20 万。
但是通常来说,lag这个指标维度是在分区上的,有时候我们需要统计主题级别,就需要把某一个主题下所有分区的lag进行相加。
一个正常工作的消费者,它的 Lag 值应该很小,甚至是接近于 0 的,这表示该消费者能够及时地消费生产者生产出来的消息,滞后程度很小。反之,如果一个消费者 Lag 值很大,通常就表明它无法跟上生产者的速度,最终 Lag 会越来越大,从而拖慢下游消息的处理速度。
# 二、消费监控方式
# 1、Kafka 自带命令
kafka-consumer-groups
脚本是 Kafka 为我们提供的最直接的监控消费者消费进度的工具。
$ bin/kafka-consumer-groups.sh --bootstrap-server <Kafka broker连接信息> --describe --group <group名称>
Kafka 连接信息就是 < 主机名:端口 > 对,而 group 名称就是你的消费者程序中设置的 group.id 值
LOG-END-OFFSET
:每个分区当前最新生产的消息的位移值CURRENT-OFFSET
:该消费者组当前最新消费消息的位移值LAG 值
:前两者的差值,也就是消费滞后的数量
# 2、Kafka Java Consumer API
如何利用 Consumer 端 API 监控给定消费者组的 Lag 值:
public static Map<TopicPartition, Long> lagOf(String groupID, String bootstrapServers) throws TimeoutException {
Properties props = new Properties();
props.put(CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
try (AdminClient client = AdminClient.create(props)) {
ListConsumerGroupOffsetsResult result = client.listConsumerGroupOffsets(groupID);
try {
Map<TopicPartition, OffsetAndMetadata> consumedOffsets = result.partitionsToOffsetAndMetadata().get(10, TimeUnit.SECONDS);
props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false); // 禁止自动提交位移
props.put(ConsumerConfig.GROUP_ID_CONFIG, groupID);
props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
try (final KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props)) {
Map<TopicPartition, Long> endOffsets = consumer.endOffsets(consumedOffsets.keySet());
return endOffsets.entrySet().stream().collect(Collectors.toMap(entry -> entry.getKey(),
entry -> entry.getValue() - consumedOffsets.get(entry.getKey()).offset()));
}
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
// 处理中断异常
// ...
return Collections.emptyMap();
} catch (ExecutionException e) {
// 处理ExecutionException
// ...
return Collections.emptyMap();
} catch (TimeoutException e) {
throw new TimeoutException("Timed out when getting lag for consumer group " + groupID);
}
}
}
这段代码只适用于 Kafka 2.0.0 及以上的版本
# 3、Kafka JMX 监控指标
在很多监控场景中,我们往往是有借助现有的框架,比如Grafana,这时候前两种方式就不太适用了。
当前,Kafka 消费者提供了一个名为 kafka.consumer:type=consumer-fetch-manager-metrics,client-id=“{client-id}”的 JMX 指标:
records-lag-max
:消费者在测试窗口时间内曾经达到的最大的 Lag 值records-lead-min
:消费者在测试窗口时间内曾经达到的最小 Lead 值
这里的 Lead 值是指消费者最新消费消息的位移与分区当前第一条消息位移的差值。很显然,Lag 和 Lead 是一体的两个方面:Lag 越大的话,Lead 就越小,反之也是同理。
双监控
- lag的增大,意味着这个阶段,消费者的速度是赶不上生产者的速度,但是如果只是一段时间还是能接受的。
- Lead的减少,意味着消费者最新消费消息马上就是分区第一条消息,如果是快接近于 0 了,就一定要小心了,这可能预示着消费者端要丢消息了
我们知道 Kafka 的消息是有留存时间设置的,默认是 1 周,也就是说 Kafka 默认删除 1 周前的数据。倘若你的消费者程序足够慢,慢到它要消费的数据快被 Kafka 删除了,这时你就必须立即处理,否则一定会出现消息被删除,从而导致消费者程序重新调整位移值的情形。
Lag 值从 100 万增加到 200 万这件事情,远不如 Lead 值从 200 减少到 100 这件事来得重要。在实际生产环境中,请你一定要同时监控 Lag 值和 Lead 值