学习总结录 学习总结录
首页
归档
分类
标签
  • Java基础
  • Java集合
  • MySQL
  • Redis
  • JVM
  • 多线程
  • 计算机网络
  • 操作系统
  • Spring
  • Kafka
  • Elasticsearch
  • Python
  • 面试专题
  • 案例实践
  • 工具使用
  • 项目搭建
  • 服务治理
  • ORM框架
  • 分布式组件
  • MiniSpring
  • 设计模式
  • 算法思想
  • 编码规范
友链
关于
GitHub (opens new window)
首页
归档
分类
标签
  • Java基础
  • Java集合
  • MySQL
  • Redis
  • JVM
  • 多线程
  • 计算机网络
  • 操作系统
  • Spring
  • Kafka
  • Elasticsearch
  • Python
  • 面试专题
  • 案例实践
  • 工具使用
  • 项目搭建
  • 服务治理
  • ORM框架
  • 分布式组件
  • MiniSpring
  • 设计模式
  • 算法思想
  • 编码规范
友链
关于
GitHub (opens new window)
  • Java基础

  • Java集合

  • MySQL

  • Redis

  • JVM

  • 多线程

  • 计算机网络

  • Spring

  • Kafka

    • 生产者客户端开发
    • 消费者客户端开发
    • 主题与分区管理
    • 配置管理
    • KafkaAdminClient
    • 消费管理
    • Kafka Streams
    • 日志存储
    • 可靠性研究
    • 深入服务端
    • 深入客户端
    • 集群参数配置
    • 生产者消息分区机制原理
    • 如何确保消息不丢失
    • 如何确保消息不重复消费
    • 消费积压如何处理
    • 生产者是如何管理TCP连接
    • 消费者重平衡问题
    • 位移提交问题
      • 一、Kafka的位移
      • 二、消费位移
      • 三、自动提交和手动提交(开发者)
        • 1、自动提交
        • 2、手动提交
      • 四、同步提交和异步提交(Consumer)
        • 1、同步提交
        • 2、异步提交
        • 3、两者结合
      • 五、分批提交
      • 六、CommitFailedException
        • 1、异常场景一
        • 2、异常场景二
      • 参考
    • 消费者是如何管理TCP连接
    • 副本机制深入
    • 消费组消费进度如何监控
    • 高水位和Leader Epoch
  • Elasticsearch

  • Python

  • 面试专题

  • 知识库
  • Kafka
旭日
2023-10-11
目录

位移提交问题

# 一、Kafka的位移

在kafka中位移存在两个地方:

  • 消息在分区中的位移
  • 消费者的消费位移

其中,消费位移是Consumer要消费的下一条位移,是下一条消息的位移,而不是目前最新消费消息的位移。

比如现在一个分区中有 10 条消息,位移分别是 0 到 9。某个 Consumer 应用已消费了 5 条消息,这就说明该 Consumer 消费了位移为 0 到 4 的 5 条消息,此时 Consumer 的位移是 5,指向了下一条消息的位移。

# 二、消费位移

概念深入

Consumer需要向kafka汇报自己的位移数据,这个过程就被称为位移提交。由于Consumer是可以消费多个分区上的消息,所以位移的提交实际上是在分区粒度上进行的,即 Consumer 需要为分配给它的每个分区提交各自的位移数据。

恢复机制

当我们的Kafka崩溃的时候,会发生重平衡,被分配到此分区的Consumer读取上一个Consumer在此分区的消费位移,然后继续消费,从而避免了重复消费问题。

位移提交的语义保障

由于位移提交特别灵活,开发者完全可以提交任何位移,但是产生的结果也需要开发者承担。也就是说位移提交的语义保障是由开发者来负责的,Kafka 只会“无脑”地接受你提交的位移

# 三、自动提交和手动提交(开发者)

# 1、自动提交

Kafka Consumer 在后台默默地为你提交位移,作为用户的你完全不必操心这些事。

Properties props = new Properties(); 
props.put("bootstrap.servers", "localhost:9092"); 
props.put("group.id", "test"); 
props.put("enable.auto.commit", "true"); 
props.put("auto.commit.interval.ms", "2000");

# 2、手动提交

开发者自己提交位移,Kafka Consumer 压根不管。

开启手动提交位移的方法就是设置 enable.auto.commit 为 false,到这里只是设置了不自动提交,还需要提交相应消费者的位移提交。

# 四、同步提交和异步提交(Consumer)

# 1、同步提交

commitSync()该方法会提交 KafkaConsumer#poll() 返回的最新位移,它是一个同步操作,该方法会一直等待,直到位移成功提交才返回。

while (true) {
            ConsumerRecords<String, String> records =
                        consumer.poll(Duration.ofSeconds(1));
            process(records); // 处理消息
            try {
                        consumer.commitSync();
            } catch (CommitFailedException e) {
                        handle(e); // 处理提交失败异常
            }
}

但是在调用 commitSync() 时,Consumer 程序会处于阻塞状态,直到远端的 Broker 返回提交结果,这个状态才会结束。

# 2、异步提交

鉴于这个问题,Kafka 社区为手动提交位移提供了另一个 API 方法:KafkaConsumer#commitAsync()。

while (true) {
            ConsumerRecords<String, String> records = 
  consumer.poll(Duration.ofSeconds(1));
            process(records); // 处理消息
            consumer.commitAsync((offsets, exception) -> {
  if (exception != null)
  handle(exception);
  });
}

commitAsync 的问题在于,出现问题时它不会自动重试。因为它是异步操作,倘若提交失败后自动重试,那么它重试时提交的位移值可能早已经“过期”或不是最新值了。因此,异步提交的重试其实没有意义,所以 commitAsync 是不会重试的。

# 3、两者结合

我们需要将 commitSync 和 commitAsync 组合使用才能达到最理想的效果,原因有两个:

  • 我们可以利用 commitSync 的自动重试来规避那些瞬时错误,比如网络的瞬时抖动,Broker 端 GC 等。因为这些问题都是短暂的,自动重试通常都会成功,因此,我们不想自己重试,而是希望 Kafka Consumer 帮我们做这件事。
  • 我们不希望程序总处于阻塞状态,影响 TPS。
try {
    while(true) {
        ConsumerRecords<String, String> records = 
            consumer.poll(Duration.ofSeconds(1));
        process(records); // 处理消息
        commitAysnc(); // 使用异步提交规避阻塞
    }
} catch(Exception e) {
    handle(e); // 处理异常
} finally {
    try {
         consumer.commitSync(); // 最后一次提交使用同步阻塞式提交
    } finally {
         consumer.close();
    }
}

对于常规性、阶段性的手动提交,我们调用 commitAsync() 避免程序阻塞,而在 Consumer 要关闭前,我们调用 commitSync() 方法执行同步阻塞式的位移提交,以确保 Consumer 关闭前能够保存正确的位移数据。

# 五、分批提交

比如 poll 方法一次返回了 500 条消息,当你处理完这 500 条消息之后,前面我们提到的各种方法会一次性地将这 500 条消息的位移一并处理。简单来说,就是直接提交最新一条消息的位移。

但是如果我们想要执行完100条消息就进行一次位移提交,通俗来说就是把大事务转换成小事务。

private Map<TopicPartition, OffsetAndMetadata> offsets = new HashMap<>();
int count = 0;
……
while (true) {
            ConsumerRecords<String, String> records = 
  consumer.poll(Duration.ofSeconds(1));
            for (ConsumerRecord<String, String> record: records) {
                        process(record);  // 处理消息
                        offsets.put(new TopicPartition(record.topic(), record.partition()),
                                   new OffsetAndMetadata(record.offset() + 1);
                       if(count % 100 == 0)
                                    consumer.commitAsync(offsets, null); // 回调处理逻辑是null
                        count++;
  }
}

# 六、CommitFailedException

所谓 CommitFailedException,顾名思义就是 Consumer 客户端在提交位移时出现了错误或异常,而且还是那种不可恢复的严重异常。

源码注解中对此进行解释:是消费者组已经开启了 Rebalance 过程,并且将要提交位移的分区分配给了另一个消费者实例。出现这个情况的原因是,你的消费者实例连续两次调用 poll 方法的时间间隔超过了期望的 max.poll.interval.ms 参数值。总结来说:消费者实例花费了太长的时间进行消息处理,耽误了调用 poll 方法。

社区给出的解决办法是:

  • 增加期望的时间间隔 max.poll.interval.ms参数值。
  • 减少 poll 方法一次性返回的消息数量,即减少 max.poll.records 参数值。

# 1、异常场景一

如果当消息处理的总时间超过预设的max.poll.interval.ms参数值时,Kafka Consumer 端会抛出 CommitFailedException 异常。也就是说消费者还没处理消息的时候,又进行了消息的拉取,就会出现这个异常。对应的解决方案如下:

  • 缩短单条消息处理的时间
  • 增加 Consumer 端允许下游系统消费一批消息的最大时长:增加max.poll.interval.ms
  • 减少下游系统一次性消费的消息总数:减少 max.poll.records 参数值。
  • 下游使用多线程来消费

# 2、异常场景二

如果你的应用中同时出现了设置相同 group.id 值的消费者组程序和独立消费者程序,那么当独立消费者程序手动提交位移时,Kafka 就会立即抛出 CommitFailedException 异常。

# 参考

官方文档 (opens new window)

图解Kafka之实战指南 (opens new window)

Kafka 核心技术与实战 (opens new window)

#消息队列
上次更新: 2024/06/29, 15:13:44
消费者重平衡问题
消费者是如何管理TCP连接

← 消费者重平衡问题 消费者是如何管理TCP连接→

最近更新
01
基础概念
10-31
02
Pytorch
10-30
03
Numpy
10-30
更多文章>
Theme by Vdoing | Copyright © 2021-2024 旭日 | 蜀ICP备2021000788号-1
  • 跟随系统
  • 浅色模式
  • 深色模式
  • 阅读模式