如何确保消息不丢失
# 一、消息与位移提交
首先明确一点,Kafka 只对已提交
的消息(committed message)做有限度的持久化保证
。
针对
已提交
的消息是指:当 Kafka 的若干个 Broker 成功地接收到一条消息并写入到日志文件后,它们会告诉生产者程序这条消息已成功提交。此时,这条消息在 Kafka 看来就正式变为“已提交”消息了。针对
有限度的持久化保证
是指:Kafka 不可能保证在任何情况下都做到不丢失消息,假如消息保存在 N 个 Kafka Broker 上,那么这个前提条件就是这 N 个 Broker 中至少有 1 个存活。只要这个条件成立,Kafka 就能保证你的这条消息永远不会丢失。
# 二、消息传输的三个阶段
消息传输主要分为三个阶段:
生产阶段:在这个阶段,从消息在 Producer 创建出来,经过网络传输发送到 Broker 端。
存储阶段:在这个阶段,消息在 Broker 端存储,如果是集群,消息会在这个阶段被复制到其他的副本上。
消费阶段:在这个阶段,Consumer 从 Broker 上拉取消息,经过网络传输发送到 Consumer 上。
针对每个阶段,我们需要正确使用,才能保证消息不丢失。
# 1、生产阶段
这个阶段一般是我们调用api把消息发送给broker,broker接受消息之后给客户端一个响应,这样就完成了一次正常消息的发送。
而针对网络等问题,有些消息队列在长时间没收到发送确认响应后,会自动重试,如果重试再失败,就会以返回值或者异常的方式告知用户。所以我们只需要正确处理返回值或异常,就可以保证消息不丢失。
同步发送
try {
RecordMetadata metadata = producer.send(record).get();
System.out.println("消息发送成功。");
} catch (Throwable e) {
System.out.println("消息发送失败!");
System.out.println(e);
}
异步发送
producer.send(record, (metadata, exception) -> {
if (metadata != null) {
System.out.println("消息发送成功。");
} else {
System.out.println("消息发送失败!");
System.out.println(exception);
}
});
# 2、存储阶段
正常情况下,broker工作正常是不会出现消息丢失的情况,但是如果 broker 出现了故障,比如进程死掉了或者服务器宕机了,还是可能会丢失消息的。
- 消息确认机制:ack设置为all或者-1,来保证消息写入到所有副本后,对客户端进行应答。
- 多副本机制:集群的副本尽量设置超过2,通过数据的冗余性来保证消息不丢失
# 3、消费阶段
消费阶段会因为位移提交的问题出现消息丢失
问题,主要有两个关键点:
- 关闭自动提交,通过代码的方式进行手动提交
- 不要在收到消息后就立即发送消费确认,而是应该在执行完所有消费业务逻辑之后,再发送消费确认。
# 4、三个阶段使用总结
- 在生产阶段,你需要捕获消息发送的错误,并重发消息。
- 在存储阶段,你可以通过配置刷盘和复制相关的参数,让消息写入到多个副本的磁盘上,来确保消息不会因为某个 Broker 宕机或者磁盘损坏而丢失。
- 在消费阶段,你需要在处理完全部消费业务逻辑之后,再发送消费确认。
# 三、如何校验消息丢失
消费消息最尴尬的不是消息丢失了,而是不知道消息丢失了,抛开完善的分布式链路追踪系统,我们可以使用消息队列的有序性来验证是否有消息丢失。
原理
在 Producer 端,我们给每个发出的消息附加一个连续递增的序号,然后在 Consumer 端来检查这个序号的连续性。如果没有消息丢失,Consumer 收到消息的序号必然是连续递增的,或者说收到的消息,其中的序号必然是上一条消息的序号 +1。
如果检测到序号不连续,那就是丢消息了。还可以通过缺失的序号来确定丢失的是哪条消息,方便进一步排查原因。
方案
kafka这类分布式消息队列是支持拦截器功能的,我们在发送消息之前利用拦截器将序号注入到消息中,在 Consumer 收到消息的拦截器中检测序号的连续性,这样实现的好处是消息检测的代码不会侵入到你的业务代码中。
注意
- 由于kafka这种消息队列是无法保证主题层级的严格顺序,只能保证分区上消息的顺序性,所以我们在发消息的时候必须要指定分区,并且,在每个分区单独检测消息序号的连续性。
- 如果生产者有多个实例,由于生产者速率不同,就无法保证消息的顺序性,所以这种情况应该以生产者来进行划分,每个 Producer 分别生成各自的消息序号,并且需要附加上 Producer 的标识,在 Consumer 端按照每个 Producer 分别来检测序号的连续性。
# 四、无消息丢失最佳配置
进行上面的分析,我们来总结一个无消息丢失最佳配置:
生产阶段
1、发送消息要进行相关异常的处理,同步发送使用get并捕获异常,异步发送是要使用回调方法
2、设置 acks = all(-1)。acks 是 Producer 的一个参数,代表了你对“已提交”消息的定义。如果设置成 all,则表明所有副本 Broker 都要接收到消息,该消息才算是“已提交”。
3、设置 retries 为一个较大的值。这里的 retries 同样是 Producer 的参数,当出现网络的瞬时抖动时,消息发送可能会失败,此时配置了 retries > 0 的 Producer 能够自动重试消息发送,避免消息丢失。
存储阶段
1、设置 unclean.leader.election.enable = false。这是 Broker 端的参数,它控制的是哪些 Broker 有资格竞选分区的 Leader。如果一个 Broker 落后原先的 Leader 太多,那么它一旦成为新的 Leader,必然会造成消息的丢失。故一般都要将该参数设置成 false,
2、设置 replication.factor >= 3。这也是 Broker 端的参数。最好将消息多保存几份,毕竟目前防止消息丢失的主要机制就是冗余。
3、设置 min.insync.replicas > 1。这依然是 Broker 端参数,控制的是消息至少要被写入到多少个副本才算是“已提交”。设置成大于 1 可以提升消息持久性。在实际环境中千万不要使用默认值 1。
消费阶段
1、Consumer 端有个参数 enable.auto.commit,最好把它设置成 false,并采用手动提交位移的方式。
2、在消费阶段,你需要在处理完全部消费业务逻辑之后,再发送消费确认。