位移提交问题
# 一、Kafka的位移
在kafka中位移存在两个地方:
- 消息在分区中的位移
- 消费者的消费位移
其中,消费位移是Consumer要消费的下一条位移,是下一条消息的位移,而不是目前最新消费消息的位移。
比如现在一个分区中有 10 条消息,位移分别是 0 到 9。某个 Consumer 应用已消费了 5 条消息,这就说明该 Consumer 消费了位移为 0 到 4 的 5 条消息,此时 Consumer 的位移是 5,指向了下一条消息的位移。
# 二、消费位移
概念深入
Consumer需要向kafka汇报自己的位移数据,这个过程就被称为位移提交。由于Consumer是可以消费多个分区上的消息,所以位移的提交实际上是在分区粒度上进行的,即 Consumer 需要为分配给它的每个分区提交各自的位移数据。
恢复机制
当我们的Kafka崩溃的时候,会发生重平衡,被分配到此分区的Consumer读取上一个Consumer在此分区的消费位移,然后继续消费,从而避免了重复消费问题。
位移提交的语义保障
由于位移提交特别灵活,开发者完全可以提交任何位移,但是产生的结果也需要开发者承担。也就是说位移提交的语义保障是由开发者来负责的,Kafka 只会“无脑”地接受你提交的位移
# 三、自动提交和手动提交(开发者)
# 1、自动提交
Kafka Consumer 在后台默默地为你提交位移,作为用户的你完全不必操心这些事。
Properties props = new Properties();
props.put("bootstrap.servers", "localhost:9092");
props.put("group.id", "test");
props.put("enable.auto.commit", "true");
props.put("auto.commit.interval.ms", "2000");
# 2、手动提交
开发者自己提交位移,Kafka Consumer 压根不管。
开启手动提交位移的方法就是设置 enable.auto.commit 为 false,到这里只是设置了不自动提交,还需要提交相应消费者的位移提交。
# 四、同步提交和异步提交(Consumer)
# 1、同步提交
commitSync()
该方法会提交 KafkaConsumer#poll()
返回的最新位移,它是一个同步操作,该方法会一直等待,直到位移成功提交才返回。
while (true) {
ConsumerRecords<String, String> records =
consumer.poll(Duration.ofSeconds(1));
process(records); // 处理消息
try {
consumer.commitSync();
} catch (CommitFailedException e) {
handle(e); // 处理提交失败异常
}
}
但是在调用 commitSync() 时,Consumer 程序会处于阻塞状态,直到远端的 Broker 返回提交结果,这个状态才会结束。
# 2、异步提交
鉴于这个问题,Kafka 社区为手动提交位移提供了另一个 API 方法:KafkaConsumer#commitAsync()
。
while (true) {
ConsumerRecords<String, String> records =
consumer.poll(Duration.ofSeconds(1));
process(records); // 处理消息
consumer.commitAsync((offsets, exception) -> {
if (exception != null)
handle(exception);
});
}
commitAsync 的问题在于,出现问题时它不会自动重试。因为它是异步操作,倘若提交失败后自动重试,那么它重试时提交的位移值可能早已经“过期”或不是最新值了。因此,异步提交的重试其实没有意义,所以 commitAsync 是不会重试的。
# 3、两者结合
我们需要将 commitSync 和 commitAsync 组合使用才能达到最理想的效果,原因有两个:
- 我们可以利用 commitSync 的自动重试来规避那些瞬时错误,比如网络的瞬时抖动,Broker 端 GC 等。因为这些问题都是短暂的,自动重试通常都会成功,因此,我们不想自己重试,而是希望 Kafka Consumer 帮我们做这件事。
- 我们不希望程序总处于阻塞状态,影响 TPS。
try {
while(true) {
ConsumerRecords<String, String> records =
consumer.poll(Duration.ofSeconds(1));
process(records); // 处理消息
commitAysnc(); // 使用异步提交规避阻塞
}
} catch(Exception e) {
handle(e); // 处理异常
} finally {
try {
consumer.commitSync(); // 最后一次提交使用同步阻塞式提交
} finally {
consumer.close();
}
}
对于常规性、阶段性的手动提交,我们调用 commitAsync() 避免程序阻塞,而在 Consumer 要关闭前,我们调用 commitSync() 方法执行同步阻塞式的位移提交,以确保 Consumer 关闭前能够保存正确的位移数据。
# 五、分批提交
比如 poll 方法一次返回了 500 条消息,当你处理完这 500 条消息之后,前面我们提到的各种方法会一次性地将这 500 条消息的位移一并处理。简单来说,就是直接提交最新一条消息的位移。
但是如果我们想要执行完100条消息就进行一次位移提交,通俗来说就是把大事务转换成小事务。
private Map<TopicPartition, OffsetAndMetadata> offsets = new HashMap<>();
int count = 0;
……
while (true) {
ConsumerRecords<String, String> records =
consumer.poll(Duration.ofSeconds(1));
for (ConsumerRecord<String, String> record: records) {
process(record); // 处理消息
offsets.put(new TopicPartition(record.topic(), record.partition()),
new OffsetAndMetadata(record.offset() + 1);
if(count % 100 == 0)
consumer.commitAsync(offsets, null); // 回调处理逻辑是null
count++;
}
}
# 六、CommitFailedException
所谓 CommitFailedException
,顾名思义就是 Consumer 客户端在提交位移时出现了错误或异常,而且还是那种不可恢复的严重异常。
源码注解中对此进行解释:是消费者组已经开启了 Rebalance 过程,并且将要提交位移的分区分配给了另一个消费者实例。出现这个情况的原因是,你的消费者实例连续两次调用 poll 方法的时间间隔超过了期望的 max.poll.interval.ms 参数值。总结来说:消费者实例花费了太长的时间进行消息处理,耽误了调用 poll 方法。
社区给出的解决办法是:
- 增加期望的时间间隔
max.poll.interval.ms
参数值。 - 减少 poll 方法一次性返回的消息数量,即减少
max.poll.records
参数值。
# 1、异常场景一
如果当消息处理的总时间超过预设的max.poll.interval.ms
参数值时,Kafka Consumer 端会抛出 CommitFailedException
异常。也就是说消费者还没处理消息的时候,又进行了消息的拉取,就会出现这个异常。对应的解决方案如下:
- 缩短单条消息处理的时间
- 增加 Consumer 端允许下游系统消费一批消息的最大时长:增加
max.poll.interval.ms
- 减少下游系统一次性消费的消息总数:减少
max.poll.records
参数值。 - 下游使用多线程来消费
# 2、异常场景二
如果你的应用中同时出现了设置相同 group.id 值的消费者组程序和独立消费者程序,那么当独立消费者程序手动提交位移时,Kafka 就会立即抛出 CommitFailedException 异常。