学习总结录 学习总结录
首页
归档
分类
标签
  • Java基础
  • Java集合
  • MySQL
  • Redis
  • JVM
  • 多线程
  • 计算机网络
  • 操作系统
  • Spring
  • Kafka
  • Elasticsearch
  • Python
  • 面试专题
  • 案例实践
  • 工具使用
  • 项目搭建
  • 服务治理
  • ORM框架
  • 分布式组件
  • MiniSpring
  • 设计模式
  • 算法思想
  • 编码规范
友链
关于
GitHub (opens new window)
首页
归档
分类
标签
  • Java基础
  • Java集合
  • MySQL
  • Redis
  • JVM
  • 多线程
  • 计算机网络
  • 操作系统
  • Spring
  • Kafka
  • Elasticsearch
  • Python
  • 面试专题
  • 案例实践
  • 工具使用
  • 项目搭建
  • 服务治理
  • ORM框架
  • 分布式组件
  • MiniSpring
  • 设计模式
  • 算法思想
  • 编码规范
友链
关于
GitHub (opens new window)
  • Java基础

  • Java集合

  • MySQL

  • Redis

  • JVM

  • 多线程

  • 计算机网络

  • Spring

  • Kafka

    • 生产者客户端开发
      • 一、生产者客户端开发
        • 1、生产者逻辑
        • 2、配置优化
        • 3、消息对象
        • 4、消息发送
        • 5、资源回收
      • 二、序列化-分区器-拦截器
        • 1、序列化
        • 2、分区器
        • 3、生产拦截器
      • 三、生产者客户端发送原理
        • 1、拦截器-序列化器-分区器
        • 2、RecordAccumulator
        • 3、Sender
      • 参考
    • 消费者客户端开发
    • 主题与分区管理
    • 配置管理
    • KafkaAdminClient
    • 消费管理
    • Kafka Streams
    • 日志存储
    • 可靠性研究
    • 深入服务端
    • 深入客户端
    • 集群参数配置
    • 生产者消息分区机制原理
    • 如何确保消息不丢失
    • 如何确保消息不重复消费
    • 消费积压如何处理
    • 生产者是如何管理TCP连接
    • 消费者重平衡问题
    • 位移提交问题
    • 消费者是如何管理TCP连接
    • 副本机制深入
    • 消费组消费进度如何监控
    • 高水位和Leader Epoch
  • Elasticsearch

  • Python

  • 面试专题

  • 知识库
  • Kafka
旭日
2023-04-14
目录

生产者客户端开发

# 一、生产者客户端开发

# 1、生产者逻辑

  1. 配置生产者客户端参数及创建相应的生产者实例。
  2. 构建待发送的消息。
  3. 发送消息。
  4. 关闭生产者实例。

# 2、配置优化

针对我们的Properties配置,我们可以采用配置文件的方式或者静态实例的方式来进行加载。

静态实例

    public static Properties initConfig(){
        Properties props = new Properties();
        props.put("bootstrap.servers", brokerList);
        props.put("key.serializer",
                "org.apache.kafka.common.serialization.StringSerializer");
        props.put("value.serializer",
                "org.apache.kafka.common.serialization.StringSerializer");
        props.put("client.id", "producer.client.id.demo");
        return props;
    }

配置文件

spring:
  kafka:
    bootstrap-servers: localhost:9092
    producer:
      retries: 0
      batch-size: 16384
      buffer-memory: 33554432
      key-serializer: org.apache.kafka.common.serialization.StringSerializer
      value-serializer: org.apache.kafka.common.serialization.StringSerializer
  • bootstrap.servers:该参数用来指定生产者客户端连接 Kafka 集群所需的 broker 地址清单,具体的内容格式为 host1:port1,host2:port2,可以设置一个或多个地址,中间以逗号隔开,此参数的默认值为“”。
  • key.serializer和value.serializer:这两个参数分别用来指定 key 和 value 序列化操作的序列化器。
  • client.id:这个参数用来设定 KafkaProducer 对应的客户端id,默认值为“”。

配置key优化

实际配置过程中,key由于字符串过长,容易出错。

我们可以直接使用客户端中的 org.apache.kafka.clients.producer.ProducerConfig 类来做一定程度上的预防措施。

    public static Properties initConfig(){
        Properties props = new Properties();
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, brokerList);
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,
                "org.apache.kafka.common.serialization.StringSerializer");
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,
                "org.apache.kafka.common.serialization.StringSerializer");
        props.put(ProducerConfig.CLIENT_ID_CONFIG, "producer.client.id.demo");
        return props;
    }

# 3、消息对象

在发送消息之前,我们要进行消息对象的构建:

ProducerRecord<String, String> record = new ProducerRecord<>(TOPIC, "hello, Kafka!");

发送的消息对象包括记录要发送到的主题名称、可选的分区号以及可选的键和值。

public class ProducerRecord<K, V> {
    private final String topic; //主题
    private final Integer partition; //分区号
    private final Headers headers; //消息头部
    private final K key; //键
    private final V value; //值
    private final Long timestamp; //消息的时间戳
    //省略其他成员方法和构造方法
}
  • 主题:消息的分类。
  • 分区号:
    • 如果指定了有效的分区号,则在发送记录时将使用该分区。
    • 如果没有指定分区,但有一个键,则会使用键的散列选择分区。
    • 如果键和分区都不存在,分区将以循环方式分配。
  • Key:是用来指定消息的键,它不仅是消息的附加信息,还可以用来计算分区号进而可以让消息发往特定的分区。
  • value:消息的内容。

# 4、消息发送

发送消息的方法,其中topic和value是必须的:

    public ProducerRecord(String topic, Integer partition, Long timestamp, 
                          K key, V value, Iterable<Header> headers)
    public ProducerRecord(String topic, Integer partition, Long timestamp,
                          K key, V value)
    public ProducerRecord(String topic, Integer partition, K key, V value, 
                          Iterable<Header> headers)
    public ProducerRecord(String topic, Integer partition, K key, V value)
    public ProducerRecord(String topic, K key, V value)
    public ProducerRecord(String topic, V value)

创建生产者实例和构建消息之后,就可以开始发送消息了。发送消息主要有三种模式:

  • 发后即忘(fire-and-forget)
  • 同步(sync)
  • 异步(async)

发后即忘

    KafkaProducer<String, String> producer = new KafkaProducer<>(properties);
    ProducerRecord<String, String> record = new ProducerRecord<>(TOPIC, "hello, Kafka!");
    try {
        producer.send(record);
    } catch (Exception e) {
        e.printStackTrace();
    }
    producer.close();

这种就是典型的发后即忘模式,只管往 Kafka 中发送消息而并不关心消息是否正确到达,有如下特点:

  • 性能高
  • 可靠性差

同步方式

    public Future<RecordMetadata> send(ProducerRecord<K, V> record)
    try {
        producer.send(record).get();
    } catch (ExecutionException | InterruptedException e) {
        e.printStackTrace();
    }

实际上 send() 方法本身就是异步的,send() 方法返回的 Future 对象可以使调用方稍后获得发送的结果。示例中在执行 send() 方法之后直接链式调用了 get() 方法来阻塞等待 Kafka 的响应,直到消息发送成功,或者发生异常。如果发生异常,那么就需要捕获异常并交由外层逻辑处理。

如果要获取消息的一些元数据信息,可以采用如下方式:

    try {
        Future<RecordMetadata> future = producer.send(record);
        RecordMetadata metadata = future.get();
        System.out.println(metadata.topic() + "-" +
                           metadata.partition() + ":" + metadata.offset());
    } catch (ExecutionException | InterruptedException e) {
        e.printStackTrace();
    }

同步的发送方式有如下特点:

  • 同步发送的方式可靠性高,要么消息被发送成功,要么发生异常。如果发生异常,则可以捕获并进行相应的处理。
  • 同步发送的方式性能会差很多,需要阻塞等待一条消息发送完之后才能发送下一条。

异常处理

KafkaProducer 中一般会发生两种类型的异常:

  • 可重试的异常:
  • 不可重试的异常:

对于可重试的异常,如果配置了 retries 参数,那么只要在规定的重试次数内自行恢复了,就不会抛出异常。retries 参数的默认值为0,配置方式参考如下:

    props.put(ProducerConfig.RETRIES_CONFIG, 10);

异步

send() 方法的返回值类型就是 Future,而 Future 本身就可以用作异步的逻辑处理。

但是由于这个方法可以随时调用get方法,以及怎么调用都是需要面对的问题,消息不停地发送,那么诸多消息对应的 Future 对象的处理难免会引起代码处理逻辑的混乱。

因此采用Callback 的方式非常简洁明了,Kafka 有响应时就会回调,要么发送成功,要么抛出异常。

    public static void main(String[] args) {
        Properties properties = initConfig();
        KafkaProducer<String, String> producer = new KafkaProducer<>(properties);
        ProducerRecord<String, String> record = new ProducerRecord<>(TOPIC, "hello, Kafka!");
        try {
            producer.send(record, new Callback() {
                @Override
                public void onCompletion(RecordMetadata metadata, Exception exception) {
                    System.out.println(metadata.topic() + "-" +
                            metadata.partition() + ":" + metadata.offset());
                }
            });
        } catch (Exception e) {
            e.printStackTrace();
        }
        producer.close();
    }

回调函数中有两个参数:

  • RecordMetadata:消息元数据,可以获得主题、位移量等
  • Exception:异常

回调的先后顺序

假设我们发送了两条消息,代码如下:

    producer.send(record1, callback1);
    producer.send(record2, callback2);

对于同一个分区而言,如果消息 record1 于 record2 之前先发送,那么 KafkaProducer 就可以保证对应的 callback1 在 callback2 之前调用,也就是说,回调函数的调用也可以保证分区有序。

# 5、资源回收

在日常业务需求中,KafkaProducer可能会发送好几条消息,在发送完这些消息之后,需要调用 KafkaProducer 的 close() 方法来回收资源。

	producer.close();

这个无参close方法,阻塞等待之前所有的发送请求完成后再关闭 KafkaProducer。

对应有一个带有参数的close方法:

	public void close(long timeout, TimeUnit timeUnit)

如果调用了带超时时间 timeout 的 close() 方法,那么只会在等待 timeout 时间内来完成所有尚未完成的请求处理,然后强行退出。

在实际应用中,一般使用的都是无参的 close() 方法。

# 二、序列化-分区器-拦截器

客户端消息在真正发送到Broker,还会经历三个过程:

  • 序列化:需要把对象转化为二进制流的方式
  • 分区器:如果没有指定分区,就需要根据 key 这个字段来计算 partition 的值。分区器的作用就是为消息分配分区。
  • 拦截器:在消息发送之前进行一定处理

# 1、序列化

在之前的案例中,我们生产者序列化和消费者反序列化的方式如下:

    public static Properties initConfig(){
        Properties props = new Properties();
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, brokerList);
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,
                "org.apache.kafka.common.serialization.StringSerializer");
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,
                "org.apache.kafka.common.serialization.StringSerializer");
        props.put(ProducerConfig.CLIENT_ID_CONFIG, "producer.client.id.demo");
        return props;
    }

因为我们发送和接受都是String,所以我们采用的StringSerializer序列化。

Kafka有许多序列化器,但是他们都实现了Serializer<T>接口:

public interface Serializer<T> extends Closeable {

    default void configure(Map<String, ?> configs, boolean isKey) {
        // intentionally left blank
    }

    byte[] serialize(String topic, T data);

    default byte[] serialize(String topic, Headers headers, T data) {
        return serialize(topic, data);
    }
    
    @Override
    default void close() {
        // intentionally left blank
    }
}

通常而言,还是采用Json进行序列化和反序列化操作。

# 2、分区器

消息经过序列化之后就需要确定它发往的分区,如果消息 ProducerRecord 中指定了 partition 字段,那么就不需要分区器的作用,因为 partition 代表的就是所要发往的分区号。

如果消息 ProducerRecord 中没有指定 partition 字段,那么就需要依赖分区器,根据 key 这个字段来计算 partition 的值。分区器的作用就是为消息分配分区。

Kafka的分区器都实现了Partitioner接口:

    public int partition(String topic, Object key, byte[] keyBytes, 
                         Object value, byte[] valueBytes, Cluster cluster);
    public void close();

其中 partition() 方法用来计算分区号,返回值为 int 类型。

partition() 方法中的参数:

  • topic:主题、

  • Key:键

  • keyBytes:序列化后的键

  • value:值

  • valueBytes:序列化后的值

  • cluster:当前集群元数据

如果我们要根据业务去实现一个特殊的分区器,只需要实现这个接口,然后配置即可:

public class DemoPartitioner implements Partitioner {
    private final AtomicInteger counter = new AtomicInteger(0);

    @Override
    public int partition(String topic, Object key, byte[] keyBytes,
                         Object value, byte[] valueBytes, Cluster cluster) {
       // todo
       return 0;
    }

    @Override public void close() {}

    @Override public void configure(Map<String, ?> configs) {}
}
props.put(ProducerConfig.PARTITIONER_CLASS_CONFIG,
        DemoPartitioner.class.getName());

# 3、生产拦截器

生产者拦截器既可以用来在消息发送前做一些准备工作,比如按照某个规则过滤不符合要求的消息、修改消息的内容等,也可以用来在发送回调逻辑前做一些定制化的需求,比如统计类工作。

生产者拦截器需要实现ProducerInterceptor 接口:

    public ProducerRecord<K, V> onSend(ProducerRecord<K, V> record);
    public void onAcknowledgement(RecordMetadata metadata, Exception exception);
    public void close();

KafkaProducer 在将消息序列化和计算分区之前会调用生产者拦截器的 onSend() 方法来对消息进行相应的定制化操作。

KafkaProducer 会在消息被应答(Acknowledgement)之前或消息发送失败时调用生产者拦截器的 onAcknowledgement() 方法,优先于用户设定的 Callback 之前执行。

案例实现

我们有如下业务:需要在发送的信息之前添加test前缀,并记录发送成功多少条,发送失败多少条。

一般来说最好不要修改消息 ProducerRecord 的 topic、key 和 partition 等信息。

拦截器代码:

public class TestInterceptorPrefix implements ProducerInterceptor {

    private volatile long sendSuccess = 0;
    private volatile long sendFailure = 0;

    @Override
    public ProducerRecord onSend(ProducerRecord record) {
        String modifiedValue = "test-" + record.value();
        return new ProducerRecord<>(record.topic(),
                record.partition(), record.timestamp(),
                record.key(), modifiedValue, record.headers());
    }

    @Override
    public void onAcknowledgement(RecordMetadata metadata, Exception exception) {
        if (exception == null) {
            sendSuccess++;
        } else {
            sendFailure ++;
        }
    }

    @Override
    public void close() {
        System.out.println("发送成功次数:" + sendSuccess);
        System.out.println("发送成功次数:" + sendFailure);
    }

    @Override
    public void configure(Map<String, ?> configs) {

    }
}

配置

        props.put(ProducerConfig.INTERCEPTOR_CLASSES_CONFIG,
                TestInterceptorPrefix.class.getName());

输出

// 生产者
发送成功次数:10
发送成功次数:0

// 消费者
test-Kafka
test-Kafka
test-Kafka
test-Kafka
test-Kafka
test-Kafka
test-Kafka
test-Kafka
test-Kafka
test-Kafka

拦截链

KafkaProducer 中不仅可以指定一个拦截器,还可以指定多个拦截器以形成拦截链。拦截链会按照 interceptor.classes 参数配置的拦截器的顺序来。

配置的时候,各个拦截器之间使用逗号隔开。

现在我们额外配置一个拦截器:

public class TestPlusInterceptorPrefix implements ProducerInterceptor {
    @Override
    public ProducerRecord onSend(ProducerRecord record) {
        String modifiedValue = "testPlus-"+record.value() ;
        return new ProducerRecord<>(record.topic(),
                record.partition(), record.timestamp(),
                record.key(), modifiedValue, record.headers());
    }

    @Override
    public void onAcknowledgement(RecordMetadata metadata, Exception exception) {

    }

    @Override
    public void close() {

    }

    @Override
    public void configure(Map<String, ?> configs) {

    }
}

拦截器链配置

        props.put(ProducerConfig.INTERCEPTOR_CLASSES_CONFIG,
                TestInterceptorPrefix.class.getName() + "," + TestPlusInterceptorPrefix.class.getName());

消费者输出

testPlus-test-Kafka

在拦截链中,如果某个拦截器执行失败,那么下一个拦截器会接着从上一个执行成功的拦截器继续执行。所以对于A和B拦截器,如果B拦截器依赖于A拦截器逻辑处理,这个时候如果A执行失败,那么B仍然会进行执行,相关逻辑就会出现问题。

# 三、生产者客户端发送原理

整个生产者客户端由两个线程协调运行,这两个线程分别为主线程和 Sender 线程(发送线程)。

image-20230410112732248

# 1、拦截器-序列化器-分区器

  • 拦截器:发送消息之前进行额外的处理。
  • 序列化器:将对象序列化为字节数组。
  • 分区器:按照一定规则把消息送往对应分区。

# 2、RecordAccumulator

主要作用

主要用来缓存消息以便 Sender 线程可以批量发送,进而减少网络传输的资源消耗以提升性能。

如果生产者发送消息的速度超过发送到服务器的速度,则会导致生产者空间不足,这个时候 KafkaProducer 的 send() 方法调用要么被阻塞,要么抛出异常,这个取决于参数 max.block.ms 的配置,此参数的默认值为60000,即60秒。

内部原理

RecordAccumulator内部为每个分区都维护了一个双端队列,队列中的内容就是 ProducerBatch,即 Deque。

消息写入的时候,追加到队列,消息读取从队头读取。

ProducerBatch 不是 ProducerRecord,ProducerBatch 中可以包含一至多个 ProducerRecord,通过这样减少网络请求的次数以提升整体的吞吐量

  • ProducerRecord:发送的一个消息
  • ProducerBatch:发送的一个消息批次,包含多个消息

内部流程

  • 判断是否存在双端队列,如果没有则创建,如果有就入队
  • 从队列的末尾获取ProducerBatch,如果没有创建
  • 判断ProducerBatch是否还能够加入,如果不能够加入,创建新的ProducerBatch

# 3、Sender

分区转broker

对于客户端Producer:只关心发送到那个分区

对于网络连接:只关心发送到那个具体broker

所以,Sender 从 RecordAccumulator 中获取缓存的消息之后,会进一步将原本<分区, Deque< ProducerBatch>> 的保存形式转变成 <Node, List< ProducerBatch> 的形式。需要做一个应用逻辑层面到网络I/O层面的转换。

封装Request

在转换成 <Node, List> 的形式之后,Sender 还会进一步封装成 <Node, Request> 的形式,这样就可以将 Request 请求发往各个 Node 了,这里的 Request 是指 Kafka 的各种协议请求,对于消息发送而言就是指具体的 ProduceRequest。

# 参考

官方文档 (opens new window)

图解Kafka之实战指南 (opens new window)

#消息队列
上次更新: 2024/06/29, 15:13:44
Spring循环依赖问题
消费者客户端开发

← Spring循环依赖问题 消费者客户端开发→

最近更新
01
基础概念
10-31
02
Pytorch
10-30
03
Numpy
10-30
更多文章>
Theme by Vdoing | Copyright © 2021-2024 旭日 | 蜀ICP备2021000788号-1
  • 跟随系统
  • 浅色模式
  • 深色模式
  • 阅读模式