消费者客户端开发
# 一、消费者与消费组
# 1、基本概念
消费者(Consumer)负责订阅 Kafka 中的主题(Topic),并且从订阅的主题上拉取消息。
与其他一些消息中间件不同的是:在 Kafka 的消费理念中还有一层消费组(Consumer Group)的概念,每个消费者都有一个对应的消费组。当消息发布到主题后,只会被投递给订阅它的每个消费组中的一个消费者。
在上图中,存在消费者组A
和消费者组B
,对应关系如下:
- 消费组A
- P0 - C0
- P1 - C1
- P2 - C2
- P3 - C3
- 消费组B
- P0和P1 - C4
- P2和P3 - C5
两个消费组之间互不影响。每个消费者只能消费所分配到的分区中的消息。换言之,每一个分区只能被一个消费组中的一个消费者所消费。
# 2、消费组的伸缩
一个消费者
现在一个消费组里面只有一个消费者,那么这个消费者C0订阅了7个分区。
两个消费者
现在一个消费组里面有两个消费者,按照既定的逻辑,需要将原来消费者C0的部分分区分配给消费者C1消费,如上图所示。消费者C0和C1各自负责消费所分配到的分区,彼此之间并无逻辑上的干扰。
多个消费者
消费者与消费组这种模型可以让整体的消费能力具备横向伸缩性,我们可以增加(或减少)消费者的个数来提高(或降低)整体的消费能力。
但是如果消费者过多,出现了消费者的个数大于分区个数的情况,就会有消费者分配不到任何分区。
一般情况下,Consumer 实例的数量应该等于该 Group 订阅主题的分区总数。
# 3、消息投递模式
点对点:基于队列的,消息生产者发送消息到队列,消息消费者从队列中接收消息。
发布/订阅:定义了如何向一个内容节点发布和订阅消息,这个内容节点称为主题(Topic),主题可以认为是消息传递的中介,消息发布者将消息发布到某个主题,而消息订阅者从主题中订阅消息。
- 如果所有的消费者都隶属于同一个消费组,那么所有的消息都会被均衡地投递给每一个消费者,即每条消息只会被一个消费者处理,这就相当于点对点模式的应用。
- 如果所有的消费者都隶属于不同的消费组,那么所有的消息都会被广播给所有的消费者,即每条消息会被所有的消费者处理,这就相当于发布/订阅模式的应用。
# 4、传统消息模式的缺点
点对点
由于点对点的特性,消息只能被一个消费者消费,这应该是特性,并非缺点。但是这种模型的可扩展很差,因为下游如果存在多个消费者竞争的情况,会导致性能很差。
发布/订阅
发布 / 订阅模型倒是允许消息被多个 Consumer 消费,但它的问题也是伸缩性不高,因为每个订阅者都必须要订阅主题的所有分区。这种全量订阅的方式既不灵活,也会影响消息的真实投递效果。
# 5、重平衡
Rebalance 本质上是一种协议,规定了一个 Consumer Group 下的所有 Consumer 如何达成一致,来分配订阅 Topic 的每个分区。
Rebalance触发条件
- 组成员数发生变更。比如有新的 Consumer 实例加入组或者离开组,抑或是有 Consumer 实例崩溃被“踢出”组。
- 订阅主题数发生变更。Consumer Group 可以使用正则表达式的方式订阅主题,比如
consumer.subscribe(Pattern.compile("t.*c"))
就表明该 Group 订阅所有以字母 t 开头、字母 c 结尾的主题。在 Consumer Group 的运行过程中,你新创建了一个满足这样条件的主题,那么该 Group 就会发生 Rebalance。 - 订阅主题的分区数发生变更。Kafka 当前只能允许增加一个主题的分区数。当分区数增加时,就会触发订阅该主题的所有 Group 开启 Rebalance。
重平衡的问题
在 Rebalance 过程中,所有 Consumer 实例都会停止消费,等待 Rebalance 完成。这是 Rebalance 为人诟病的一个方面。
# 二、消费者客户端开发
# 1、消费者逻辑
配置消费者客户端参数及创建相应的消费者实例。
订阅主题。
拉取消息并消费。
提交消费位移。
关闭消费者实例。
# 2、配置优化
静态实例
public static Properties initConfig(){
Properties props = new Properties();
props.put("key.deserializer",
"org.apache.kafka.common.serialization.StringDeserializer");
props.put("value.deserializer",
"org.apache.kafka.common.serialization.StringDeserializer");
props.put("bootstrap.servers", BROKER_LIST);
props.put("group.id", GROUP_ID);
props.put("client.id", "consumer.client.id.demo");
return props;
}
配置文件
spring:
kafka:
bootstrap-servers: localhost:9092
consumer:
group-id: spring-boot-demo
# 手动提交
enable-auto-commit: false
auto-offset-reset: latest
key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
value-deserializer: org.apache.kafka.common.serialization.StringDeserializer
properties:
session.timeout.ms: 60000
bootstrap.servers
:该参数用来指定生产者客户端连接 Kafka 集群所需的 broker 地址清单,具体的内容格式为 host1:port1,host2:port2,可以设置一个或多个地址,中间以逗号隔开,此参数的默认值为“”。key.serializer
和value.serializer
:这两个参数分别用来指定 key 和 value 反序列化操作的序列化器。client.id
:KafkaConsumer 对应的客户端id,默认值也为“”。group.id
:消费者隶属的消费组的名称,默认值为“”。
这部分同样可以采用相关静态常量避免出错。
# 3、订阅主题和分区
主题订阅
在创建好消费者之后,我们就需要为该消费者订阅相关的主题了。一个消费者可以订阅一个或多个主题。
//创建一个消费者客户端实例
KafkaConsumer<String, String> consumer = new KafkaConsumer<>(properties);
//订阅主题
consumer.subscribe(Collections.singletonList(TOPIC));
如果前后两次订阅了不同的主题,那么消费者以最后一次的为准:
consumer.subscribe(Arrays.asList(topic1));
consumer.subscribe(Arrays.asList(topic2));
上述代码中,消费者最终订阅的是topic2
分区订阅
消费者不仅可以通过 KafkaConsumer.subscribe() 方法订阅主题,还可以直接订阅某些主题的特定分区,在 KafkaConsumer 中还提供了一个 assign() 方法来实现这些功能,此方法的具体定义如下:
public void assign(Collection<TopicPartition> partitions)
TopicPartition类如下:
public final class TopicPartition implements Serializable {
private final int partition;
private final String topic;
public TopicPartition(String topic, int partition) {
this.partition = partition;
this.topic = topic;
}
public int partition() {
return partition;
}
public String topic() {
return topic;
}
//省略hashCode()、equals()和toString()方法
}
- topic:主题名
- partition:分区号
如果我们要订阅quickstart-events
主题,分区为0:
consumer.assign(Arrays.asList(new TopicPartition("quickstart-events", 0)));
分区信息获取
KafkaConsumer 中的 partitionsFor() 方法可以用来查询指定主题的元数据信息,partitionsFor() 方法的具体定义如下:
public List<PartitionInfo> partitionsFor(String topic)
public class PartitionInfo {
private final String topic;
private final int partition;
private final Node leader;
private final Node[] replicas;
private final Node[] inSyncReplicas;
private final Node[] offlineReplicas;
}
演示通过主题订阅主题下的全部分区
List<TopicPartition> partitions = new ArrayList<>();
List<PartitionInfo> partitionInfos = consumer.partitionsFor(TOPIC);
if (partitionInfos != null) {
for (PartitionInfo tpInfo : partitionInfos) {
partitions.add(new TopicPartition(tpInfo.topic(), tpInfo.partition()));
}
}
consumer.assign(partitions);
- 通过主题获取元数据信息
- 遍历元数据信息,放入到分区集合中
- 订阅分区集合
取消订阅
consumer.unsubscribe();
# 4、反序列化
生产者发送消息之前需要将消息进行序列化操作;
消费者接受消息之前需要就信息进行反序列化操作。
# 5、消息消费
消息消费一般有两种模式:
- 推模式:是服务端主动将消息推送给消费者
- 拉模式:是消费者主动向服务端发起请求来拉取消息。
ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(1000));
Kafka的消息消费是一个不断轮询的过程,通过不停调用poll()
方法,而poll()
返回的是所订阅的主题(分区)上的一组消息。
对于 poll()
方法而言,如果某些分区中没有可供消费的消息,那么此分区对应的消息拉取的结果就为空;
如果订阅的所有分区中都没有可供消费的消息,那么 poll() 方法返回为空的消息集合。
消费对象
消费者消费到的每条消息类型为ConsumerRecord
,这个和生产者发送的消息类型ProducerRecord
相对应,代码如下:
public class ConsumerRecord<K, V> {
private final String topic;
private final int partition;
private final long offset;
private final long timestamp;
private final TimestampType timestampType;
private final int serializedKeySize;
private final int serializedValueSize;
private final Headers headers;
private final K key;
private final V value;
private volatile Long checksum;
}
- topic:主题名称
- partition:所在分区
- timestamp:表示时间戳
- key:消息的键
- value :消息的值
而poll()
方法的返回值类型是 ConsumerRecords,它用来表示一次拉取操作所获得的消息集,内部包含了若干ConsumerRecord。
现在我们需要从消息集合中获得指定分区消息:
ConsumerRecords<String, String> records =
consumer.poll(Duration.ofMillis(1000));
for (TopicPartition tp : records.partitions()) {
for (ConsumerRecord<String, String> record : records.records(tp)) {
System.out.println(record.partition()+" : "+record.value());
}
}
- 获取到消息集合
- 遍历消息集合中的每一个分区
- 通过分区获得指定分区消息
现在我们需要从消息集合中获得指定主题消息:
List<String> topicList = Arrays.asList(topic1, topic2);
consumer.subscribe(topicList);
try {
while (isRunning.get()) {
ConsumerRecords<String, String> records =
consumer.poll(Duration.ofMillis(1000));
for (String topic : topicList) {
for (ConsumerRecord<String, String> record :
records.records(topic)) {
System.out.println(record.topic() + " : " + record.value());
}
}
}
}finally {
consumer.close();
}
- 获取到消息集合
- 遍历主题集合
- 从每一个主题去获取消费记录
# 三、消费者客户端原理
# 1、位移提交
# 基本概念
对于Kafka分区而言,它的每条消息都有唯一的 offset,用来表示消息在分区中对应的位置。
对于消费者而言,它也有一个 offset 的概念,消费者使用 offset 来表示消费到分区中某个消息所在的位置。
对于同一个消息而言,他在分区的位移量和消费的唯一是相同的。而且这个位移提交必须做持久化保存,而不是单单保存在内存中,否则消费者重启之后就无法知晓之前的消费位移。
为什么需要位移提交
- 通过
Poll()
方法获取到的是还没有被消费的消息集合,要做到这一点,就需要记录上一次消费到哪个位置了。 - 对于新的消费者加入,分区那么必然会有再均衡的动作,对于同一分区而言,它可能在再均衡动作之后分配给新的消费者,如果不持久化保存消费位移,那么这个新的消费者也无法知晓之前的消费位移。
# 内部结构
在新消费者客户端中,消费位移存储在 Kafka 内部的主题__consumer_offsets 中。
- 当前消费到的位置:某一次拉取操作中此分区消息的最大偏移量,假设当前消费者已经消费了x位置的消息,那么我们就可以说消费者的消费位移为x
- 下一次拉取的消息的位置:表示下一条需要拉取的消息的位置
案例
public static void main(String[] args) {
Properties properties = initConfig();
//创建一个消费者客户端实例
KafkaConsumer<String, String> consumer = new KafkaConsumer<>(properties);
TopicPartition tp = new TopicPartition(TOPIC, 0);
consumer.assign(Collections.singletonList(tp));
//当前消费到的位移
long lastConsumedOffset = -1;
while (true) {
ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(1000));
if (records.isEmpty()) {
break;
}
List<ConsumerRecord<String, String>> partitionRecords = records.records(tp);
lastConsumedOffset = partitionRecords.get(partitionRecords.size() - 1).offset();
//同步提交消费位移
consumer.commitSync();
}
System.out.println("comsumed offset is " + lastConsumedOffset);
OffsetAndMetadata offsetAndMetadata = consumer.committed(tp);
System.out.println("commited offset is " + offsetAndMetadata.offset());
long posititon = consumer.position(tp);
System.out.println("the offset of the next record is " + posititon);
}
// output
comsumed offset is 50
commited offset is 51
the offset of the next record is 51
对于位移提交的具体时机的把握也很有讲究,有可能会造成重复消费和消息丢失的现象。
# 消息丢失
当前一次poll拉取到消息范围为:[x+2,x+7],现在假设我们是拉取消息后就进行位移提交。
如果此时正在消费x+5,出现系统故障,在故障恢复之后,我们重新拉取的消息是从 x+8 开始的。也就是说,x+5 至 x+7 之间的消息并未能被消费,如此便发生了消息丢失的现象。
# 消息重复消费
还是上图,假设我们是消费完成之后进行位移提交。
如果此时正在消费x+5,出现系统故障,在故障恢复之后,我们重新拉取的消息是从 x+2 开始的。也就是说,x+2至
x+5都会重复消费。
# 默认提交机制
在 Kafka 中默认的消费位移的提交方式是自动提交,这个由消费者客户端参数 enable.auto.commit 配置,默认值为 true。
在默认的方式下,消费者每隔5秒会将拉取到的每个分区中最大的消息位移进行提交。自动位移提交的动作是在 poll() 方法的逻辑里完成的,在每次真正向服务端发起拉取请求之前会检查是否可以进行位移提交,如果可以,那么就会提交上一次轮询的位移。
但是在默认提交机制下仍然存在消息丢失和消息重复消费问题:
消息重复消息:由于默认情况下是每5秒进行提交,相当于是延时提交,如果在提交之前,系统发生崩溃,恢复之后,在这间隔的5秒之间所消费的消息都会重复消费。
消息丢失:现在有两个线程,一个是拉取线程A,另一个是处理线程B,拉取线程A拉取完毕之后,进行了位移提交假设是x+6,而此刻处理线程B还是处理消息,当处理到x+3的时候出现故障,那么等恢复之后,x+3 至 x+6 之间的消息就没有得到相应的处理,这样便发生消息丢失的现象。
# 手动提交
由于实际业务中,我们消息处理并非只是简单获取到消息即可,还可能需要放入缓存、放入数据库等操作,所以我们一般根据业务进行手动提交。
配置不自动提交
props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false);
手动提交有两种方式,分别为同步提交
和异步提交
- 同步提交:commitSync()
- 异步提交:commitAsync()
同步提交
while (isRunning.get()) {
ConsumerRecords<String, String> records = consumer.poll(1000);
for (ConsumerRecord<String, String> record : records) {
//do some logical processing.
}
consumer.commitSync();
}
- 拉取消息
- 遍历消息进行业务逻辑处理
- 同步提交
上诉的代码还可以更改为批量处理:
final int minBatchSize = 200;
List<ConsumerRecord> buffer = new ArrayList<>();
while (isRunning.get()) {
ConsumerRecords<String, String> records = consumer.poll(1000);
for (ConsumerRecord<String, String> record : records) {
buffer.add(record);
}
if (buffer.size() >= minBatchSize) {
//do some logical processing with buffer.
consumer.commitSync();
buffer.clear();
}
}
以上两种方式都存在消费重复消息问题:在提交之前如果出现系统崩溃,在两次位移提交的窗口中出现了重复消费的现象。
异步提交
异步提交的方式(commitAsync())在执行的时候消费者线程不会被阻塞,可能在提交消费位移的结果还未返回之前就开始了新一次的拉取操作。
while (isRunning.get()) {
ConsumerRecords<String, String> records = consumer.poll(1000);
for (ConsumerRecord<String, String> record : records) {
//do some logical processing.
}
consumer.commitAsync(new OffsetCommitCallback() {
@Override
public void onComplete(Map<TopicPartition, OffsetAndMetadata> offsets,
Exception exception) {
if (exception == null) {
System.out.println(offsets);
}else {
log.error("fail to commit offsets {}", offsets, exception);
}
}
});
}
# 2、控制和关闭消费
在某些业务场景下,我们需要暂停某些分区的消费而先消费其他分区,当达到一定条件时再恢复这些分区的消费。
KafkaConsumer 中提供如下两个方法:
public void pause(Collection<TopicPartition> partitions)
public void resume(Collection<TopicPartition> partitions)
- pause:暂停拉取操作
- resume:恢复拉取操作
# 3、指定位移消费
Kafka在以下几种情况下,存在找不到消费位移:
- 当一个新的消费者建立的时候,根本没有可以查找的消费位移。
- 消费组内的一个新消费者订阅了一个新的主题,它也没有可以查找的消费位移。
- 当 __consumer_offsets 主题中有关这个消费组的位移信息过期而被删除后,它也没有可以查找的消费位移。
在Kafka中每当消费者查找不到所记录的消费位移时,就会根据消费者客户端参数 auto.offset.reset 的配置来决定从何处开始进行消费。
latest
:表示从分区末尾开始消费消息earliest
:消费者会从起始处,也就是0开始消费。
在某些业务场景下,我们需要更细维度的控制,可以让我们从特定的位移处开始拉取消息:
public void seek(TopicPartition partition, long offset)
partition
:分区offset
:用来指定从分区的哪个位置开始消费
// 配置
Properties properties = initConfig();
// 创建一个消费者客户端实例
KafkaConsumer<String, String> consumer = new KafkaConsumer<>(properties);
consumer.subscribe(Collections.singletonList(TOPIC));
consumer.poll(Duration.ofMillis(10000));
Set<TopicPartition> assignment = consumer.assignment();
for (TopicPartition topicPartition : assignment) {
consumer.seek(topicPartition, 10);
}
while(isRunning.get()) {
ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(1000));
for (ConsumerRecord<String, String> record : records.records(TOPIC)) {
System.out.println(record.partition() + " : " + record.offset() + " : " + record.value());
}
}
assignment()
方法获取当前分配给此使用者的分区集,在遍历分区,指定从特定的位移处开始拉取消息。
对于一些特殊的要求,Kakfa也能够支持:
// 从头开始消费
public void seekToBeginning(Collection<TopicPartition> partitions)
// 尾部开始消费
public void seekToEnd(Collection<TopicPartition> partitions)
// 指定时间开始消费
public Map<TopicPartition, OffsetAndTimestamp> offsetsForTimes(
Map<TopicPartition, Long> timestampsToSearch)
public Map<TopicPartition, OffsetAndTimestamp> offsetsForTimes(
Map<TopicPartition, Long> timestampsToSearch,
Duration timeout)
总的来说,seek
这个方法为我们提供了从特定位置读取消息的能力。
# 4、消费者-均衡器
再均衡是指分区的所属权从一个消费者转移到另一消费者的行为,它为消费组具备高可用性和伸缩性提供保障,使我们可以既方便又安全地删除消费组内的消费者或往消费组内添加消费者。
同时,如果消费者在还没提交位移之前就发生了再均衡,之后这个分区又被分配给了消费组内的另一个消费者,原来被消费完的那部分消息又被重新消费一遍,也就是发生了重复消费。一般情况下,应尽量避免不必要的再均衡的发生。
ConsumerRebalanceListener 是一个接口,用来设定发生再均衡动作前后的一些准备或收尾的动作,包含以下两个方法:
void onPartitionsRevoked(Collection<TopicPartition> partitions);
void onPartitionsAssigned(Collection<TopicPartition> partitions);
void onPartitionsRevoked(Collection partitions)
这个方法会在再均衡开始之前和消费者停止读取消息之后被调用。可以通过这个回调方法来处理消费位移的提交,以此来避免一些不必要的重复消费现象的发生。参数 partitions 表示再均衡前所分配到的分区。void onPartitionsAssigned(Collection partitions)
这个方法会在重新分配分区之后和消费者开始读取消费之前被调用。参数 partitions 表示再均衡后所分配到的分区。
Properties properties = initConfig();
KafkaConsumer<String, String> consumer = new KafkaConsumer<>(properties);
Map<TopicPartition, OffsetAndMetadata> currentOffsets = new HashMap<>();
consumer.subscribe(Collections.singletonList(TOPIC), new ConsumerRebalanceListener() {
@Override
public void onPartitionsRevoked(Collection<TopicPartition> partitions) {
consumer.commitSync(currentOffsets);
currentOffsets.clear();
}
@Override
public void onPartitionsAssigned(Collection<TopicPartition> partitions) {
}
});
try {
while (isRunning.get()) {
ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
for (ConsumerRecord<String, String> record : records) {
//process the record.
currentOffsets.put(
new TopicPartition(record.topic(), record.partition()),
new OffsetAndMetadata(record.offset() + 1)
);
}
consumer.commitAsync(currentOffsets, null);
}
} finally {
consumer.close();
}
- 创建一个局部变量currentOffsets 中
- 这样在正常消费的时候可以通过 commitAsync() 方法来异步提交消费位移
- 发生再均衡动作之前可以通过再均衡监听器的 onPartitionsRevoked() 回调执行 commitSync() 方法同步提交消费位移
# 5、消费者-拦截器
生产者使用拦截器,可以在发送消息之前对消息进行处理。
消费者也有对应的拦截器,消费到消息或在提交消费位移时进行一些定制化的操作。
消费者拦截器需要自定义实现 org.apache.kafka.clients.consumer. ConsumerInterceptor
接口:
public ConsumerRecords<K, V> onConsume(ConsumerRecords<K, V> records);
public void onCommit(Map<TopicPartition, OffsetAndMetadata> offsets);
public void close()。
onConsume
:可以对消息进行相应的定制化操作,比如修改操作返回的消息内容,按照某种规则过滤消息。onCommit
:可以使用这个方法来记录跟踪所提交的位移信息
下面我们自定义一个拦截器:
public class ConsumerInterceptorTTL implements ConsumerInterceptor<String, String> {
private static final long EXPIRE_INTERVAL = 10 * 1000;
@Override
public ConsumerRecords<String, String> onConsume(ConsumerRecords<String, String> records) {
long now = System.currentTimeMillis();
// 定义一个新返回的消息结果集合
Map<TopicPartition, List<ConsumerRecord<String, String>>> newRecords = new HashMap<>();
// 遍历分区
for (TopicPartition tp : records.partitions()) {
// 获得指定分区下的消息记录
List<ConsumerRecord<String, String>> tpRecords = records.records(tp);
List<ConsumerRecord<String, String>> newTpRecords = new ArrayList<>();
for (ConsumerRecord<String, String> record : tpRecords) {
if (now - record.timestamp() < EXPIRE_INTERVAL) {
newTpRecords.add(record);
}
}
if (!newTpRecords.isEmpty()) {
newRecords.put(tp, newTpRecords);
}
}
return new ConsumerRecords<>(newRecords);
}
@Override
public void onCommit(Map<TopicPartition, OffsetAndMetadata> offsets) {
offsets.forEach((tp, offset) -> System.out.println(tp + ":" + offset.offset()));
}
@Override
public void close() {
}
@Override
public void configure(Map<String, ?> configs) {
}
}
配置拦截器:
// 配置拦截器
props.put(ConsumerConfig.INTERCEPTOR_CLASSES_CONFIG, ConsumerInterceptorTTL.class.getName());
生产者发送消息:
public class Producer {
public static final String BROKER_LIST = "localhost:9092";
public static final String TOPIC = "quickstart-events";
private static final long EXPIRE_INTERVAL = 10 * 1000;
public static Properties initConfig(){
Properties props = new Properties();
props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, BROKER_LIST);
props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,
"org.apache.kafka.common.serialization.StringSerializer");
props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,
"org.apache.kafka.common.serialization.StringSerializer");
props.put(ProducerConfig.CLIENT_ID_CONFIG, "producer.client.id.demo");
props.put(ProducerConfig.RETRIES_CONFIG, 10);
props.put(ProducerConfig.INTERCEPTOR_CLASSES_CONFIG,
TestInterceptorPrefix.class.getName() + "," + TestPlusInterceptorPrefix.class.getName());
return props;
}
public static void main(String[] args) throws ExecutionException, InterruptedException {
Properties properties = initConfig();
KafkaProducer<String, String> producer = new KafkaProducer<>(properties);
ProducerRecord<String, String> record1 = new ProducerRecord<>(TOPIC, 0, System
.currentTimeMillis()-EXPIRE_INTERVAL, null, "first-expire-data");
producer.send(record1).get();
ProducerRecord<String, String> record2 = new ProducerRecord<>(TOPIC, 0, System
.currentTimeMillis(), null, "normal-data");
producer.send(record2).get();
ProducerRecord<String, String> record3 = new ProducerRecord<>(TOPIC, 0, System
.currentTimeMillis()-EXPIRE_INTERVAL, null, "last-expire-data");
producer.send(record3).get();
producer.close();
}
}