生产者是如何管理TCP连接
# 一、生产者程序概览
我们通过生产者进行发送消息,大致代码流程如下:
- 构造生产者所需要的参数
- 利用构造的参数,创建一个
KafkaProducer
对象实例 - 调用 KafkaProducer 的
send
方法发送消息 - 调用 KafkaProducer 的
close
方法关闭生产者并释放各种系统资源
Properties props = new Properties ();
props.put(“参数1”, “参数1的值”);
props.put(“参数2”, “参数2的值”);
……
try (Producer<String, String> producer = new KafkaProducer<>(props)) {
producer.send(new ProducerRecord<String, String>(……), callback);
……
}
在kafka往指定主题发送消息的时候,必然会和broker建立TCP连接,那么生产者是如何管理这些连接的呢?
# 二、何时建立TCP连接
# 1、创建生产者实例
生产者构建中有一个重要参数:bootstrap.servers
,指定了这个 Producer 启动时要连接的 Broker 地址。
如果为这个参数指定了Borker数值,那么Producer 启动时会首先创建与这些指定的Borker进行tcp连接。
具体来说,在创建 KafkaProducer 实例时,生产者应用会在后台创建并启动一个名为 Sender 的线程,该 Sender 线程开始运行时首先会创建与 Broker 的连接。
# 2、更新元数据
Producer 更新元数据的两个场景如下:
- 当 Producer 尝试给一个不存在的主题发送消息时,Broker 会告诉 Producer 说这个主题不存在。此时 Producer 会发送 METADATA 请求给 Kafka 集群,去尝试获取最新的元数据信息。
- Producer 通过 metadata.max.age.ms 参数定期地去更新元数据信息。该参数的默认值是 300000,即 5 分钟,也就是说不管集群那边是否有变化,Producer 每 5 分钟都会强制刷新一次元数据以保证它是最及时的数据。
# 3、发送消息
当 Producer 更新了集群的元数据信息之后,如果发现与某些 Broker 当前没有连接,那么它就会创建一个 TCP 连接。同样地,当要发送消息时,Producer 发现尚不存在与目标 Broker 的连接,也会创建一个。
# 三、何时关闭TCP连接
Producer 端关闭 TCP 连接的方式有两种:
- 一种是用户主动关闭
- 一种是 Kafka 自动关闭
用户主动关闭
这里的主动关闭实际上是广义的主动关闭,甚至包括用户调用 kill -9 主动“杀掉”Producer 应用。当然最推荐的方式还是调用 producer.close()
方法来关闭。
Kafka自动关闭
Producer 端参数 connections.max.idle.ms
的值有关。默认情况下该参数值是 9 分钟,即如果在 9 分钟内没有任何请求“流过”某个 TCP 连接,那么 Kafka 会主动帮你把该 TCP 连接关闭。用户可以在 Producer 端设置 connections.max.idle.ms
= -1 禁掉这种机制。一旦被设置成 -1,TCP 连接将成为永久长连接。