消费者重平衡问题
# 一、重平衡基础概念
概念
Rebalance 本质上是一种协议,规定了一个 Consumer Group 下的所有 Consumer 如何达成一致,来分配订阅 Topic 的每个分区。
触发条件
- 组成员数发生变更。比如有新的 Consumer 实例加入组或者离开组,抑或是有 Consumer 实例崩溃被“踢出”组。
- 订阅主题数发生变更。Consumer Group 可以使用正则表达式的方式订阅主题,比如
consumer.subscribe(Pattern.compile("t.*c"))
就表明该 Group 订阅所有以字母 t 开头、字母 c 结尾的主题。在 Consumer Group 的运行过程中,你新创建了一个满足这样条件的主题,那么该 Group 就会发生 Rebalance。 - 订阅主题的分区数发生变更。Kafka 当前只能允许增加一个主题的分区数。当分区数增加时,就会触发订阅该主题的所有 Group 开启 Rebalance。
重平衡的问题
在整个过程中,所有实例都不能消费任何消息,因此它对 Consumer 的 TPS 影响很大。
协调者概念
所谓协调者,在 Kafka 中对应的术语是 Coordinator,它专门为 Consumer Group 服务,负责为 Group 执行 Rebalance 以及提供位移管理和组成员管理等。
# 二、协调者工作流程
所有 Broker 在启动时,都会创建和开启相应的 Coordinator 组件。也就是说,所有 Broker 都有各自的 Coordinator 组件。Consumer Group 如何确定为它服务的 Coordinator 在哪台 Broker 上:
- 确定由位移主题的哪个分区来保存该 Group 数据:
partitionId=Math.abs(groupId.hashCode() % offsetsTopicPartitionCount)
。 - 找出该分区 Leader 副本所在的 Broker,该 Broker 即为对应的 Coordinator。
通过流程的知悉,当我们需要快速排查 Broker 端日志时,我们能够根据这个算法准确定位 Coordinator 对应的 Broker,不必一台 Broker 一台 Broker 地盲查。
# 三、Rebalance弊端
- Rebalance 影响 Consumer 端 TPS,在 Rebalance 期间,Consumer 会停下手头的事情,什么也干不了。
- Rebalance 很慢。如果你的 Group 下成员很多,就一定会有这样的痛点。
- Rebalance 效率不高,每次 Rebalance 时,Group 下的所有成员都要参与进来,而且通常不会考虑局部性原理,但局部性原理对提升系统性能是特别重要的。
针对第三点,现在假设有10个成员,每个成员负责5个分区,如果某一个成员退出之后,最好的情况是维持原来的分配,把退出成员的分区重新分配。但是Group 会打散这 50 个分区(10 个成员 * 5 个分区),由当前存活的 9 个成员重新分配它们。
# 四、避免Rebalance
# 1、Rebalance时机
Rebalance 发生的时机有三个:
- 组成员数量发生变化
- 订阅主题数量发生变化
- 订阅主题的分区数发生变化
后面两个通常都是运维的主动操作,所以它们引发的 Rebalance 大都是不可避免的。接下来,我们主要说说因为组成员数量变化而引发的 Rebalance 该如何避免。
针对组成员数量的变化,无非就是增加和减少,下面来深入讨论一下:
增加
当我们启动一个配置有相同 group.id 值的 Consumer 程序时,实际上就向这个 Group 添加了一个新的 Consumer 实例。此时,Coordinator 会接纳这个新实例,将其加入到组中,并重新分配分区。通常来说这种情况是计划之内的,并不需要去规避。
减少
如果我们单纯去关闭某些consumer实例,这种情况也是计划之内的,需要担心的是某些情况下,Consumer 实例会被 Coordinator 错误地认为“已停止”从而被“踢出”Group。如果是这个原因导致的 Rebalance,我们就不能不管了。
# 2、Consumer 与重平衡有关的参数
1、每个 Consumer 实例都会定期地向 Coordinator 发送心跳请求,表明它还存活着。如果某个 Consumer 实例不能及时地发送这些心跳请求,Coordinator 就会认为该 Consumer 已经“死”了,从而将其从 Group 中移除,然后开启新一轮 Rebalance。对应参数为session.timeout.ms
,就是被用来表征此事的。该参数的默认值是 10 秒
2、heartbeat.interval.ms
控制发送心跳请求频率的参数,频繁地发送心跳请求会额外消耗带宽资源,但好处是能够更加快速地知晓当前是否开启 Rebalance
3、max.poll.interval.ms
限定了 Consumer 端应用程序两次调用 poll 方法的最大时间间隔。它的默认值是 5 分钟,表示你的 Consumer 程序如果在 5 分钟之内无法消费完 poll 方法返回的消息,那么 Consumer 会主动发起“离开组”的请求,Coordinator 也会开启新一轮 Rebalance。
# 3、参数设置
第一类非必要 Rebalance 是因为未能及时发送心跳,导致 Consumer 被“踢出”Group 而引发的:
- 设置
session.timeout.ms
= 6s - 设置
heartbeat.interval.ms
= 2s - 要保证 Consumer 实例在被判定为“dead”之前,能够发送至少 3 轮的心跳请求,即
session.timeout.ms
>= 3 *heartbeat.interval.ms
第二类非必要 Rebalance 是 Consumer 消费时间过长导致的:
max.poll.interval.ms
设置的大小标准为要给业务处理逻辑留下充足的时间
总而言之,一定要避免因为各种参数或逻辑不合理而导致的组成员意外离组或退出的情形