RabbitMQ
# RabbitMQ学习
消息指的是两个应用间传递的数据。
**消息队列(Message Queue)**是在消息的传输过程中保存消息的容器。数据结构中我们学习过队列,相当于是一个队列来存放这些消息。
在消息队列
中存在这两个角色:
- 生产者 (负责发送数据到消息队列)
- 消费者 (负责从消息队列中取出数据)
# 为什么使用消息队列
# 解耦
当系统A需要向B、C、D三个系统发送消息的时候,需要去调用相关发送信息的代码。
而当某一天系统A不需要发送给某一个系统的时候或者当还需要额外发送某一个系统的时候,这时候就需要去增删代码。
而使用消息队列
相当于在系统和系统之间建立了桥梁,当你需要发送和接受消息的时候,就去连接桥梁,当不需要的时候就断开桥梁。
# 异步
如果是一个同步请求的话,发送方是需要得到接受的响应的时候,才去发送下一个请求。那么如果当系统A有很多个系统需要去发送消息,而第一个接受消息系统响应特别长的话,就会影响到后续系统接受消息。
但当使用了消息队列
,发送方只需要把消息发送给消息队列即可,而后续的接受消息,由接受消息的系统自己去消息队列拿取,这样极大地提高了性能。
# 削峰
当短时间有大量的请求的时候,如果把这些请求全涌入我们的MySQL
,那么此刻MySQL
肯定无法在短时间处理那么多请求,会导致系统崩溃。而使用消息队列
我们就把这些大量请求存入消息队列,让消息队列
在短时间积压数据。此时只需要我们的消费者
每次从消息队列
中取出一部分消息进行处理即可。
# 安装RabbitMQ
# 安装erLang语言
下载之后的exe直接安装,然后配置环境变量:
# 安装RabbitMQ客户端
选择exe下载之后,一直双击下一步即可。
在安装目录下的sbin
存在以下文件:
在此目录下打开CMD命令,输入rabbitmq-plugins enable rabbitmq_management
命令安装管理页面的插件,然后双击rabbitmq-server.bat
启动脚本,接着我们就可以在服务管理
中看到
RabbitMQ
运行。
现在只需要打开游览器输入http://localhost:15672,账号密码默认是:guest/guest。
# 写个Demo
依赖:
<!--rabbitmq 依赖客户端-->
<dependency>
<groupId>com.rabbitmq</groupId>
<artifactId>amqp-client</artifactId>
<version>5.8.0</version>
</dependency>
<!--操作文件流的一个依赖-->
<dependency>
<groupId>commons-io</groupId>
<artifactId>commons-io</artifactId>
<version>2.6</version>
</dependency>
生产者:
public class Producer {
/**
* 队列名称
*/
private final static String QUEUE_NAME = "hello";
public static void main(String[] args) {
// 初始化链接工厂
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("127.0.0.1");
factory.setUsername("guest");
factory.setUsername("guest");
try {
// 建立链接频道
Connection connection = factory.newConnection();
Channel channel = connection.createChannel();
/**
* 生成一个队列
* 1.队列名称
* 2.队列里面的消息是否持久化 默认消息存储在内存中
* 3.该队列是否只供一个消费者进行消费 是否进行共享 true 可以多个消费者消费
* 4.是否自动删除 最后一个消费者端开连接以后 该队列是否自动删除 true 自动删除
* 5.其他参数
*/
channel.queueDeclare(QUEUE_NAME, false, false, false, null);
String message = "hello world";
/**
* 发送一个消息
* 1.发送到那个交换机
* 2.路由的 key 是哪个
* 3.其他的参数信息
* 4.发送消息的消息体
*/
channel.basicPublish("", QUEUE_NAME, null, message.getBytes());
System.out.println("生产者发送消息完毕");
} catch (IOException | TimeoutException e) {
e.printStackTrace();
}
}
}
消费者:
public class Consumer {
private final static String QUEUE_NAME = "hello";
public static void main(String[] args) throws Exception {
// 初始化链接工厂
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("127.0.0.1");
factory.setUsername("guest");
factory.setUsername("guest");
// 建立链接频道
Connection connection = factory.newConnection();
Channel channel = connection.createChannel();
System.out.println("等待接收消息....");
// 推送的消息如何进行消费的接口回调
DeliverCallback deliverCallback=(consumerTag, delivery)->{
String message= new String(delivery.getBody());
System.out.println(message);
};
// 取消消费的一个回调接口 如在消费的时候队列被删除掉了
CancelCallback cancelCallback=(consumerTag)->{
System.out.println("消息消费被中断");
};
/**
* 消费者消费消息
* 1.消费哪个队列
* 2.消费成功之后是否要自动应答 true 代表自动应答 false 手动应答
* 3.消息传递时的回调
* 4.消费者取消时的回调
*/
channel.basicConsume(QUEUE_NAME,true,deliverCallback,cancelCallback);
}
}
此时我们先运行生产者的Main方法,开始发送消息,运行完毕之后,我们可以在RabbitMQ
客户端中看到如下:
说明此刻我们创建的hello
队列已经存在RabbitMQ
当中,并且有一条待接受的消息。
接下来我们启动消费者的Main方法,开始接受消息:
此时存在消息队列中的消息已经被接受,消息队列中暂无待接受的消息:
现在我们所实现的就是RabbitMQ
的六大模式 (opens new window)中最简单的模式,再简单梳理概念之后,我们将继续进行实现其他几种模式。
# 核心概念
通过上面的Demo我们对于RabbitMQ
这种消息队列有了一定的认知,下面进行一个简单的总结。
# RabbitMQ概念
RabbitMQ
是一个消息中间件:它接受并转发消息。你可以把它当做一个快递站点,当你要发送一个包裹时,你把你的包裹放到快递站,快递员最终会把你的快递送到收件人那里,按照这种逻辑 RabbitMQ 是一个快递站,一个快递员帮你传递快件。RabbitMQ 与快递站的主要区别在于,它不处理快件而是接收, 存储和转发消息数据。
# RabbitMQ四大部分
生产者
产生数据发送消息的程序是生产者。
交换机
交换机是 RabbitMQ 非常重要的一个部件,一方面它接收来自生产者的消息,另一方面它将消息推送到队列中。交换机必须确切知道如何处理它接收到的消息,是将这些消息推送到特定队列还是推送到多个队列,亦或者是把消息丢弃,这个得有交换机类型决定。
队列
队列是 RabbitMQ 内部使用的一种数据结构,尽管消息流经 RabbitMQ 和应用程序,但它们只能存 储在队列中。队列仅受主机的内存和磁盘限制的约束,本质上是一个大的消息缓冲区。许多生产者可 以将消息发送到一个队列,许多消费者可以尝试从一个队列接收数据。这就是我们使用队列的方式
消费者
消费与接收具有相似的含义。消费者大多时候是一个等待接收消息的程序。请注意生产者,消费 者和消息中间件很多时候并不在同一机器上。同一个应用程序既可以是生产者又是可以是消费者。
# 工作队列
工作队列
(又称任务队列)的主要思想是避免立即执行资源密集型任务,而不得不等待它完成。 相反我们安排任务在之后执行。我们把任务封装为消息并将其发送到队列。在后台运行的工作进 程将弹出任务并最终执行作业。当有多个工作线程时,这些工作线程将一起处理这些任务。
# 轮训分发消息
工具类
public class RabbitMqUtils {
public static Channel getChannel() throws Exception {
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("127.0.0.1");
factory.setUsername("guest");
factory.setPassword("guest");
Connection connection = factory.newConnection();
Channel channel = connection.createChannel();
return channel;
}
}
工作线程
public class Worker01 {
private static final String QUEUE_NAME = "hello";
public static void main(String[] args) throws Exception {
Channel channel = RabbitMqUtils.getChannel();
// 接受消息的回调
DeliverCallback deliverCallback = (consumerTag, delivery) -> {
String receivedMessage = new String(delivery.getBody());
System.out.println("接收到消息:" + receivedMessage);
};
// 取消消息的回调
CancelCallback cancelCallback = (consumerTag) -> {
System.out.println(consumerTag + "消费者取消消费接口回调逻辑");
};
System.out.println("C2 消费者启动等待消费......");
channel.basicConsume(QUEUE_NAME, true, deliverCallback, cancelCallback);
}
}
开启两个工作线程:
发送线程
public class Task01 {
private static final String QUEUE_NAME="hello";
public static void main(String[] args) throws Exception {
try(Channel channel= RabbitMqUtils.getChannel();) {
channel.queueDeclare(QUEUE_NAME,false,false,false,null);
//从控制台当中接受信息
Scanner scanner = new Scanner(System.in);
while (scanner.hasNext()){
String message = scanner.next();
channel.basicPublish("",QUEUE_NAME,null,message.getBytes());
System.out.println("发送消息完成:"+message);
}
}
}
}
发送结果:
接受结果-工作01:
接受结果-工作02:
可以看到现在两个工作线程是轮流去接受消息队列的消息。
# 消息应答
现在试想一种情况,如果一个消费者在接受一个消息,而如果在接受消息的时候,这个消费者崩掉了,而RabbitMQ
一旦向消费者传递了一条消息,便立即将该消息标记为删除。那么此刻我们这条还没有完成的消息就丢失了。
为了防止这种情况的出现,就引入消息应答
机制:消费者在接收到消息并且处理该消息之后,告诉RabbitMQ
它已经处理了,RabbitMQ
可以把该消息删除了。
默认消息采用的是自动应答。
消息应答-接收端
public class AckWork {
private final static String QUEUE_NAME = "hello";
public static void main(String[] args) throws Exception {
Channel channel = RabbitMqUtils.getChannel();
// 指定一个队列
channel.queueDeclare(QUEUE_NAME, false, false, false, null);
// 推送的消息如何进行消费的接口回调
DeliverCallback deliverCallback=(consumerTag, delivery)->{
String message= new String(delivery.getBody());
System.out.println("接受的消息是:" + message);
try {
doWork(message);
} catch (InterruptedException e) {
e.printStackTrace();
} finally{
// 每次处理完成一个消息后,手动发送一次应答。
channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false);
}
};
// 取消消费的一个回调接口 如在消费的时候队列被删除掉了
CancelCallback cancelCallback=(consumerTag)->{
System.out.println("消息消费被中断");
};
boolean autoAck = false;
channel.basicConsume(QUEUE_NAME, autoAck, deliverCallback, cancelCallback);
}
private static void doWork(String task) throws InterruptedException {
TimeUnit.SECONDS.sleep(task.length());
}
}
关闭自动应答机制:
boolean autoAck = false;
channel.basicConsume(QUEUE_NAME, autoAck, deliverCallback, cancelCallback);
手动发送应答:
channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false);
消息应答-发送端
public class AckNewTask {
private final static String QUEUE_NAME = "hello";
public static void main(String[] args) throws Exception {
Channel channel = RabbitMqUtils.getChannel();
// 指定一个队列
channel.queueDeclare(QUEUE_NAME, false, false, false, null);
// 发送消息
for (int i = 0; i < 10; i++) {
String message = i + "";
channel.basicPublish("", QUEUE_NAME, null, message.getBytes());
System.out.println("发送的消息是:" + message);
}
}
}
在这样的情况下即使其中一个接收端在接受消息的时候中断了,发送端在没接收到应答之前,会重新把消息进行发送。
# 持久化
在消息应答中我们实现了即使消费者死了,任务也不会丢失。但是如果 RabbitMQ 服务器停止,我们的任务仍然会丢失。 当 RabbitMQ 退出或崩溃时,将会丢失所有的队列和信息,除非你告诉它不要丢失。需要两件事来确保消息不丢失:我们需要分别将队列
和消息
标记为持久化。
队列持久化:
boolean durable = true;
channel.queueDeclare("hello", durable, false, false, null);
在源码中:
/**
* Declare a queue
* @see com.rabbitmq.client.AMQP.Queue.Declare
* @see com.rabbitmq.client.AMQP.Queue.DeclareOk
* @param queue the name of the queue
* @param durable true if we are declaring a durable queue (the queue will survive a server restart)
* @param exclusive true if we are declaring an exclusive queue (restricted to this connection)
* @param autoDelete true if we are declaring an autodelete queue (server will delete it when no longer in use)
* @param arguments other properties (construction arguments) for the queue
* @return a declaration-confirm method to indicate the queue was successfully declared
* @throws java.io.IOException if an error is encountered
*/
Queue.DeclareOk queueDeclare(String queue, boolean durable, boolean exclusive, boolean autoDelete,
Map<String, Object> arguments) throws IOException;
消息持久化:
channel.basicPublish("", "hello", MessageProperties.PERSISTENT_TEXT_PLAIN, message.getBytes());
# 公平转发
有时候会存在一个消费者很忙,而另一个消费者处于空闲状态,而RabbitMQ
只会分派消息,它不看消费者的未确认消息的数量,为了让RabbitMQ
不要一次给一个工作线程多个消息。换句话说,在处理并确认前一个消息之前,不要向工作线程发送新消息。相反,它将发送到下一个还不忙的工作线程。
int prefetchCount = 1;
channel.basicQos(prefetchCount) ;
# 发布确认
发布确认并非一种模式,只是一种消息确认机制。
# 开启发布确认
发布确认默认是没有开启的,如果要开启需要调用方法 confirmSelect
,每当你要想使用发布 确认,都需要在 channel 上调用该方法:
// 开启发布确认
channel.confirmSelect();
# 单个发布确认
这是一种简单的确认方式,它是一种同步确认发布的方式,也就是发布一个消息之后只有它被确认发布,后续的消息才能继续发布。
/**
* 单个确认发布
* @throws Exception
*/
public static void publishMessageIndividually() throws Exception {
Channel channel = RabbitMqUtils.getChannel();
// 开启发布确认
channel.confirmSelect();
// 随机生成一个队列名
String queueName = UUID.randomUUID().toString();
// 队列申明
channel.queueDeclare(queueName, false, false, false, null);
long begin = System.currentTimeMillis();
for (int i = 0; i < MESSAGE_COUNT; i++) {
String message = i + "";
channel.basicPublish("", queueName, null, message.getBytes());
//服务端返回 false 或超时时间内未返回,生产者可以消息重发
boolean flag = channel.waitForConfirms();
if(flag){
System.out.println("消息发送成功");
}
}
long end = System.currentTimeMillis();
System.out.println("发布" + MESSAGE_COUNT + "个单独确认消息,耗时" + (end - begin) + "ms");
}
缺点:
发布速度特别的慢,因为如果没有确认发布的消息就会 阻塞所有后续消息的发布,这种方式最多提供每秒不超过数百条发布消息的吞吐量。
# 批量确认发布
上一种方式相当于一个消息需要确认了,再发布下一个。批量确认发布
则是先发布一批消息然后一起确认可以极大地提高吞吐量。
/**
* 批量确认
* @throws Exception
*/
public static void publishMessageBatch() throws Exception {
Channel channel = RabbitMqUtils.getChannel();
// 开启发布确认
channel.confirmSelect();
// 随机生成一个队列名
String queueName = UUID.randomUUID().toString();
// 队列申明
channel.queueDeclare(queueName, false, false, false, null);
// 批量确认消息大小
int batchSize = 100;
// 未确认消息个数
int outstandingMessageCount = 0;
long begin = System.currentTimeMillis();
for (int i = 0; i < MESSAGE_COUNT; i++) {
String message = i + "";
channel.basicPublish("", queueName, null, message.getBytes());
outstandingMessageCount++;
if(outstandingMessageCount == batchSize) {
// 等待确认
channel.waitForConfirms();
outstandingMessageCount = 0;
}
}
// 为了确保还有剩余没有确认消息 再次确认
if (outstandingMessageCount > 0) {
channel.waitForConfirms();
}
long end = System.currentTimeMillis();
System.out.println("发布" + MESSAGE_COUNT + "个批量确认消息,耗时" + (end - begin) + "ms");
}
当然这种方式的缺点就是:当发生故障导致发布出现问题时,不知道是哪个消息出现 问题了。
# 异步确认发布
异步确认虽然编程逻辑比上两个要复杂,但是性价比最高,无论是可靠性还是效率都没得说, 他是利用回调函数来达到消息可靠性传递的,这个中间件也是通过函数回调来保证是否投递成功。
/**
* 异步确认
*
* @throws Exception
*/
public static void publishMessageAsync() throws Exception {
Channel channel = RabbitMqUtils.getChannel();
// 开启发布确认
channel.confirmSelect();
// 随机生成一个队列名
String queueName = UUID.randomUUID().toString();
// 队列申明
channel.queueDeclare(queueName, false, false, false, null);
/**
* 线程安全有序的一个哈希表,适用于高并发的情况
* 1.轻松的将序号与消息进行关联
* 2.轻松批量删除条目 只要给到序列号
* 3.支持并发访问
*/
ConcurrentSkipListMap<Long, String> outstandingConfirms = new ConcurrentSkipListMap<>();
/**
* 确认收到消息的一个回调
* 1.消息序列号
* 2.true 可以确认小于等于当前序列号的消息
* false 确认当前序列号消息
*/
ConfirmCallback ackCallback = (sequenceNumber, multiple) -> {
if (multiple) {
// 返回的是小于等于当前序列号的未确认消息是一个map
ConcurrentNavigableMap<Long, String> confirmed = outstandingConfirms.headMap(sequenceNumber, true);
// 清除该部分未确认消息
confirmed.clear();
} else {
// 只清除当前序列号的消息
outstandingConfirms.remove(sequenceNumber);
}
};
ConfirmCallback nackCallback = (sequenceNumber, multiple) -> {
String message = outstandingConfirms.get(sequenceNumber);
System.out.println("发布的消息" + message + "未被确认,序列号" + sequenceNumber);
};
/**
* 添加一个异步确认的监听器
* 1.确认收到消息的回调
* 2.未收到消息的回调
*/
channel.addConfirmListener(ackCallback, nackCallback);
long begin = System.currentTimeMillis();
for (int i = 0; i < MESSAGE_COUNT; i++) {
String message = "消息" + i;
/**
* channel.getNextPublishSeqNo()获取下一个消息的序列号
* 通过序列号与消息体进行一个关联
* 全部都是未确认的消息体
*/
outstandingConfirms.put(channel.getNextPublishSeqNo(), message);
channel.basicPublish("", queueName, null, message.getBytes());
}
long end = System.currentTimeMillis();
System.out.println("发布" + MESSAGE_COUNT + "个异步确认消息,耗时" + (end - begin) + "ms");
}
测试:
public static void main(String[] args) throws Exception {
publishMessageIndividually();
publishMessageBatch();
publishMessageAsync();
}
# 三种发布确认对比
名称 | 优点 | 缺点 |
---|---|---|
单独发布消息 | 同步等待确认,简单 | 吞吐量非常有限 |
批量发布消息 | 批量同步等待确认,简单,合理的吞吐量 | 一旦出现问题但很难推断出是那条 消息出现了问题。 |
异步处理 | 最佳性能和资源使用,在出现错误的情况下可以很好地控制, | 实现起来较为复杂 |
# 发布订阅
# 交换器
在之前的案例中,我们一般只创建生产者
和消费者
,现在我们将引入交换器
。
生产者不直接发送任何消息给队列。实际上,一般的情况下,生产者甚至不知道消息应该发送到哪些队列。 相反的,生产者只能将信息发送到交换器。交换器是非常简单的。它一边收到来自生产者的消息,另一边将它们推送到队列。交换器必须准确知道接收到的消息如何处理。
交换器一共有四种类型:
- direct
- topic
- headers
- fanout
/**
* Actively declare a non-autodelete, non-durable exchange with no extra arguments
* @see com.rabbitmq.client.AMQP.Exchange.Declare
* @see com.rabbitmq.client.AMQP.Exchange.DeclareOk
* @param exchange the name of the exchange
* @param type the exchange type
* @return a declaration-confirm method to indicate the exchange was successfully declared
* @throws java.io.IOException if an error is encountered
*/
Exchange.DeclareOk exchangeDeclare(String exchange, String type) throws IOException;
fanout 类型的交换器非常简单,它只是将所有收到的消息广播到所有它所知道的队列。
之前的案例中我们有这样的代码:
channel.basicPublish("", queueName, null, message.getBytes());
而第一个参数就是交换器的名称,第二个参数表示消息由路由键决定发送到哪个队列。
# 临时队列
之前,我们使用的队列都指定了一个特定的名称。但有时候我们需要这个连接一旦断掉,我们就不需要这个队列。当需要再连接的时候,重新创建一个队列为其服务,这时候就需要临时队列
。
一旦消费者与 RabbitMQ
断开,消费者所接收的那个队列应该被自动删除。 在 Java 客户端中,我们可以使用 queueDeclare()
方法来创建一个非持久的、唯一的、自动删除的队列,且队列名称由服务器随机产生。
String queueName = channel.queueDeclare().getQueue();
# 绑定
当我们的交换器和队列都创建好的时候,这时候就需要去绑定
,告诉交换器发送消息到哪个队列。
# 实战
发送端:
public class EmitLog {
private static final String EXCHANGE_NAME = "logs";
public static void main(String[] args) throws Exception {
// 创建一个通道
Channel channel = RabbitMqUtils.getChannel();
// 指定交换器类型
channel.exchangeDeclare(EXCHANGE_NAME, "fanout");
// 发送消息
String msg = "发布订阅消息测试";
channel.basicPublish(EXCHANGE_NAME, "", null, msg.getBytes());
System.out.println("发送的消息是:" + msg);
}
}
接收端:
public class ReceiveLogs {
private static final String EXCHANGE_NAME = "logs";
public static void main(String[] args) throws Exception {
// 创建一个通道
Channel channel = RabbitMqUtils.getChannel();
// 指定交换器类型
channel.exchangeDeclare(EXCHANGE_NAME, "fanout");
// 创建一个非持久的、唯一的、自动删除的队列
String queueName = channel.queueDeclare().getQueue();
// 绑定交换器和队列
channel.queueBind(queueName, EXCHANGE_NAME, "");
// 推送的消息如何进行消费的接口回调
DeliverCallback deliverCallback=(consumerTag, delivery)->{
String message= new String(delivery.getBody());
System.out.println(message);
};
// 取消消费的一个回调接口 如在消费的时候队列被删除掉了
CancelCallback cancelCallback=(consumerTag)->{
System.out.println("消息消费被中断");
};
channel.basicConsume(queueName, true, deliverCallback, cancelCallback);
}
}
# 路由
交换器类型设置为direct
。
# 直接交换
第一个队列与绑定键 orange 绑定,第二个队列有两个绑定,一个绑定键为 black,另一个绑定为 green。在这样的设置中,具有 orange 的交换器的消息将被路由到队列 Q1。具有 black 或 green 的交换器的消息将转到 Q2。所有其他消息将被丢弃。
# 多重绑定
使用相同的绑定键绑定多个队列是完全合法的。在我们的示例中,我们可以在 X 和 Q1 之间添加绑定键 black。在这种情况下,direct 类型的交换器将消息广播到所有匹配的队列 Q1 和 Q2。
# 案例
发送端:
public class EmitLogDirect {
private static final String EXCHANGE_NAME = "direct_logs";
private static final String[] LOG_LEVEL_ARR = {"debug", "info", "error"};
public static void main(String[] args) throws Exception {
Channel channel = RabbitMqUtils.getChannel();
channel.exchangeDeclare(EXCHANGE_NAME, "direct");
// 发送消息
for (int i = 0; i < 10; i++) {
int rand = new Random().nextInt(3);
// 随机选择一个日志类型
String severity = LOG_LEVEL_ARR[rand];
String message = "log : [" + severity + "]" + UUID.randomUUID().toString();
// 发布消息至交换器
channel.basicPublish(EXCHANGE_NAME, severity, null, message.getBytes());
System.out.println(" Sent '" + message + "'");
}
}
}
接收端:
public class ReceiveLogsDirect {
private static final String EXCHANGE_NAME = "direct_logs";
private static final String[] LOG_LEVEL_ARR = {"debug", "info", "error"};
public static void main(String[] args) throws Exception {
Channel channel = RabbitMqUtils.getChannel();
// 指定一个交换器
channel.exchangeDeclare(EXCHANGE_NAME, "direct");
// 设置日志级别
int rand = new Random().nextInt(3);
String severity = LOG_LEVEL_ARR[rand];
// 创建一个非持久的、唯一的、自动删除的队列
String queueName = channel.queueDeclare().getQueue();
// 绑定交换器和队列
channel.queueBind(queueName, EXCHANGE_NAME, severity);
// 推送的消息如何进行消费的接口回调
DeliverCallback deliverCallback=(consumerTag, delivery)->{
String message= new String(delivery.getBody());
System.out.println("Received '" + message + "'");
};
// 取消消费的一个回调接口 如在消费的时候队列被删除掉了
CancelCallback cancelCallback=(consumerTag)->{
System.out.println("消息消费被中断");
};
channel.basicConsume(queueName, true, deliverCallback, cancelCallback);
}
}
info日志:
debug日志:
error日志:
# 主题
交换器类型设置为topic
。
# 主题交换
使用 topic 类型的交换器,不能有任意的绑定键,它必须是由点隔开的一系列的标识符组成。标识符可以是任何东西,但通常它们指定与消息相关联的一些功能。其实就是一种在路由交换
的基础上,增加了匹配机制。
# 案例
发送端:
public class EmitLogTopic {
private static final String EXCHANGE_NAME = "topic_logs";
private static final String[] LOG_LEVEL_ARR = {"dao.debug", "dao.info", "dao.error",
"service.debug", "service.info", "service.error",
"controller.debug", "controller.info", "controller.error"};
public static void main(String[] args) throws Exception {
Channel channel = RabbitMqUtils.getChannel();
// 指定交换器类型
channel.exchangeDeclare(EXCHANGE_NAME, "topic");
// 发送消息
for (String severity : LOG_LEVEL_ARR) {
String message = "log : [" +severity+ "]" + UUID.randomUUID().toString();
// 发布消息至交换器
channel.basicPublish(EXCHANGE_NAME, severity, null, message.getBytes());
System.out.println("Sent '" + message + "'");
}
}
}
打印内容:
Sent 'log : [dao.debug]38ac39aa-c1f4-42dc-ae01-32ea30b432d6'
Sent 'log : [dao.info]40108c68-ff94-4277-9ac0-457b3b734b70'
Sent 'log : [dao.error]36fddf24-4921-4c20-85c0-cee86a9aa836'
Sent 'log : [service.debug]effbf423-f8be-45d9-b4a0-d5135c8685e9'
Sent 'log : [service.info]8fd8ea10-3fbe-42e7-bcf8-4d6e83009a1c'
Sent 'log : [service.error]d6a6b40a-7831-4ef3-9821-cce146a24369'
Sent 'log : [controller.debug]17657f2b-ffd3-4732-a082-02dbccd370af'
Sent 'log : [controller.info]94943015-0e70-4a29-a9f5-f7de47471b42'
Sent 'log : [controller.error]8056b306-8b22-4ed4-9969-1b49c48e4bb6'
接收端:
public class ReceiveLogsTopic {
private static final String EXCHANGE_NAME = "topic_logs";
private static final String[] LOG_LEVEL_ARR = {"#", "dao.error", "*.error", "dao.*", "service.#", "*.controller.#"};
public static void main(String[] args) throws Exception {
Channel channel = RabbitMqUtils.getChannel();
// 指定一个交换器
channel.exchangeDeclare(EXCHANGE_NAME, "topic");
// 设置日志级别
int rand = new Random().nextInt(5);
String severity = LOG_LEVEL_ARR[rand];
// 创建一个非持久的、唯一的、自动删除的队列
String queueName = channel.queueDeclare().getQueue();
// 绑定交换器和队列
channel.queueBind(queueName, EXCHANGE_NAME, severity);
// 推送的消息如何进行消费的接口回调
DeliverCallback deliverCallback = (consumerTag, delivery) -> {
String message = new String(delivery.getBody());
System.out.println("Received '" + message + "'");
};
// 取消消费的一个回调接口 如在消费的时候队列被删除掉了
CancelCallback cancelCallback = (consumerTag) -> {
System.out.println("消息消费被中断");
};
// 打印
System.out.println(" [*] LOG INFO : " + severity);
channel.basicConsume(queueName, true, deliverCallback, cancelCallback);
}
}
打印内容:
[*] LOG INFO : dao.error
Received 'log : [dao.error]36fddf24-4921-4c20-85c0-cee86a9aa836'
[*] LOG INFO : dao.*
Received 'log : [dao.debug]38ac39aa-c1f4-42dc-ae01-32ea30b432d6'
Received 'log : [dao.info]40108c68-ff94-4277-9ac0-457b3b734b70'
Received 'log : [dao.error]36fddf24-4921-4c20-85c0-cee86a9aa836'
[*] LOG INFO : dao.error
Received 'log : [dao.error]36fddf24-4921-4c20-85c0-cee86a9aa836'
即使发送端发送了controller类型的日志,但是因为接收端没有匹配controller,所以接受不到。
# 参考
超详细的RabbitMQ入门,看这篇就够了! (opens new window)