0%

RabbitMQ

RabbitMQ

使用docker安装并运行rabbitmq

拉去rabbitmq镜像

开放服务器安全组的5672和15672端口

进入容器内安装rabbitmq管理插件

1
2
3
4
5
6
7
8
9
docker pull rabbitmq


docker run -d -p 5672:5672 -p 15672:15672 --name myrabbitmq rabbitmq

docker exec -it myrabbitmq /bin/bash

#在容器内执行这行命令,安装管理插件
rabbitmq-plugins enable rabbitmq_management

访问15672端口即可进入rabbitmq管理页面

账号:guest

密码:guest

工作队列模式

消息应答

轮流调度

round-robin dispatching

轮流调度用工作队列的一个好处是可以轻松地进行并行工作。如果我们积攒了很多任务没做,这时只需要多加几个工人,可以很轻松地扩大处理规模scale。

默认情况下,RabbitMQ会把消息按顺序传给下一个消费者。平均来看,每个消费者拿到的信息数量都是相同的。这种分发信息的机制被称为轮流调度(轮询,round-robin)

消息确认

message acknowledgement

在我们现在写的这份代码里,RabbitMQ一把信息转发给消费者(工人)就会马上把这个任务在队列里删掉。
而完成一个任务需要一定的时间,那如果一个工人在做某项任务期间突然被打断了,我们就会丢失这个任务信息。不仅如此,我们还会丢掉所有交给这个工人但他还没完成的任务。

如果你不想让信息丢失,我们就要开启RabbitMQ的信息确认功能。消费者在接收到并处理完一个任务后,会给RabbitMQ发一个确认信息(Acknowledgement, ACK),告诉他任务已经完成了,可以删掉了。如果消费者没完成任务就死掉了(例如管道关闭了、连接丢失了、TCP连接断掉了),一段时间后RabbitMQ没收到确认信息ACK,就会知道给他的消息没有被处理,从而把这个消息再放进队列里,并让其他消费者去处理。
默认情况下,RabbitMQ会等30分钟。你也可以用rabbitmq.conf中的参数==consumer_timeout==自定义超时时间。(点此查看官方文档解释)

默认情况下,手动消息确认功能是打开的。我们刚才的代码里==boolean autoAck = true==;把这个功能关掉了。

接下来,我们来测试自动应答功能,autoAck改为:==boolean autoAck = false==;。
结束掉一个在工作途中的worker进程,看一下最终的效果,消息会被重新分配给其他worker。

消息持久化

如果RabbitMQ服务器挂掉了,消息也是会丢失的,除非你将==队列和消息==进行持久化(写入磁盘)。

如何修改让消息持久化?

  1. 修改队列的声明参数

    1
    2
    boolean durable=true
    channel.queueDeclare(QUEUE.name,durable,false,false,null)

    ==如果已经声明并使用了一个队列,那么不可以修改这个队列的参数,只能重新换一个队列名称(生产者和消费者代码中的队列名称都需要修改)==

  2. 需要标记我们的消息是持久化的

    1
    2
    3
    4
    5
    6
    import com.rabbitmq.client.MessageProperties;

    channel.basicPublish("", "QUEUE.name",
    MessageProperties.PERSISTENT_TEXT_PLAIN,//这行配置持久化的路径
    message.getBytes());

    注意,虽然我们标记了消息是需要持久化的,但RabbitMQ接收到消息->持久化到磁盘仍然需要一定时间,这就意味着消息可能在缓存里,依然有丢失的可能。不过对于简单的任务队列这也够用了,如果还需要更强的保证消息不丢失,则需要使用“发布者确认”publisher confirms

公平分配

假设我们有两个工人,按顺序分配任务,如果奇数的任务很重偶数的任务很轻松,就会出现有一个工人累的要死,另一个却很闲的情况。任务量分配不均的原因是:RabbitMQ没有看每个工人完成的工作量(即,收到的ACK数)。

为了解决这个问题,可以使用basicQos(Channel Prefetch Setting)方法,即当工人做完一个任务再给他下一个,不要一次性给多个任务。

1
2
int prefetchCount = 1;
channel.basicQos(prefetchCount);

综合实例代码

工具类RabbitUtils

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
package org.Utils;

import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;

import java.io.File;
import java.io.FileInputStream;
import java.util.Properties;

public class RabbitUtil {
public static Channel getChannel() throws Exception{
//引入配置文件
Properties properties = new Properties();
//new File() 里面填充rabbitmq用户的配置文件
FileInputStream inputStream = new FileInputStream(new File("src/main/resources/rabbit-user.properties"));
properties.load(inputStream);

String host = properties.getProperty("rabbit.host");
String username = properties.getProperty("rabbit.username");
String password = properties.getProperty("rabbit.password");

//连接工厂
ConnectionFactory connectionFactory = new ConnectionFactory();
connectionFactory.setHost(host);
connectionFactory.setUsername(username);
connectionFactory.setPassword(password);
//建立通信和连接
Connection connection = connectionFactory.newConnection();
Channel channel = connection.createChannel();
return channel;

}
}

生产者

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
public class NewTask {
private final static String QUEUE_NAME = "task_queue";

public static void main(String[] argv) throws Exception {
// 建立连接和管
Channel channel = RabbitUtil.getChannel();
// 参数一:声明我们要发送的队列是谁(QUEUE_NAME),其他参数这里先不用关注
channel.queueDeclare(QUEUE_NAME, true, false, false, null);
// 发送消息,这是通过终端查看结果的方法
String message = new Scanner(System.in).nextLine();
// String message = String.join(" ", argv); // ***主要改了这里***
channel.basicPublish("", QUEUE_NAME, MessageProperties.PERSISTENT_TEXT_PLAIN, message.getBytes());
System.out.println(" [x] 发送消息 '" + message + "'");
}
}

消费者

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
public class Worker {
private final static String QUEUE_NAME = "task_queue";

public static void main(String[] argv) throws Exception {
// 建立连接和管道
Channel channel = RabbitUtil.getChannel();
// 声明从哪个队列接受消息
channel.queueDeclare(QUEUE_NAME, true, false, false, null);
System.out.println(" [*] 等待消息. To exit press CTRL+C");

// 平均分配
channel.basicQos(1);

// 接收到信息回调接口,目的是当接收到一条信息时,进行一些操作,比如可以在控制台里打印出来,以告诉程序员收到了信息。
DeliverCallback deliverCallback = (consumerTag, delivery) -> {
String message = new String(delivery.getBody(), "UTF-8");
System.out.println(" [x] 已收到 '" + message + "'");
try {
doWork(message);
} catch (InterruptedException e) {
e.printStackTrace();
} finally {
System.out.println(" [x] 工作任务完成!");
// 使用basicAck方法
/**
* 消息应答的方法
* A.Channel.basicAck(用于肯定确认)
* RabbitMQ 已知道该消息并且成功的处理消息,可以将其丢弃了
* B.Channel.basicNack(用于否定确认)
* C.Channel.basicReject(用于否定确认)
* 与 Channel.basicNack 相比少一个参数 不处理该消息了直接拒绝,可以将其丢弃了
*
* 第一个参数
* 获取发送内容的标签
*
* 第二个参数(见下图)
* multiple 的 true 和 false 代表不同意思
* true 代表批量应答【 channel 上未应答的消息】
* 比如说 channel 上有传送 tag 的消息 5,6,7,8 当前 tag 是 8
* 那么此时 5-8 的这些还未应答的消息都会被确认收到消息应答
* false 同上面相比只会应答 tag=8 的消息 5,6,7 这三个消息依然不会被确认收到消息应答
*
*/
channel.basicAck(delivery.getEnvelope().getDeliveryTag(),false);
}
};
// 取消接收的回调接口,目的是如在接收消息的时候队列被删除掉了,可以进行一些操作,例如告诉程序员接收被中断了。
CancelCallback cancelCallback = (consumerTag) -> {
System.out.println("消息消费被中断");
};

// 手动应答,应答方式见basicAck
// 手动应答的好处是可以批量应答并且减少网络拥堵
boolean autoAck = false;

// 管道接收消息
channel.basicConsume(QUEUE_NAME, autoAck, deliverCallback, cancelCallback);
}

// 用于模拟工作任务,输入一个点就停顿一秒
private static void doWork(String task) throws InterruptedException {
for (char ch : task.toCharArray()) {
if (ch == '.') Thread.sleep(1000);
}
}
}

发布确认

生产者将channel设置成 confirm 模式,一旦channel进入 confirm 模式,所有在该channel上面发布的消息都将会被指派一个唯一的 ID(从 1 开始),一旦消息被投递到所有匹配的队列之后,broker 就会发送一个确认ACK给生产者(包含消息的唯一 ID),这就使得生产者知道消息已经正确到达目的队列了。

如果消息和队列是可持久化的,那么确认消息会在将消息写入磁盘之后发出,broker 回传给生产者的确认消息中 delivery-tag 域包含了确认消息的序列号,此外 broker 也可以设置basic.ack 的 multiple 域,表示到这个序列号之前的所有消息都已经得到了处理。

confirm 模式最大的好处在于他是异步的,一旦发布一条消息,生产者应用程序就可以在等信道返回确认的同时继续发送下一条消息,当消息最终得到确认之后,生产者应用便可以通过回调方法来处理该确认消息。

如果 RabbitMQ 因为自身内部错误导致消息丢失,就会发送一条 nack 消息, 生产者应用程序同样可以在回调方法中处理该nack消息

开启发布确认

发布确认默认是关闭的,如果需要开启需要调用confirmSelect

1
channel.confirmSelect();

单个确认发布

发布一个消息之后只有它确认被发布,后续的消息才能继续发布

waitforconfirmordie(long)这个方法只有在消息被确认的时候才能返回,如果在指定时间范围内这个消息还没有被确认那么它将抛出异常

缺点:发布速度特别慢,因为如果没有确认发布的消息就会阻塞所有后续消息的发布,这种方式最多提供每秒不超过数百条发布消息的吞吐量。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
public static void publishMessageIndividually() throws Exception {
try (Channel channel = RabbitMqUtils.getChannel()) {
String queueName = UUID.randomUUID().toString();
channel.queueDeclare(queueName, false, false, false, null);
//开启发布确认
channel.confirmSelect();
long begin = System.currentTimeMillis();
for (int i = 0; i < MESSAGE_COUNT; i++) {
String message = i + "";
channel.basicPublish("", queueName, null, message.getBytes());

//服务端返回 false 或超时时间内未返回,生产者可以消息重发
boolean flag = channel.waitForConfirms();
if (flag) {
System.out.println("消息发送成功");
}
}
long end = System.currentTimeMillis();
System.out.println("发布" + MESSAGE_COUNT + "个单独确认消息,耗时" + (end - begin) + "ms");
}
}

批量确认发布

与单个等待确认消息相比,先发布一批消息然后一起确认可以极大地提高吞吐量。

**缺点: ** 当发生故障导致发布出现问题时,不知道是哪个消息出现问题了,我们必须将整个批处理保存在内存中,以记录重要的信息而后重新发布消息。这种方案仍然是同步的,也一样阻塞消息的发布。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
public static void publishMessageBatch() throws Exception {
try (Channel channel = RabbitMqUtils.getChannel()) {
String queueName = UUID.randomUUID().toString();
channel.queueDeclare(queueName, false, false, false, null);
//开启发布确认
channel.confirmSelect();
//批量确认消息大小
int batchSize = 100;
//未确认消息个数
int outstandingMessageCount = 0;
long begin = System.currentTimeMillis();
for (int i = 0; i < MESSAGE_COUNT; i++) {
String message = i + "";
channel.basicPublish("", queueName, null, message.getBytes());
outstandingMessageCount++;
if (outstandingMessageCount == batchSize) {
channel.waitForConfirms();
outstandingMessageCount = 0;
}
}
//为了确保还有剩余没有确认消息 再次确认
if (outstandingMessageCount > 0)
{
channel.waitForConfirms();
}
long end = System.currentTimeMillis();
System.out.println("发布" + MESSAGE_COUNT + "个批量确认消息,耗时" + (end - begin) + "ms");
}
}

异步确认

异步确认虽然编程逻辑比上两个要复杂,但是性价比最高,无论是可靠性还是效率都更好。

异步确认是利用回调函数来达到消息可靠性传递的,这个中间件也是通过函数回调来保证是否投递成功。

https://img-blog.csdnimg.cn/a8d540b4fc5b4236ab73fa0dc6479081.png

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
public static void publishMessageAsync() throws Exception {
try (Channel channel = RabbitMqUtils.getChannel()) {
String queueName = UUID.randomUUID().toString();
channel.queueDeclare(queueName, false, false, false, null); //开启发布确认
channel.confirmSelect();
/**
* 线程安全有序的一个哈希表,适用于高并发的情况
* 1.轻松的将序号与消息进行关联
* 2.轻松批量删除条目 只要给到序列号
* 3.支持并发访问
*/
ConcurrentSkipListMap<Long,String> outstandingConfirms = new ConcurrentSkipListMap<> ();
/**
* 确认收到消息的一个回调
* 1.消息序列号
* 2.true可以确认小于等于当前序列号的消息
* false 确认当前序列号消息
*/
ConfirmCallback ackCallback = (sequenceNumber, multiple) -> {
if (multiple) {
//返回的是小于等于当前序列号的未确认消息 是一个 map
ConcurrentNavigableMap<Long,String> confirmed = outstandingConfirms.headMap (sequenceNumber, true);
//清除该部分未确认消息
confirmed.clear();
} else {
//只清除当前序列号的消息
outstandingConfirms.remove(sequenceNumber);
}
}; ConfirmCallback nackCallback = (sequenceNumber, multiple) -> {
String message = outstandingConfirms.get(sequenceNumber);
System.out.println("发布的消息" + message + "未被确认,序列号" + sequenceNumber);
};
/**
* 添加一个异步确认的监听器
* 1.确认收到消息的回调
* 2.未收到消息的回调
*/
channel.addConfirmListener(ackCallback, null);
long begin = System.currentTimeMillis();
for (int i = 0; i < MESSAGE_COUNT; i++) {
String message = "消息" + i;
/**
* channel.getNextPublishSeqNo()获取下一个消息的序列号
* 通过序列号与消息体进行一个关联
* 全部都是未确认的消息体
*/
outstandingConfirms.put(channel.getNextPublishSeqNo(), message);
channel.basicPublish("", queueName, null, message.getBytes());
}
long end = System.currentTimeMillis();
System.out.println("发布" + MESSAGE_COUNT + "个异步确认消息,耗时" + (end - begin) + "ms");
}
}

如何处理异步未确认信息?
最好的解决的解决方案就是把未确认的消息放到一个基于内存的能被发布线程访问的队列,
比如说用 ConcurrentLinkedQueue 这个队列在 confirm callbacks 与发布线程之间进行消息的传递。

发布确认对比

  • 单个确认发布:同步等待确认,简单但是吞吐量非常有限
  • 批量确认发布:批量同步等待确认,简单且有合理的吞吐量,但是出问题不知道是哪个出现了问题
  • 异步发布确认:最佳性能和资源的使用,在出现错误的情况下可以很好的控制,但是实现起来比较困难

发布/订阅模式

  • 工作队列模式:一个消息给一个接收者
  • 发布/订阅模式:一个消息给多个接收者

案例说明

建立一个日志记录系统:

工作队列模式:

  1. 一个程序发送日志消息
  2. 另一个程序接收消息并打印

发布/订阅模式:

  1. 每一个接收程序都会收到日志,那我们就可以让一个接收者把日志持久化到磁盘,另一个接收者把日志打印出来。
  2. 每个发布的日志消息都会被广播给所有接收者。

交换机

RabbitMQ消息的核心思想就是:生产者不把消息直接发送给队列(生产者甚至都不知道消息会被发送到哪个队列),而是把消息发送给交换机,交换机知道要把这个消息发送给哪个/哪些队列或丢弃—->使用==exchange type==来声明(exchange type包括direct,topic,headers,fanout)

https://www.freeimg.cn/i/2024/07/09/668c8d8fe6f32.png

1
2
3
4
5
/**
* logs是这次交换的名称
* fanout:广播,把收到的信息发给所有的接收者
**/
channel.exchangeDeclare("logs", "fanout");

查看交换方式的命令、:

1
sudo  rabbitmqctl list_exchanges

在之前写工作队列时,我们没有指定交换方式,却也发送成功了信息,是因为我们是用了匿名交换 (Nameless exchange),也就是默认交换。==默认类型是direct类型==

channel.basicPublish("", "hello", null, message.getBytes()); 这里的""就是是用了默认交换方式:消息会发送给在routingKey里查到的对应的queue。

由此,我们可以以广播形式发布对应的信息了,即

1
channel.basicPublish("logs", "", null, message.getBytes());

临时队列

我们的日志记录系统需要监听所有的日志消息,而不是只是一小部分。另外,我们只关注现在的消息,而不是过时的消息。因此,我们需要完成两件事:

  1. 任何时候我们连接到Rabbit时,他会给我们全新的空队列,并生成随机队列名。
  2. 断开连接时,队列会自动删除。

我们用以下语句,可以生成一个不持久化的、特有的、自动删除的队列:

1
String queueName = channel.queueDeclare().getQueue();

绑定

https://img-blog.csdnimg.cn/0783565f0d024e389056bb76889454c8.png

我们已经创建了一种扇出fanout交换方式和一个队列,接来我们要让交换机把消息传给队列,这个关系就叫做绑定binding

https://www.freeimg.cn/i/2024/07/09/668c909ee21a0.png

1
channel.queueBind(queueName, "logs", "");

列出所有的绑定

1
rabbitmqctl list_bindings

综合实例代码

和前文代码区别不大,主要定义了logs交换方式

发送者

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
public class EmitLog {
private static final String EXCHANGE_NAME = "logs";

public static void main(String[] args) throws Exception{
Channel channel = RabbitUtil.getChannel();
// 声明交换名称和方式
channel.exchangeDeclare(EXCHANGE_NAME,"fanout");

String message = new Scanner(System.in).nextLine();

channel.basicPublish(EXCHANGE_NAME, "",null,message.getBytes(StandardCharsets.UTF_8));

System.out.println(" [x] 发送信息 '" + message + "'");
channel.close();
channel.getConnection().close();
}
}

接收者

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
public class ReceiveLogs {
private static final String EXCHANGE_NAME = "logs";

public static void main(String[] args) throws Exception {
// 获得一个channel
Channel channel = RabbitUtil.getChannel();
// 声明交换模式
channel.exchangeDeclare(EXCHANGE_NAME, "fanout");
// 获得队列名称
String queueName = channel.queueDeclare().getQueue();
// 绑定队列和交换机
channel.queueBind(queueName, EXCHANGE_NAME, "");

System.out.println(" [*] 等待信息. To exit press CTRL+C");

// 收到消息的回调接口
DeliverCallback deliverCallback = (consumerTag, delivery) -> {
String message = new String(delivery.getBody(), "UTF-8");
System.out.println(" [x] 收到信息 '" + message + "'");
};
// 取消发送的回调接口
CancelCallback cancelCallback = (consumerTag) -> {
System.out.println("消息消费被中断");
};
// 接收信息
channel.basicConsume(queueName,true,deliverCallback, cancelCallback);
}
}

路由模式

direct模式

  • 工作队列模式:一个消息给一个接收者
  • 发布订阅模式:一个消息给多个接收者
  • 路由模式:接收者接收一部分信息

绑定

复习一下上文创建绑定的方式:

1
channel.queueBind(queueName, EXCHANGE_NAME, "");

这里的”” 实际上是路由绑定键routingKey参数。

1
channel.queueBind(queueName, EXCHANGE_NAME, "black");

直接交换

在第三章中的日志记录系统中,我们做一些改进:只把一部分重要的信息写进磁盘,但仍然打印所有的日志信息。

与上文使用fanout模式不同,这里我们使用direct交换模式。这种模式将消息发送给对应的队列,这个队列和交换机的绑定键binding key和这条消息的路由键routing key是匹配的。
https://img-blog.csdnimg.cn/ed42af53bf27444093439dc7b95f5316.png

如果现在有一条消息的路由键routing key是“orange”,那么他会被发给Q1 队列。

多重绑定

https://img-blog.csdnimg.cn/43ff6828ba46401e9fc055804d824726.png

你也可以给交换机和多个队列用同一个键绑定。

日志系统代码

https://img-blog.csdnimg.cn/e95750fde9904f62b182cfec4fd46098.png

对于发送者
创建一个直接交换方式的交换机:

1
channel.exchangeDeclare(EXCHANGE_NAME, "direct");

我们用log的严重程度作为路由键,如 info / warning / error。

1
channel.basicPublish(EXCHANGE_NAME, severity, null, message.getBytes());

对于接收者
我们用log的严重程度作为绑定键:

1
2
3
4
5
6
7
String queueName = channel.queueDeclare().getQueue();

String[] severities = new String[]{"log", "warning", "error"};
for(String severity : severities){
channel.queueBind(queueName, EXCHANGE_NAME, severity);
}

合并代码

生产者:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
public class EmitLogDirect {
private static final String EXCHANGE_NAME = "direct_logs";

public static void main(String[] argv) throws Exception {
try (Channel channel = RabbitUtil.getChannel()) {
channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.DIRECT);
//创建多个 bindingKey
Map<String, String> bindingKeyMap = new HashMap<>();
bindingKeyMap.put("info", "普通 info 信息");
bindingKeyMap.put("warning", "警告 warning 信息");
bindingKeyMap.put("error", "错误 error 信息");
//debug 没有消费这接收这个消息 就丢失了
bindingKeyMap.put("debug", "调试 debug 信息");
for (Map.Entry<String, String> bindingKeyEntry :
bindingKeyMap.entrySet()) {
String bindingKey = bindingKeyEntry.getKey();
String message = bindingKeyEntry.getValue();
channel.basicPublish(EXCHANGE_NAME, bindingKey, null, message.getBytes(StandardCharsets.UTF_8));
System.out.println("生产者发出消息:" + message);
}
}
}
}

消费者:

  • 一部分写入磁盘

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    public class ReceiveLogsDirectSaveToDisk {
    private static final String EXCHANGE_NAME = "direct_logs";

    public static void main(String[] argv) throws Exception {
    Channel channel = RabbitUtil.getChannel();
    channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.DIRECT);
    String queueName = "disk";
    channel.queueDeclare(queueName, false, false, false, null);
    channel.queueBind(queueName, EXCHANGE_NAME, "error");
    System.out.println("等待接收消息........... ");

    // 收到消息的回调接口,将日志写入磁盘
    DeliverCallback deliverCallback = (consumerTag, delivery) -> {
    String message = new String(delivery.getBody(), "UTF-8");
    message = "接收绑定键:" + delivery.getEnvelope().getRoutingKey() + ",消息:" + message;
    File file = new File("./rabbitmq_info.txt");
    FileUtils.writeStringToFile(file, message, "UTF-8");
    System.out.println("错误日志已经接收");
    };
    // 取消发送的回调接口
    CancelCallback cancelCallback = (consumerTag) -> {
    System.out.println("消息消费被中断");
    };
    channel.basicConsume(queueName, true, deliverCallback, cancelCallback);
    }
    }

  • 一部分直接打印

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    public class ReceiveLogsDirectPrintOut {
    private static final String EXCHANGE_NAME = "direct_logs";

    public static void main(String[] argv) throws Exception {
    Channel channel = RabbitUtil.getChannel();
    channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.DIRECT);
    String queueName = "console";
    channel.queueDeclare(queueName, false, false, false, null);
    channel.queueBind(queueName, EXCHANGE_NAME, "info");
    channel.queueBind(queueName, EXCHANGE_NAME, "warning");
    System.out.println("等待接收消息........... ");
    DeliverCallback deliverCallback = (consumerTag, delivery) -> {
    String message = new String(delivery.getBody(), "UTF-8");
    System.out.println(" 接收绑定键 :" + delivery.getEnvelope().getRoutingKey() + ", 消息:" + message);
    };
    channel.basicConsume(queueName, true, deliverCallback, consumerTag -> {});
    }
    }


    再看看rabbitMQ管理系统

    https://img-blog.csdnimg.cn/d02f6900af4343848c6df4fb7935b44a.png

主题模式

  • 工作队列模式:一个消息给一个接收者
  • 发布订阅模式(fanout):一个消息给多个接收者
  • 路由模式(direct):接收者接收一部分信息
  • 主题模式(topics):区分发送主体

之前我们的日志系统实现了根据不同信息传给不同的队列,现在我们需要对信息进一步筛选。例如,在Unix系统中,log可能有info/warn/crit的情况,这些log可能是从auth/cron/kern..传送来的,那么如果我们需要区分发送log的主体,仅接受来自cron的critical errors,就需要用到topic交换方式。

https://img-blog.csdnimg.cn/daaa50b807ed4e2dbc4de850e2c6f496.png

路由键 routing key
发送给topic交换模式的交换机 的消息 不能用随意的routing_key,它的路由键必须是一系列用”.”隔开的词语,例如quick.orange.rabbit / stock.usd.nyse。词语的数量可以随便你,但是总长度不能超过255字节。
绑定键 binding key
绑定键和路由键是同一个格式,消息会被发送给能和它路由键匹配的绑定键线路。没有match的消息就会被丢掉。比如,*.orange.* / ..rabbit / quick.orange.rabbit.#。
星号 ““:代替一个词
井号 “#”:代替零个或多个词
当队列的绑定键都是 #,topic exchange就和fanout exchange是一样的。
当队列的绑定键没有``
``和#时,topic exchange就和direct exchange是一样的。

示例代码

生产者:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
public class EmitLogTopic {
private static final String EXCHANGE_NAME = "topic_logs";

public static void main(String[] args) throws Exception {
// 建立连接
Channel channel = RabbitUtil.getChannel();

// 声明topic交换模式的交换机
channel.exchangeDeclare(EXCHANGE_NAME, "topic");

Map<String, String> bindingKeyMap = new HashMap<>();
bindingKeyMap.put("quick.range.rabbit", "被队列Q1Q2接收到");
bindingKeyMap.put("lazy.orange.elephant", "被队列Q1Q2接收到");
bindingKeyMap.put("quick.orange.fox", "被队列Q1接收到");
bindingKeyMap.put("lazy.brown.fox", "虽然满足两个绑定但只被队列Q2接收一次");
bindingKeyMap.put("lazy.pink.rabbit", "虽然满足两个绑定但只被队列 Q2 接收一次");
bindingKeyMap.put("quick.brown.fox", "不匹配任何绑定不会被任何队列接收到会被丢弃");
bindingKeyMap.put("quick.orange.male.rabbit", "是四个单词不匹配任何绑定会被丢弃");
bindingKeyMap.put("lazy.orange.male.rabbit", "是四个单词但匹配 Q2");

Iterator<Map.Entry<String, String>> iterator =
bindingKeyMap.entrySet().iterator();
while (iterator.hasNext()){
Map.Entry<String, String> next = iterator.next();
String bindingKey = next.getKey();
String message = next.getValue();

channel.basicPublish(EXCHANGE_NAME, bindingKey, null, message.getBytes(StandardCharsets.UTF_8));
System.out.println("生产者发出消息: " + bindingKey + "---> " + message);
}

channel.close();
channel.getConnection().close();
}
}

消费者Q1:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
public class ReveiveLogsTopicQ1 {
private static final String EXCHANGE_NAME = "topic_logs";

public static void main(String[] args)throws Exception{
// 建立channel
Channel channel = RabbitUtil.getChannel();

// 声明交换
channel.exchangeDeclare(EXCHANGE_NAME, "topic");

// 声明Q1队列与绑定关系
String queueName = "Q1";
channel.queueDeclare(queueName, false, false, false, null);
channel.queueBind(queueName, EXCHANGE_NAME,"*.orange.*");

System.out.println("等待接收消息。。匹配模式为\"*.orange.*\"");
DeliverCallback deliverCallback = (consumerTag, delivery) -> {
String message = new String(delivery.getBody(), StandardCharsets.UTF_8);
System.out.println("接收队列:" + queueName +
" --> 路由键:" + delivery.getEnvelope().getRoutingKey() +
" -- 消息:" + message);
};
CancelCallback cancelCallback = (consumerTag) -> {
System.out.println("接收失败。。");
};
channel.basicConsume(queueName, true, deliverCallback, cancelCallback);
}
}

消费者Q2:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
public class ReveiveLogsTopicQ2 {
private static final String EXCHANGE_NAME = "topic_logs";

public static void main(String[] args) throws Exception {
// 建立channel
Channel channel = RabbitUtil.getChannel();

// 声明交换
channel.exchangeDeclare(EXCHANGE_NAME, "topic");

// 声明Q1队列与绑定关系
String queueName = "Q2";
channel.queueDeclare(queueName, false, false, false, null);
channel.queueBind(queueName, EXCHANGE_NAME, "*.*.rabbit");
channel.queueBind(queueName, EXCHANGE_NAME, "lazy.#");

System.out.println("等待接收消息。。匹配模式为:\"*.*.rabbit\"或\"lazy.#\"");
DeliverCallback deliverCallback = (consumerTag, delivery) -> {
String message = new String(delivery.getBody(), StandardCharsets.UTF_8);
System.out.println("接收队列:" + queueName +
" --> 路由键:" + delivery.getEnvelope().getRoutingKey() +
" -- 消息:" + message);
};
CancelCallback cancelCallback = (consumerTag) -> {
System.out.println("接收失败。。");
};
channel.basicConsume(queueName, true, deliverCallback, cancelCallback);
}
}

死信队列

死信:由于某些原因(消息TTL过期、队列达到最大长度、消息被拒绝)导致队列中的消息无法被处理。
RabbitMQ死信队列机制:当消息消费发生异常时,将消息投入死信队列。(例如,用户下单成功但未在指定时间内支付 -> 消息自动失效)

代码模拟死信三种情况

消息TTL过期

代码结构图见上图

生产者:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
public class Producer {
private static final String NORMAL_EXCHANGE = "normal_exchange";

public static void main(String[] args) throws Exception{
// 获取连接
Channel channel = RabbitUtil.getChannel();
// 建立一个direct模式的交换
channel.exchangeDeclare(NORMAL_EXCHANGE, BuiltinExchangeType.DIRECT);
// 设置消息的TTL时间
AMQP.BasicProperties properties = new AMQP.BasicProperties().builder().expiration("10000").build();

// 该消息是用作演示队列的个数限制
for (int i = 0; i < 11; i++) {
String message = "info" + i;
channel.basicPublish(NORMAL_EXCHANGE, "zhangsan",properties,message.getBytes(StandardCharsets.UTF_8));
System.out.println("生产者发送信息" + message);
}

channel.close();
channel.getConnection().close();
}
}

消费者:
消费者1,处理正常队列中的信息

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
public class Consumer01 {
private final static String NORMAL_EXCHANGE = "normal_exchange";
private final static String DEAD_EXCHANGE = "dead_exchange";

public static void main(String[] args) throws Exception{
// 建立channel
Channel channel = RabbitUtil.getChannel();

// 声明死信和普通交换机,类型为direct
channel.exchangeDeclare(NORMAL_EXCHANGE, BuiltinExchangeType.DIRECT);
channel.exchangeDeclare(DEAD_EXCHANGE, BuiltinExchangeType.DIRECT);

// 声明死信队列
String deadQueue = "dead-queue";
channel.queueDeclare(deadQueue, false, false, false, null);
// 死信队列绑定死信交换机与routingKey
channel.queueBind(deadQueue, DEAD_EXCHANGE, "lisi");

// 正常队列绑定死信队列信息
Map<String, Object> params = new HashMap<>();
// 正常队列设置死信交换机,key是固定值
params.put("x-dead-letter-exchange", DEAD_EXCHANGE);
// 正常队列设置死信routing-key,key是固定值
params.put("x-dead-letter-routing-key", "lisi");

String normalQueue = "normal_queue";
// 将设置死信的参数params放进正常队列声明中
channel.queueDeclare(normalQueue,false,false, false,params);
channel.queueBind(normalQueue, NORMAL_EXCHANGE, "zhangsan");

System.out.println("等待接收信息。。。");
DeliverCallback deliverCallback = (consumerTag, delivery) -> {
String message = new String(delivery.getBody(), StandardCharsets.UTF_8);
System.out.println("Consumer01 接收到信息: " + message);
};
CancelCallback cancelCallback = (consumerTag) -> {
System.out.println("接收失败");
};

channel.basicConsume(normalQueue, true, deliverCallback, cancelCallback);
}
}

消费者2,处理死信队列中的信息

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
public class Consumer02 {
private static final String DEAD_EXCHANGE = "dead_exchange";

public static void main(String[] args) throws Exception {
// 建立channel
Channel channel = RabbitUtil.getChannel();
// 声明交换机
channel.exchangeDeclare(DEAD_EXCHANGE, BuiltinExchangeType.DIRECT);

// 死信队列声明及绑定交换机
String deadQueue = "dead-queue";
channel.queueDeclare(deadQueue, false, false, false, null);
channel.queueBind(deadQueue, DEAD_EXCHANGE, "lisi");

System.out.println("等待接收死信队列信息。。。。");
DeliverCallback deliverCallback = (consumerTag, delivery) -> {
String message = new String(delivery.getBody(), StandardCharsets.UTF_8);
System.out.println("Consumer02 接收到死信队列中的信息: " + message);
};
CancelCallback cancelCallback = (consumerTag) -> {
System.out.println("接收失败");
};

channel.basicConsume(deadQueue, true, deliverCallback, cancelCallback);
}
}

结果:
(C1需要启动完先关闭,再打开生产者)

img

此时再打开死信队列,死信队列里的消息被C2消费。

img

img

队列达到最大长度

在消息过期TTL代码中修改两处地方:

1.去掉生产者代码中的TTL语句
2.在C1消费者代码中添加 param.put(“x-max-length”, 6),设置正常队列的长度限制。

在这里插入图片描述

==因为设置队列最多六个消息,这个队列被填充满之后,新的消息就会被丢弃或者转移到死信队列,所以c2消费者只能消费四条消息==

消息被拒

在队列达到最大长度代码的基础上,修改C1消费者代码(生产者和C2消费者不变):

1.改为手动应答,修改DeliverCallback。
2.删除param.put(“x-max-length”, 6)

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
DeliverCallback deliverCallback = (consumerTag, delivery) -> {
String message = new String(delivery.getBody(), "UTF-8");
if (message.equals("info5")) {
System.out.println("Consumer01 接收到消息" + message + "并拒绝签收该消息");
//requeue 设置为 false 代表拒绝重新入队 该队列如果配置了死信交换机将发送到死信队列中
channel.basicReject(delivery.getEnvelope().getDeliveryTag(), false);
} else {
System.out.println("Consumer01 接收到消息" + message);
channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false);
}
};

boolean autoAck = false;

channel.basicConsume(normalQueue, autoAck, deliverCallback, cancelCallback);

在这里插入图片描述

第五条消息被拒绝,自动丢弃到死信队列中被消费者2消费

延迟队列

延迟队列:队列中的元素需要在指定时间取出和处理。例如,用户发起订单,十分钟内未支付则自动取消。

当数据量很大时,采取轮询的方式显然是不合理的,会给数据库带来很大压力。

在这里插入图片描述

RabbitMQ中的TTL

TTL,最大存活时间,表明消息或该队列中所有消息的最大存活时间。
有两种方式设置:

1.针对每条信息设置TTL

1
2
3
4
rabbitTemplate.convertAndSend("X", "XC", message, correlationData -> {
correlationData.getMessageProperties().setExpiration(ttlTime);
return correlationData;
})

2.在创建队列时设置队列的x-message-ttl属性

1
2
params.put("x-message-ttl", 5000);
return QueueBuilder.durable(QUEUE_A).withArguments(params).build();

如果设置了队列的 TTL 属性,那么一旦消息过期,就会被队列丢弃(如果配置了死信队列被丢到死信队列中)。
而第二种方式,消息即使过期,也不一定会被马上丢弃,因为消息是否过期是在即将投递到消费者之前判定的。
如果当前队列有严重的消息积压情况,则已过期的消息也许还能存活较长时间;另外,还需要注意的一点是,如果不设置 TTL,表示消 息永远不会过期,如果将 TTL 设置为 0,则表示除非此时可以直接投递该消息到消费者,否则该消息将会被丢弃。

==延时队列核心 = 死信队列 + TTL:TTL让消息延迟多久后成为死信,消费者一直处理死信队列里的信息就行。==

整合SpringBoot

添加依赖

Springboot版本:2.5/9
JDK:8

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
    <dependencies>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter</artifactId>
</dependency>

<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-test</artifactId>
<scope>test</scope>
</dependency>

<!-- rabbitMQ依赖-->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
</dependency>
<dependency>
<groupId>com.alibaba</groupId>
<artifactId>fastjson</artifactId>
<version>1.2.76</version>
</dependency>
<dependency>
<groupId>org.projectlombok</groupId>
<artifactId>lombok</artifactId>
</dependency>
<dependency>
<groupId>io.springfox</groupId>
<artifactId>springfox-boot-starter</artifactId>
<version>3.0.0</version>
</dependency>
<dependency>
<groupId>io.springfox</groupId>
<artifactId>springfox-swagger2</artifactId>
<version>3.0.0</version>
</dependency>
<dependency>
<groupId>io.springfox</groupId>
<artifactId>springfox-swagger-ui</artifactId>
<version>3.0.0</version>
</dependency>
<dependency>
<groupId>com.google.guava</groupId>
<artifactId>guava</artifactId>
<version>25.1-jre</version>
</dependency>
<dependency>
<groupId>org.springframework.amqp</groupId>
<artifactId>spring-rabbit-test</artifactId>
<scope>test</scope>
</dependency>

修改properties

1
2
3
4
spring.rabbitmq.host=你的主机ip
spring.rabbitmq.port=5672
spring.rabbitmq.username=你的rabbit用户名
spring.rabbitmq.password=你的rabbit密码

添加swagger配置类

建立一个config包,SwaggerConfig类

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
@Configuration
@EnableSwagger2
public class SwaggerConfig {

@Bean
public Docket webApiConfig() {
return new Docket(DocumentationType.SWAGGER_2)
.groupName("webApi")
.apiInfo(webApiInfo())
.select()
.build();
}

private ApiInfo webApiInfo() {
return new ApiInfoBuilder()
.title("RabbitMQ 接口文档")
.description("本文档描述了rabbitmq微服务接口定义")
.version("1.0")
.contact(new Contact("cherry", "http://xxxx.github.io/", "xxxx@qq.com"))
.build();
}
}

队列TTL

代码架构图

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
@Configuration
public class TtlQueueConfig {
public static final String X_EXCHANGE = "X";
public static final String QUEUE_A = "QA";
public static final String QUEUE_B = "QB";

public static final String Y_DEAD_LETTER_EXCHANGE = "Y";
public static final String DEAD_LETTER_QUEUE = "QD";

// 声明 XExchange
@Bean("xExchange")
public DirectExchange xExchange(){
return new DirectExchange(X_EXCHANGE);
}
// 声明 YExchange
@Bean("yExchange")
public DirectExchange yExchange(){
return new DirectExchange(Y_DEAD_LETTER_EXCHANGE);
}

// 声明队列A,ttl为10s,绑定到对应的死信交换机
@Bean("queueA")
public Queue queueA(){
Map<String, Object> args = new HashMap<>(3);
// 声明当前队列绑定的死信交换机
args.put("x-dead-letter-exchange", Y_DEAD_LETTER_EXCHANGE);
// 声明当前队列的死信路由key
args.put("x-dead-letter-routing-key","YD");
// 声明队列的TTL
args.put("x-message-ttl", 10000);

return QueueBuilder.durable(QUEUE_A).withArguments(args).build();
}
// 声明队列A绑定X交换机
@Bean
public Binding queueaBindingX(@Qualifier("queueA") Queue queueA,
@Qualifier("xExchange") DirectExchange xExchange){
return BindingBuilder.bind(queueA).to(xExchange).with("XA");
}

// 声明队列B,ttl为40s,绑定到对应的死信交换机
@Bean("queueB")
public Queue queueB(){
Map<String, Object> args = new HashMap<>(3);
// 声明当前队列绑定的死信交换机
args.put("x-dead-letter-exchange", Y_DEAD_LETTER_EXCHANGE);
// 声明当前队列的死信路由key
args.put("x-dead-letter-routing-key","YD");
// 声明队列的TTL
args.put("x-message-ttl", 40000);

return QueueBuilder.durable(QUEUE_B).withArguments(args).build();
}
// 声明队列B绑定X交换机
@Bean
public Binding queuebBindingX(@Qualifier("queueB") Queue queueB,
@Qualifier("xExchange") DirectExchange xExchange){
return BindingBuilder.bind(queueB).to(xExchange).with("XB");
}

// 声明死信队列QD
@Bean("queueD")
public Queue queueD(){
return new Queue(DEAD_LETTER_QUEUE);
}
// 声明死信队列和Y交换机的绑定关系
@Bean
public Binding deadLetterBindingQAD(@Qualifier("queueD") Queue queueD,
@Qualifier("yExchange") DirectExchange yExchange){
return BindingBuilder.bind(queueD).to(yExchange).with("YD");
}
}

测试

发起一个请求: http://localhost:8080/ttl/sendMsg/HelloCherry~

在这里插入图片描述

延时队列优化

在这里插入图片描述

配置文件代码:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
@Configuration
public class MsgTtlQueueConfig {
private static final String Y_DEAD_LETTER_EXCHANGE = "Y";
private static final String QUEUE_C = "QC";

// 声明队列C 死信交换机
@Bean("queueC")
public Queue queueB(){
Map<String, Object> args = new HashMap<>(3);
// 声明当前队列绑定的死信交换机
args.put("x-dead-letter-exchange",Y_DEAD_LETTER_EXCHANGE);
// 声明当前队列的死信路由
args.put("x-dead-letter-routing-key","YD");
// 没有声明TTL属性
return QueueBuilder.durable(QUEUE_C).withArguments(args).build();
}

// 声明队列B 绑定 X 交换机
@Bean
public Binding queueBindingX(@Qualifier("queueC") Queue queueC,
@Qualifier("xExchange")DirectExchange xExchange){
return BindingBuilder.bind(queueC).to(xExchange).with("XC");
}
}

消费生产者代码:

1
2
3
4
5
6
7
8
@GetMapping("sendExpirationMsg/{message}/{ttlTime}")
public void sendMsg(@PathVariable String message, @PathVariable String ttlTime){
rabbitTemplate.convertAndSend("X", "XC", message, correlationData ->{
correlationData.getMessageProperties().setExpiration(ttlTime);
return correlationData;
});
log.info("当前时间:{}, 发送一条时长{}毫秒TTL消息给队列C:{}", new Date(), ttlTime, message);
}

在这里插入图片描述

在最开始的时候,就介绍过如果使用在消息属性上设置 TTL 的方式,消息可能并不会按时“死亡“,因为 RabbitMQ 只会检查第一个消息是否过期,如果过期则丢到死信队列。

如果第一个消息的延时时长很长,而第二个消息的延时时长很短,第二个消息并不会优先得到执行

发布确认高级

在生产环境中由于一些不明原因,导致 rabbitmq 重启,在 RabbitMQ 重启期间生产者消息投递失败, 导致消息丢失,需要手动处理和恢复。于是,我们开始思考,如何才能进行 RabbitMQ 的消息可靠投递呢? 特 别是在这样比较极端的情况,RabbitMQ 集群不可用的时候,无法投递的消息该如何处理呢?

发布确认SpringBoot版本

确认机制方案

在这里插入图片描述

代码架构图

在这里插入图片描述

配置文件

配置文件中添加

1
spring.rabbitmq.publisher-confirm-type=correlated
  • NONE:禁用发布确认模式,是默认值

  • CORRELATED:发布消息成功到交换器后会触发回调方法

  • SIMPLE:经测试有两种效果,其一效果和CORRELATED值一样会触发回调方法;其二在发布消息成功后使用rabbitTemplate调用waitForConfirmsOrDie方法等待broker节点返回发送结果,根据返回结果来判定下一步的逻辑。waitForConfirmsOrDie方法如果返回false则会关闭channel,则接下来无法发送消息到broker。

添加配置类

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
@Configuration
public class ConfirmConfig {
public static final String CONFIRM_EXCHANGE_NAME = "confirm.exchange";
public static final String CONFIRM_QUEUE_NAME = "confirm.queue";

// 声明业务Exchange
@Bean("confirmExchange")
public DirectExchange confirmExchange(){
return new DirectExchange(CONFIRM_EXCHANGE_NAME);
}

// 声明确认队列
@Bean("confirmQueue")
public Queue confirmQueue(){
return QueueBuilder.durable(CONFIRM_QUEUE_NAME).build();
}

// 声明确认队列绑定关系
@Bean
public Binding queueBinding(@Qualifier("confirmQueue") Queue queue,
@Qualifier("confirmExchange") DirectExchange exchange){
return BindingBuilder.bind(queue).to(exchange).with("key1");
}
}

消息生产者

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
@RestController
@RequestMapping("/confirm")
@Slf4j
public class Producer {
public static final String CONFIRM_EXCHANGE_NAME = "confirm.exchange";
@Autowired
private RabbitTemplate rabbitTemplate;
@Autowired
private MyCallBack myCallBack;

// 依赖注入rabbitTemplate之后再设置它的回调对象
@PostConstruct
public void init(){
rabbitTemplate.setConfirmCallback(myCallBack);
}
@GetMapping("sendMessage/{message}")
public void sendMessage(@PathVariable String message){
// 指定消息id为1
CorrelationData correlationData1 = new CorrelationData("1");
String routingKey = "key1";

rabbitTemplate.convertAndSend(CONFIRM_EXCHANGE_NAME, routingKey, message+routingKey, correlationData1);

CorrelationData correlationData2 = new CorrelationData("2");
routingKey = "key2";
rabbitTemplate.convertAndSend(CONFIRM_EXCHANGE_NAME, routingKey, message+routingKey, correlationData2);

log.info("发送消息内容:{}", message);
}
}

回调接口

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
@Component
@Slf4j
public class MyCallBack implements RabbitTemplate.ConfirmCallback {
/**
* 交换机不管是否收到消息的一个回调方法
* CorrelationData 消息相关数据
* ack 交换机是否收到消息
* @param correlationData
* @param b
* @param s
*/
@Override
public void confirm(CorrelationData correlationData, boolean b, String s) {
String id = correlationData!=null? correlationData.getId() : "";
if (b){
log.info("交换机已经收到id为{}的消息",id);
} else {
log.info("交换机还未收到id为{}的消息,由于原因:{}", id, s);
}
}
}

消息消费者

1
2
3
4
5
6
7
8
9
10
@Component
@Slf4j
public class ConfirmConsumer {
public static final String CONFIRM_QUEUE_NAME = "confirm.queue";
@RabbitListener(queues = CONFIRM_QUEUE_NAME)
public void receiveMsg(Message message){
String msg = new String(message.getBody());
log.info("接收到队列confirm.queue消息:{}", msg);
}
}

在这里插入图片描述

回退消息

mandatory参数

在仅开启了生产者确认机制的情况下,交换机接收到消息后,会直接给消息生产者发送确认消息。

如果发现该消息不可路由,那么消息会被直接丢弃,此时生产者是不知道消息被丢弃这个事件的。
那么如何让无法被路由的消息帮我想办法处理一下? 最起码通知我一声,我好自己处理啊。通过设置 mandatory 参数可以在当消息传递过程中不可达目的地时将消息返回给生产者。
(“转接人工服务”)

消费者生产代码

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
@Slf4j
@Component
public class MessageProducer implements RabbitTemplate.ConfirmCallback, RabbitTemplate.ReturnsCallback {
@Autowired
private RabbitTemplate rabbitTemplate;

// rabbitTemplate 注入之后就设置该值
@PostConstruct
private void init() {
/**
* true: 交换机无法将消息进行路由时,会将该消息返回给生产者
* false:如果发现消息无法进行路由,则直接丢弃
*/
rabbitTemplate.setMandatory(true);
// 设置回退消息交给谁处理
rabbitTemplate.setReturnsCallback(this);
}

@GetMapping("sendMessage")
public void sendMessage(String message) {
// 让消息绑定一个ID值
CorrelationData correlationData1 = new CorrelationData(UUID.randomUUID().toString());
rabbitTemplate.convertAndSend("confirm.exchange", "key1", message + "key1", correlationData1);
log.info("发送消息id为-{},内容为-{}", correlationData1.getId(), message + "key1");
CorrelationData correlationData2 = new CorrelationData(UUID.randomUUID().toString());
log.info("发送消息id为-{},内容为-{}", correlationData2.getId(), message + "key2");
}


@Override
public void confirm(CorrelationData correlationData, boolean ack, String cause) {
String id = correlationData != null ? correlationData.getId() : "";
if (ack) {
log.info("交换机收到消息确认成功, id:{}", id);
} else {
log.error("消息id:{} 未成功投递到交换机,原因是:{} ", id, cause);
}
}

@Override
public void returnedMessage(ReturnedMessage returnedMessage) {
log.info("消息:{}被服务器退回,退回原因:{},交换机是:{},路由key:{}",
returnedMessage.getMessage().getBody(), returnedMessage.getReplyText(),
returnedMessage.getExchange(), returnedMessage.getRoutingKey());
}
}

回调接口

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
@Component
@Slf4j
public class MyCallBack implements RabbitTemplate.ConfirmCallback, RabbitTemplate.ReturnsCallback {
/**
* 交换机不管是否收到消息的一个回调方法
* CorrelationData 消息相关数据
* ack 交换机是否收到消息
* @param correlationData
* @param b
* @param s
*/
@Override
public void confirm(CorrelationData correlationData, boolean b, String s) {
String id = correlationData!=null? correlationData.getId() : "";
if (b){
log.info("交换机已经收到id为{}的消息",id);
} else {
log.info("交换机还未收到id为{}的消息,由于原因:{}", id, s);
}
}

@Override
public void returnedMessage(ReturnedMessage returnedMessage) {
log.error("消息{}, 被交换机{}退回,退回原因:{},路由key:{} ",
returnedMessage.getMessage().getBody(), returnedMessage.getExchange(),
returnedMessage.getReplyText(), returnedMessage.getRoutingKey());
}
}

消息消费者

1
2
3
4
5
6
7
8
9
10
@Component
@Slf4j
public class ConfirmConsumer {
public static final String CONFIRM_QUEUE_NAME = "confirm.queue";
@RabbitListener(queues = CONFIRM_QUEUE_NAME)
public void receiveMsg(Message message){
String msg = new String(message.getBody());
log.info("接收到队列confirm.queue消息:{}", msg);
}
}

备份交换机

使用mandatory参数和回退消息,我们可以处理无法投递的信息,但具体而言我们应该如何处理这些无法路由的信息呢?我们可以建立一个日志,发出警告然后人工处理他们。但是如果有很多台机器,这样做就会很麻烦。设置mandatory参数还会增加生产者的复杂度(因为需要额外写一些逻辑去处理这些返回的消息)。

之前的文章中,我们用死信队列处理失败的消息,但是不能路由的消息是不能进入到队列的,因此没法用死信队列。

这时我们就可以用备份交换机来处理这个问题。当一个交换机收到不可路由的消息时,他会把消息转发到备份交换机中,由备份交换机来转发和处理,通常备份交换机的交换方式为fanout,即将所有消息投递到与其绑定的所有队列中。

当然,我们也可以建立一个报警队列,用独立的消费者来监测和报警。

代码架构图

在这里插入图片描述

修改配置类

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
@Configuration
public class ConfirmConfig {
public static final String CONFIRM_EXCHANGE_NAME = "confirm.exchange";
public static final String CONFIRM_QUEUE_NAME = "confirm.queue";
public static final String BACKUP_EXCHANGE_NAME = "backup.exchange";
public static final String BACKUP_QUEUE_NAME = "backup.queue";
public static final String WARNING_QUEUE_NAME = "warning.exchange";

// 声明确认队列
@Bean("confirmQueue")
public Queue confirmQueue(){
return QueueBuilder.durable(CONFIRM_QUEUE_NAME).build();
}

// 声明确认队列绑定关系
@Bean
public Binding queueBinding(@Qualifier("confirmQueue") Queue queue,
@Qualifier("confirmExchange") DirectExchange exchange){
return BindingBuilder.bind(queue).to(exchange).with("key1");
}

// 声明备份Exchange
@Bean("backupExchange")
public FanoutExchange backupExchange(){
return new FanoutExchange(BACKUP_EXCHANGE_NAME);
}

// 声明业务Exchange
@Bean("confirmExchange")
public DirectExchange confirmExchange(){
ExchangeBuilder exchangeBuilder = ExchangeBuilder.directExchange(CONFIRM_EXCHANGE_NAME)
.durable(true).withArgument("alternate-exchange", BACKUP_EXCHANGE_NAME); // 设置该交换机的备份交换机
return (DirectExchange) exchangeBuilder.build();
}

// 声明警告队列
@Bean("warningQueue")
public Queue warningQueue(){
return QueueBuilder.durable(WARNING_QUEUE_NAME).build();
}

// 声明报警队列绑定关系
@Bean
public Binding warningBinding(@Qualifier("warningQueue") Queue queue,
@Qualifier("backupExchange") FanoutExchange backupExchange){
return BindingBuilder.bind(queue).to(backupExchange);
}

// 声明备份队列
@Bean("backQueue")
public Queue backQueue(){
return QueueBuilder.durable(BACKUP_QUEUE_NAME).build();
}

// 声明备份队列绑定关系
@Bean
public Binding backupBinding(@Qualifier("backQueue") Queue queue,
@Qualifier("backupExchange") FanoutExchange backupExchange) {
return BindingBuilder.bind(queue).to(backupExchange);
}
}

报警消费者

1
2
3
4
5
6
7
8
9
10
@Component
@Slf4j
public class WarningConsumer {
public static final String WARNING_QUEUE_NAME = "warning.queue";
@RabbitListener(queues = WARNING_QUEUE_NAME)
public void receiveWarningMsg(Message message){
String msg = new String(message.getBody());
log.error("报警发现不可路由消息:{}", msg);
}
}

mandatory参数可以与备份交换机一起使用的时候,如果二者同时开启,消息该何去何从,哪个的优先级更高?

==消息会去备份交换机==

拓展

幂等性

用户对于同一操作发起的一次请求或者多次请求的结果是一致的,不会因为多次点击而产生了副作用。 举个最简单的例子,那就是支付,用户购买商品后支付,支付扣款成功,但是返回结果的时候网络异常, 此时钱已经扣了,用户再次点击按钮,此时会进行第二次扣款,返回结果成功,用户查询余额发现多扣钱 了,流水记录也变成了两条。在以前的单应用系统中,我们只需要把数据操作放入事务中即可,发生错误 立即回滚,但是再响应客户端的时候也有可能出现网络中断或者异常等等。

消费者在消费 MQ 中的消息时,MQ 已把消息发送给消费者,消费者在给MQ 返回 ack 时网络中断, 故 MQ 未收到确认信息,该条消息会重新发给其他的消费者,或者在网络重连后再次发送给该消费者,但 实际上该消费者已成功消费了该条消息,造成消费者消费了重复的消息。

MQ 消费者的幂等性的解决一般使用全局 ID 或者写个唯一标识比如时间戳 或者 UUID 或者订单消费者消费 MQ 中的消息也可利用 MQ 的该 id 来判断,或者可按自己的规则生成一个全局唯一 id,每次消费消 息时用该 id 先判断该消息是否已消费过。

在海量订单生成的业务高峰期,生产端有可能就会重复发生了消息,这时候消费端就要实现幂等性, 这就意味着我们的消息永远不会被消费多次,即使我们收到了一样的消息。业界主流的幂等性有两种操作:
a. 唯一 ID+指纹码机制,利用数据库主键去重,
b.利用 redis 的原子性去实现。

唯一 ID+指纹码机制
指纹码:我们的一些规则或者时间戳加别的服务给到的唯一信息码,它并不一定是我们系统生成的,基本都是由我们的业务规则拼接而来,但是一定要保证唯一性,然后就利用查询语句进行判断这个 id 是否存 在数据库中,优势就是实现简单就一个拼接,然后查询判断是否重复;劣势就是在高并发时,如果是单个数 据库就会有写入性能瓶颈当然也可以采用分库分表提升性能,但也不是我们最推荐的方式。
Redis原子性
利用 redis 执行 setnx(set if not exists) 命令,天然具有幂等性。从而实现不重复消费

解释一下setnx

1
2
3
4
在 Redis 中,SETNX 命令的作用是 "Set if Not eXists"(如果不存在则设置),用于设置一个键的值,只有当这个键不存在时才会成功。具体来说,SETNX 命令会执行以下操作:

如果键不存在,SETNX 会将键设置为指定的值,并返回 1。
如果键已经存在,SETNX 什么也不做,并返回 0。

优先级队列

在我们系统中有一个订单催付的场景,我们的客户在天猫下的订单,淘宝会及时将订单推送给我们,如果在用户设定的时间内未付款那么就会给用户推送一条短信提醒,很简单的一个功能对吧,但是,tmall 商家对我们来说,肯定是要分大客户和小客户的对吧,比如像苹果,小米这样大商家一年起码能给我们创 造很大的利润,所以理应当然,他们的订单必须得到优先处理,而曾经我们的后端系统是使用 redis 来存放的定时轮询,==大家都知道 redis 只能用 List 做一个简简单单的消息队列,并不能实现一个优先级的场景,==所以订单量大了后采用 RabbitMQ 进行改造和优化,如果发现是大客户的订单给一个相对比较高的优先级, 否则就是默认优先级

如何添加

控制面板添加

在这里插入图片描述

队列代码,添加优先级

1
2
3
Map<String, Object> params = new HashMap();
params.put("x-max-priority", 10);
channel.queueDeclare("hello", true, false, false, params);

消息代码,添加优先级

1
AMQP.BasicProperties properties = new AMQP.BasicProperties().builder().priority(5).build();
注意事项

要让队列实现优先级需要做的事情:

  1. 队列需要设置为优先级队列,
  2. 消息需要设置消息的优先级,
  3. 消费者需要等待消息已经发送到队列中才去消费,这样才有机会对消息进行排序

代码

消息生产者

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
public class Producer {
private static final String QUEUE_NAME="hello";
public static void main(String[] args) throws Exception {
try (Channel channel = RabbitMqUtils.getChannel();) {
//给消息赋予一个 priority 属性
AMQP.BasicProperties properties = new
AMQP.BasicProperties().builder().priority(5).build(); for (int i = 1; i <11; i++){
String message = "info"+i;
if(i==5){
channel.basicPublish("", QUEUE_NAME, properties, message.getBytes());
}else{
channel.basicPublish("", QUEUE_NAME, null, message.getBytes());
}
System.out.println("发送消息完成:" + message);
}
}
}
}

消息消费者

1
2
3
4
5
6
7
8
9
10
11
12
13
public class Consumer {
private static final String QUEUE_NAME="hello";
public static void main(String[] args) throws Exception{
Channel channel = RabbitMqUtils.getChannel();
//设置队列的最大优先级 最大可以设置到 255 官网推荐 1-10 如果设置太高比较吃内存和 CPU
Map<String, Object> params = new HashMap(); params.put("x-max-priority", 10);
channel.queueDeclare(QUEUE_NAME, true, false, false, params);
System.out.println("消费者启动等待消费.............."); DeliverCallback deliverCallback=(consumerTag, delivery)->{
String receivedMessage = new String(delivery.getBody());System.out.println("接收到消 息:"+receivedMessage);};
channel.basicConsume(QUEUE_NAME,true,deliverCallback,(consumerTag)- >{
System.out.println("消费者无法消费消息时调用,如队列被删除");
});
}

惰性队列

RabbitMQ 从 3.6.0 版本开始引入了惰性队列的概念。惰性队列会尽可能的将消息存入磁盘中,而在消费者消费到相应的消息时才会被加载到内存中,它的一个重要的设计目标是能够支持更长的队列,即支持更多的消息存储。当消费者由于各种各样的原因(比如消费者下线、宕机亦或者是由于维护而关闭等)而致使长时间内不能消费消息造成堆积时,惰性队列就很有必要了。

默认情况下,当生产者将消息发送到 RabbitMQ 的时候,队列中的消息会尽可能的存储在内存之中, 这样可以更加快速的将消息发送给消费者。即使是持久化的消息,在被写入磁盘的同时也会在内存中驻留 一份备份。当 RabbitMQ 需要释放内存的时候,会将内存中的消息换页至磁盘中,这个操作会耗费较长的 时间,也会阻塞队列的操作,进而无法接收新的消息。虽然 RabbitMQ 的开发者们一直在升级相关的算法, 但是效果始终不太理想,尤其是在消息量特别大的时候。

队列具备两种模式:default 和 lazy。默认的为 default 模式,在 3.6.0 之前的版本无需做任何变更。lazy 模式即为惰性队列的模式,可以通过调用 channel.queueDeclare 方法的时候在参数中设置,也可以通过 Policy 的方式设置,如果一个队列同时使用这两种方式设置的话,那么 Policy 的方式具备更高的优先级。 如果要通过声明的方式改变已有队列的模式的话,那么只能先删除队列,然后再重新声明一个新的。
在队列声明的时候可以通过“x-queue-mode”参数来设置队列的模式,==取值为“default”和“lazy”。==下面示 例中演示了一个惰性队列的声明细节:

1
2
3
Map<String, Object> args = new HashMap<String, Object>(); 
args.put("x-queue-mode", "lazy");
channel.queueDeclare("myqueue", false, false, false, args);