学习总结录 学习总结录
首页
归档
分类
标签
  • Java基础
  • Java集合
  • MySQL
  • Redis
  • JVM
  • 多线程
  • 计算机网络
  • 操作系统
  • Spring
  • Kafka
  • Elasticsearch
  • Python
  • 面试专题
  • 案例实践
  • 工具使用
  • 项目搭建
  • 服务治理
  • ORM框架
  • 分布式组件
  • MiniSpring
  • 设计模式
  • 算法思想
  • 编码规范
友链
关于
GitHub (opens new window)
首页
归档
分类
标签
  • Java基础
  • Java集合
  • MySQL
  • Redis
  • JVM
  • 多线程
  • 计算机网络
  • 操作系统
  • Spring
  • Kafka
  • Elasticsearch
  • Python
  • 面试专题
  • 案例实践
  • 工具使用
  • 项目搭建
  • 服务治理
  • ORM框架
  • 分布式组件
  • MiniSpring
  • 设计模式
  • 算法思想
  • 编码规范
友链
关于
GitHub (opens new window)
  • Java基础

  • Java集合

  • MySQL

  • Redis

  • JVM

  • 多线程

  • 计算机网络

  • Spring

  • Kafka

    • 生产者客户端开发
    • 消费者客户端开发
    • 主题与分区管理
    • 配置管理
    • KafkaAdminClient
    • 消费管理
    • Kafka Streams
    • 日志存储
    • 可靠性研究
    • 深入服务端
    • 深入客户端
    • 集群参数配置
    • 生产者消息分区机制原理
    • 如何确保消息不丢失
    • 如何确保消息不重复消费
    • 消费积压如何处理
    • 生产者是如何管理TCP连接
    • 消费者重平衡问题
    • 位移提交问题
    • 消费者是如何管理TCP连接
      • 一、消费者程序概览
      • 二、何时建立TCP连接
        • 1、发起 FindCoordinator 请求时
        • 2、连接协调者时
        • 3、消费数据时
      • 三、创建多少个TCP连接
      • 四、何时关闭连接
      • 参考
    • 副本机制深入
    • 消费组消费进度如何监控
    • 高水位和Leader Epoch
  • Elasticsearch

  • Python

  • 面试专题

  • 知识库
  • Kafka
旭日
2023-10-12
目录

消费者是如何管理TCP连接

# 一、消费者程序概览

我们通过消费者进行拉取消息,大致代码流程如下:

  • 配置消费者客户端参数
  • 创建相应的消费者实例
  • 订阅主题。
  • 拉取消息并消费
  • 提交消费位移
  • 关闭消费者实例
        // 配置消费者参数
        Properties props = new Properties();
        props.put("key.deserializer",
                "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("value.deserializer",
                "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("bootstrap.servers",  BROKER_LIST);
        props.put("group.id", GROUP_ID);
        props.put("client.id", "consumer.client.id.demo");
        //创建一个消费者客户端实例
        KafkaConsumer<String, String> consumer = new KafkaConsumer<>(properties);
        //订阅主题
        consumer.subscribe(Collections.singletonList(TOPIC));
        try {
        while (isRunning.get()) {
            ConsumerRecords<String, String> records = 
                    consumer.poll(Duration.ofMillis(1000));
            for (ConsumerRecord<String, String> record : 
                 records.records(topic)) {
                System.out.println(record.topic() + " : " + record.value());
            }
        }
        }finally {
            consumer.close();
        }

# 二、何时建立TCP连接

  • 生产者是在创建KafkaProducer的时候创建了一个sender线程去获取集群信息

  • 消费者则是在调用 KafkaConsumer.poll 方法时去创建的TCP连接的,准确来说poll过程中有三个场景能够建立TCP连接

# 1、发起 FindCoordinator 请求时

消费端中有一个协调者的角色,它驻留在 Broker 端的内存中,负责消费者组的组成员管理和各个消费者的位移提交管理。

当消费者程序首次启动调用 poll 方法时,它需要向 Kafka 集群发送一个名为 FindCoordinator的请求,希望 Kafka 集群告诉它哪个 Broker 是管理它的协调者。

# 2、连接协调者时

消费者知晓了真正的协调者后,会创建连向该 Broker 的 Socket 连接。只有成功连入协调者,协调者才能开启正常的组协调操作

# 3、消费数据时

消费者会为每个要消费的分区创建与该分区领导者副本所在 Broker 连接的 TCP。

# 三、创建多少个TCP连接

消费者程序会创建 3 类 TCP 连接:

  • 确定协调者和获取集群元数据。
  • 连接协调者,令其执行组成员管理操作
  • 执行实际的消息获取

# 四、何时关闭连接

消费者关闭 Socket 也分为主动关闭和 Kafka 自动关闭

  • 主动关闭:手动调用KafkaConsumer.close() 方法,或者是执行 Kill 命令
  • 自动关闭:Kafka 自动关闭是由消费者端参数connection.max.idle.ms控制的,该参数现在的默认值是 9 分钟

需要注意的是:一般我们在消费者程序中使用的循环的方式来调用poll方法获取消息,这相当于让这些 Socket 连接上总是能保证有请求在发送,从而也就实现了“长连接”的效果。

当执行实际的消息获取这种请求创建之后,消费者程序会废弃掉第一类TCP连接(确定协调者和获取集群元数据。)

# 参考

官方文档 (opens new window)

图解Kafka之实战指南 (opens new window)

Kafka 核心技术与实战 (opens new window)

#消息队列
上次更新: 2024/06/29, 15:13:44
位移提交问题
副本机制深入

← 位移提交问题 副本机制深入→

最近更新
01
基础概念
10-31
02
Pytorch
10-30
03
Numpy
10-30
更多文章>
Theme by Vdoing | Copyright © 2021-2024 旭日 | 蜀ICP备2021000788号-1
  • 跟随系统
  • 浅色模式
  • 深色模式
  • 阅读模式