消费者是如何管理TCP连接
# 一、消费者程序概览
我们通过消费者进行拉取消息,大致代码流程如下:
- 配置消费者客户端参数
- 创建相应的消费者实例
- 订阅主题。
- 拉取消息并消费
- 提交消费位移
- 关闭消费者实例
// 配置消费者参数
Properties props = new Properties();
props.put("key.deserializer",
"org.apache.kafka.common.serialization.StringDeserializer");
props.put("value.deserializer",
"org.apache.kafka.common.serialization.StringDeserializer");
props.put("bootstrap.servers", BROKER_LIST);
props.put("group.id", GROUP_ID);
props.put("client.id", "consumer.client.id.demo");
//创建一个消费者客户端实例
KafkaConsumer<String, String> consumer = new KafkaConsumer<>(properties);
//订阅主题
consumer.subscribe(Collections.singletonList(TOPIC));
try {
while (isRunning.get()) {
ConsumerRecords<String, String> records =
consumer.poll(Duration.ofMillis(1000));
for (ConsumerRecord<String, String> record :
records.records(topic)) {
System.out.println(record.topic() + " : " + record.value());
}
}
}finally {
consumer.close();
}
# 二、何时建立TCP连接
生产者是在创建
KafkaProducer
的时候创建了一个sender线程去获取集群信息消费者则是在调用
KafkaConsumer.poll
方法时去创建的TCP连接的,准确来说poll
过程中有三个场景能够建立TCP连接
# 1、发起 FindCoordinator 请求时
消费端中有一个协调者
的角色,它驻留在 Broker 端的内存中,负责消费者组的组成员管理和各个消费者的位移提交管理。
当消费者程序首次启动调用 poll 方法时,它需要向 Kafka 集群发送一个名为 FindCoordinator
的请求,希望 Kafka 集群告诉它哪个 Broker 是管理它的协调者。
# 2、连接协调者时
消费者知晓了真正的协调者后,会创建连向该 Broker 的 Socket 连接。只有成功连入协调者,协调者才能开启正常的组协调操作
# 3、消费数据时
消费者会为每个要消费的分区创建与该分区领导者副本所在 Broker 连接的 TCP。
# 三、创建多少个TCP连接
消费者程序会创建 3 类 TCP 连接:
- 确定协调者和获取集群元数据。
- 连接协调者,令其执行组成员管理操作
- 执行实际的消息获取
# 四、何时关闭连接
消费者关闭 Socket 也分为主动关闭和 Kafka 自动关闭
- 主动关闭:手动调用
KafkaConsumer.close()
方法,或者是执行 Kill 命令 - 自动关闭:Kafka 自动关闭是由消费者端参数
connection.max.idle.ms
控制的,该参数现在的默认值是 9 分钟
需要注意的是:一般我们在消费者程序中使用的循环的方式来调用poll方法获取消息,这相当于让这些 Socket 连接上总是能保证有请求在发送,从而也就实现了“长连接”的效果。
当执行实际的消息获取
这种请求创建之后,消费者程序会废弃掉第一类TCP连接(确定协调者和获取集群元数据。)
# 参考
上次更新: 2024/06/29, 15:13:44