学习总结录 学习总结录
首页
归档
分类
标签
  • Java基础
  • Java集合
  • MySQL
  • Redis
  • JVM
  • 多线程
  • 计算机网络
  • 操作系统
  • Spring
  • Kafka
  • Elasticsearch
  • Python
  • 面试专题
  • 案例实践
  • 工具使用
  • 项目搭建
  • 服务治理
  • ORM框架
  • 分布式组件
  • MiniSpring
  • 设计模式
  • 算法思想
  • 编码规范
友链
关于
GitHub (opens new window)
首页
归档
分类
标签
  • Java基础
  • Java集合
  • MySQL
  • Redis
  • JVM
  • 多线程
  • 计算机网络
  • 操作系统
  • Spring
  • Kafka
  • Elasticsearch
  • Python
  • 面试专题
  • 案例实践
  • 工具使用
  • 项目搭建
  • 服务治理
  • ORM框架
  • 分布式组件
  • MiniSpring
  • 设计模式
  • 算法思想
  • 编码规范
友链
关于
GitHub (opens new window)
  • Java基础

  • Java集合

  • MySQL

  • Redis

  • JVM

  • 多线程

  • 计算机网络

  • Spring

  • Kafka

    • 生产者客户端开发
    • 消费者客户端开发
    • 主题与分区管理
    • 配置管理
    • KafkaAdminClient
    • 消费管理
    • Kafka Streams
    • 日志存储
    • 可靠性研究
    • 深入服务端
    • 深入客户端
    • 集群参数配置
    • 生产者消息分区机制原理
    • 如何确保消息不丢失
      • 一、消息与位移提交
      • 二、消息传输的三个阶段
        • 1、生产阶段
        • 2、存储阶段
        • 3、消费阶段
        • 4、三个阶段使用总结
      • 三、如何校验消息丢失
      • 四、无消息丢失最佳配置
      • 参考
    • 如何确保消息不重复消费
    • 消费积压如何处理
    • 生产者是如何管理TCP连接
    • 消费者重平衡问题
    • 位移提交问题
    • 消费者是如何管理TCP连接
    • 副本机制深入
    • 消费组消费进度如何监控
    • 高水位和Leader Epoch
  • Elasticsearch

  • Python

  • 面试专题

  • 知识库
  • Kafka
旭日
2023-09-20
目录

如何确保消息不丢失

# 一、消息与位移提交

首先明确一点,Kafka 只对已提交的消息(committed message)做有限度的持久化保证。

  • 针对已提交的消息是指:当 Kafka 的若干个 Broker 成功地接收到一条消息并写入到日志文件后,它们会告诉生产者程序这条消息已成功提交。此时,这条消息在 Kafka 看来就正式变为“已提交”消息了。

  • 针对有限度的持久化保证是指:Kafka 不可能保证在任何情况下都做到不丢失消息,假如消息保存在 N 个 Kafka Broker 上,那么这个前提条件就是这 N 个 Broker 中至少有 1 个存活。只要这个条件成立,Kafka 就能保证你的这条消息永远不会丢失。

# 二、消息传输的三个阶段

消息传输主要分为三个阶段:

  • 生产阶段:在这个阶段,从消息在 Producer 创建出来,经过网络传输发送到 Broker 端。

  • 存储阶段:在这个阶段,消息在 Broker 端存储,如果是集群,消息会在这个阶段被复制到其他的副本上。

  • 消费阶段:在这个阶段,Consumer 从 Broker 上拉取消息,经过网络传输发送到 Consumer 上。

image-20230920203104345

针对每个阶段,我们需要正确使用,才能保证消息不丢失。

# 1、生产阶段

这个阶段一般是我们调用api把消息发送给broker,broker接受消息之后给客户端一个响应,这样就完成了一次正常消息的发送。

而针对网络等问题,有些消息队列在长时间没收到发送确认响应后,会自动重试,如果重试再失败,就会以返回值或者异常的方式告知用户。所以我们只需要正确处理返回值或异常,就可以保证消息不丢失。

同步发送

try {
    RecordMetadata metadata = producer.send(record).get();
    System.out.println("消息发送成功。");
} catch (Throwable e) {
    System.out.println("消息发送失败!");
    System.out.println(e);
}

异步发送

producer.send(record, (metadata, exception) -> {
    if (metadata != null) {
        System.out.println("消息发送成功。");
    } else {
        System.out.println("消息发送失败!");
        System.out.println(exception);
    }
});

# 2、存储阶段

正常情况下,broker工作正常是不会出现消息丢失的情况,但是如果 broker 出现了故障,比如进程死掉了或者服务器宕机了,还是可能会丢失消息的。

  • 消息确认机制:ack设置为all或者-1,来保证消息写入到所有副本后,对客户端进行应答。
  • 多副本机制:集群的副本尽量设置超过2,通过数据的冗余性来保证消息不丢失

# 3、消费阶段

消费阶段会因为位移提交的问题出现消息丢失问题,主要有两个关键点:

  • 关闭自动提交,通过代码的方式进行手动提交
  • 不要在收到消息后就立即发送消费确认,而是应该在执行完所有消费业务逻辑之后,再发送消费确认。

# 4、三个阶段使用总结

  • 在生产阶段,你需要捕获消息发送的错误,并重发消息。
  • 在存储阶段,你可以通过配置刷盘和复制相关的参数,让消息写入到多个副本的磁盘上,来确保消息不会因为某个 Broker 宕机或者磁盘损坏而丢失。
  • 在消费阶段,你需要在处理完全部消费业务逻辑之后,再发送消费确认。

# 三、如何校验消息丢失

消费消息最尴尬的不是消息丢失了,而是不知道消息丢失了,抛开完善的分布式链路追踪系统,我们可以使用消息队列的有序性来验证是否有消息丢失。

原理

在 Producer 端,我们给每个发出的消息附加一个连续递增的序号,然后在 Consumer 端来检查这个序号的连续性。如果没有消息丢失,Consumer 收到消息的序号必然是连续递增的,或者说收到的消息,其中的序号必然是上一条消息的序号 +1。

如果检测到序号不连续,那就是丢消息了。还可以通过缺失的序号来确定丢失的是哪条消息,方便进一步排查原因。

方案

kafka这类分布式消息队列是支持拦截器功能的,我们在发送消息之前利用拦截器将序号注入到消息中,在 Consumer 收到消息的拦截器中检测序号的连续性,这样实现的好处是消息检测的代码不会侵入到你的业务代码中。

注意

  • 由于kafka这种消息队列是无法保证主题层级的严格顺序,只能保证分区上消息的顺序性,所以我们在发消息的时候必须要指定分区,并且,在每个分区单独检测消息序号的连续性。
  • 如果生产者有多个实例,由于生产者速率不同,就无法保证消息的顺序性,所以这种情况应该以生产者来进行划分,每个 Producer 分别生成各自的消息序号,并且需要附加上 Producer 的标识,在 Consumer 端按照每个 Producer 分别来检测序号的连续性。

# 四、无消息丢失最佳配置

进行上面的分析,我们来总结一个无消息丢失最佳配置:

生产阶段

1、发送消息要进行相关异常的处理,同步发送使用get并捕获异常,异步发送是要使用回调方法

2、设置 acks = all(-1)。acks 是 Producer 的一个参数,代表了你对“已提交”消息的定义。如果设置成 all,则表明所有副本 Broker 都要接收到消息,该消息才算是“已提交”。

3、设置 retries 为一个较大的值。这里的 retries 同样是 Producer 的参数,当出现网络的瞬时抖动时,消息发送可能会失败,此时配置了 retries > 0 的 Producer 能够自动重试消息发送,避免消息丢失。

存储阶段

1、设置 unclean.leader.election.enable = false。这是 Broker 端的参数,它控制的是哪些 Broker 有资格竞选分区的 Leader。如果一个 Broker 落后原先的 Leader 太多,那么它一旦成为新的 Leader,必然会造成消息的丢失。故一般都要将该参数设置成 false,

2、设置 replication.factor >= 3。这也是 Broker 端的参数。最好将消息多保存几份,毕竟目前防止消息丢失的主要机制就是冗余。

3、设置 min.insync.replicas > 1。这依然是 Broker 端参数,控制的是消息至少要被写入到多少个副本才算是“已提交”。设置成大于 1 可以提升消息持久性。在实际环境中千万不要使用默认值 1。

消费阶段

1、Consumer 端有个参数 enable.auto.commit,最好把它设置成 false,并采用手动提交位移的方式。

2、在消费阶段,你需要在处理完全部消费业务逻辑之后,再发送消费确认。

# 参考

官方文档 (opens new window)

图解Kafka之实战指南 (opens new window)

Kafka 核心技术与实战 (opens new window)

#消息队列
上次更新: 2024/06/29, 15:13:44
生产者消息分区机制原理
如何确保消息不重复消费

← 生产者消息分区机制原理 如何确保消息不重复消费→

最近更新
01
基础概念
10-31
02
Pytorch
10-30
03
Numpy
10-30
更多文章>
Theme by Vdoing | Copyright © 2021-2024 旭日 | 蜀ICP备2021000788号-1
  • 跟随系统
  • 浅色模式
  • 深色模式
  • 阅读模式