面试-RabbitMQ
1. 同步和异步调用
1.1 同步
- 优点:
- 时效性较强,可以立即得到结果
- 缺点:
- 耦合度高
- 性能和吞吐能力下降
- 有额外的资源消耗
- 有级联失败问题
1.2 异步
优点:
吞吐量提升:无需等待订阅者处理完成,响应更快速
故障隔离:服务没有直接调用,不存在级联失败问题
调用间没有阻塞,不会造成无效的资源占用
耦合度极低,每个服务都可以灵活插拔,可替换
流量削峰:不管发布事件的流量波动多大,都由Broker接收,订阅者可以按照自己的速度去处理事件
缺点:
- 架构复杂了,业务没有明显的流程线,不好管理
- 需要依赖于Broker的可靠、安全、性能
2. RabbitMQ角色

- publisher:生产者
- consumer:消费者
- exchange:交换机,负责消息路由
- queue:队列,存储消息
- virtualHost:虚拟主机,隔离不同租户的exchange、queue、消息的隔离
3. RabbitMQ的6中模式
- SpringAMQP是基于RabbitMQ封装的一套模板,并且还利用SpringBoot对其实现了自动装配,使用起来非常方便。
- 特点:
- 自动声明队列、交换机及其绑定关系
- 基于注解的监听器模式,异步接收消息
- 封装了RabbitTemplate工具,用于发送消息
3.1 BasicQueue
简单模式的中没有交换机,只有队列。生产者发送消息,消费者接收消息,完全是一对一的关系。

3.2 WorkQueue
让多个消费者绑定到一个队列,共同消费队列中的消息
这些消费者是竞争关系,也就是一条消息只能被其中一个消费者消费。

如果不希望消息被均分给每个消费者,而是根据每个消费者实际能力进行消费,修改配置文件
1
2
3
4
5spring:
rabbitmq:
listener:
simple:
prefetch: 1 # 每次只能获取一条消息,处理完成才能获取下一个消息通过增加prefetch配置控制消费者的预取数量。
3.3 发布订阅
有如下几个角色:
- Publisher:生产者,也就是要发送消息的程序,但是不再发送到队列中,而是发给X(交换机)
- Exchange:交换机,图中的exchange。一方面,接收生产者发送的消息。另一方面,知道如何处理消息,例如递交给某个特别队列、递交给所有队列、或是将消息丢弃。到底如何操作,取决于Exchange的类型。Exchange有以下3种类型:
- Fanout:广播,将消息交给所有绑定到交换机的队列
- Direct:路由定向,把消息交给符合指定routing key 的队列
- Topic:通配符,把消息交给符合routing pattern(路由模式) 的队列
- Consumer:消费者,与以前一样,订阅队列,没有变化
- Queue:消息队列也与以前一样,接收消息、缓存消息。

Exchange(交换机)只负责转发消息,不具备存储消息的能力,因此如果没有任何队列与Exchange绑定,或者没有符合路由规则的队列,那么消息会丢失!
3.4 Fanout
- Fanout模式中,一条消息,会被所有订阅的队列都消费。

声明交换机
1
2
3
4@Bean
public FanoutExchange fanoutExchange(){
return new FanoutExchange("itcast.fanout");
}可以有多个队列
1
2
3
4@Bean
public Queue fanoutQueue1(){
return new Queue("fanout.queue1");
}每个队列都要绑定到Exchange(交换机)
1
2
3
4@Bean
public Binding bindingQueue1(Queue fanoutQueue1, FanoutExchange fanoutExchange){
return BindingBuilder.bind(fanoutQueue1).to(fanoutExchange);
}生产者发送的消息,只能发送到交换机,交换机来决定要发给哪个队列,生产者无法决定
1
2
3
4
5
6
7
8@Test
public void testFanoutExchange() {
// 队列名称
String exchangeName = "itcast.fanout";
// 消息
String message = "hello, everyone!";
rabbitTemplate.convertAndSend(exchangeName, "", message);
}交换机把消息发送给绑定过的所有队列
订阅队列的消费者都能拿到消息
1
2
3
4@RabbitListener(queues = "fanout.queue1")
public void listenFanoutQueue1(String msg) {
System.out.println("消费者1接收到Fanout消息:【" + msg + "】");
}
3.5 Direct
- 也称为路由模式

队列与交换机的绑定,不能是任意绑定了,而是要指定一个RoutingKey(路由key)
1
2
3
4
5
6
7
8
9
10
11@RabbitListener(bindings = @QueueBinding(value = @Queue(name ="direct.queue1"),exchange = @Exchange(name = "itcast.direct", type = ExchangeTypes.DIRECT),key = {"red", "blue"}
))
public void listenDirectQueue1(String msg){
System.out.println("消费者接收到direct.queue1的消息:【" + msg + "】");
}
@RabbitListener(bindings = @QueueBinding(value = @Queue(name = "direct.queue2"),exchange = @Exchange(name = "itcast.direct", type = ExchangeTypes.DIRECT),key = {"red", "yellow"}
))
public void listenDirectQueue2(String msg){
System.out.println("消费者接收到direct.queue2的消息:【" + msg + "】");
}消息的发送方在向 Exchange发送消息时,也必须指定消息的 RoutingKey。
1
2
3
4
5
6
7@Test
public void testSend2DirectExchange() {
String exchangeName = "itcast.direct";
String message = "hello i am direct exchange blue";
rabbitTemplate.convertAndSend(exchangeName, "blue", message);//只有消费者1能接收到消息
}Exchange不再把消息交给每一个绑定的队列,而是根据消息的Routing Key进行判断,只有队列的Routingkey与消息的 Routing key完全一致,才会接收到消息。(类似于对暗号)。
这里注意一个队列可以绑定多个BindingKey,如果两个队列的BindingKey相同,那么交换机发送的时候,会两者都发。
3.6 Topic

也成为通配符模式
Topic类型的Exchange与Direct相比,都是可以根据RoutingKey把消息路由到不同的队列。只不过Topic类型Exchange可以让队列在绑定Routing key 的时候使用通配符!
Routingkey 一般都是有一个或多个单词组成,多个单词之间以”.”分割,
- 例如: item.insert
通配符规则:
- #:匹配一个或多个词
- *:匹配不多不少恰好1个词
发送消息:
1
2
3
4
5
6
7
8
9
10
11
12
13/**
* topicExchange
*/
@Test
public void testSendTopicExchange() {
// 交换机名称
String exchangeName = "itcast.topic";
// 消息
String message = "喜报!孙悟空大战哥斯拉,胜!";
// 发送消息
rabbitTemplate.convertAndSend(exchangeName, "china.news", message);
}绑定对列和交换机:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17@RabbitListener(bindings = @QueueBinding(
value = @Queue(name = "topic.queue1"),
exchange = @Exchange(name = "itcast.topic", type = ExchangeTypes.TOPIC),
key = "china.#"
))
public void listenTopicQueue1(String msg){
System.out.println("消费者接收到topic.queue1的消息:【" + msg + "】");
}
@RabbitListener(bindings = @QueueBinding(
value = @Queue(name = "topic.queue2"),
exchange = @Exchange(name = "itcast.topic", type = ExchangeTypes.TOPIC),
key = "#.news"
))
public void listenTopicQueue2(String msg){
System.out.println("消费者接收到topic.queue2的消息:【" + msg + "】");
}
4. 消息转换器
Spring会把你发送的消息序列化为字节发送给MQ,接收消息的时候,还会把字节反序列化为Java对象。
转换的时候使用的JDK序列化,存在一下问题:
- 数据体积过大
- 有安全漏洞
- 可读性差
可以使用JSON方式来做序列化和反序列化
发送方MQConfig中增加:
1
2
3
4@Bean
public MessageConverter jsonMessageConverter(){
return new Jackson2JsonMessageConverter();
}接收方Config中增加
1
2
3
4@Bean
public MessageConverter messageConverter() {
return new Jackson2JsonMessageConverter();
}
5. 如何保证消息不丢失
- MQ可以用来异步发送通知
- 在整个的发送流程中:
- 发送方生产消息
- 发送方将消息发送给交换机
- 交换机将消息转发给队列
- 消费方从队列中读取队列
- 消费方消费信息
- 因此在上面流程中每个都过程都会经历消息丢失。

- RabbitMQ提供了publisher confirm机制来避免消息发送到MQ过程中丢失。消息发送到MQ以后,会返回一个结果给发送者,表示消息是否处理成功

5.1 消息失败如何处理
- 可以通过回调方法触发重试
- 记录日志
- 保存到数据库然后定时重发,成功发送后即刻删除表中的数据(专门使用一张表记录失败的,定时扫描然后发送)
5.2 消息持久化
MQ默认是内存存储消息,开启持久化功能可以确保缓存在MQ中的消息不丢失。
因此可以在消息转发的各个阶段将消息持久化
交换机持久化
1
2
3
4
5@Bean
public DirectExchange simpleExchange(){
// 三个参数:交换机名称、是否持久化、当没有queue与其绑定时是否自动删除
return new DirectExchange("simple.direct", true, false);
}队列持久化
1
2
3
4
5@Bean
public Queue simpleQueue(){
// 使用QueueBuilder构建队列,durable就是持久化的
return QueueBuilder.durable("simple.queue").build();
}消息持久化(SpringAMQP中的的消息默认是持久的,可以通过MessageProperties中的DeliveryMode来指定)
1
2
3
4Message msg = MessageBuilder
.withBody(message.getBytes(StandardCharsets.UTF_8)) // 消息体
.setDeliveryMode(MessageDeliveryMode.PERSISTENT) // 持久化
.build();
5.3 消费者确认
- RabbitMQ支持消费者确认机制,即:消费者处理消息后可以向MQ发送ack回执,MQ收到ack回执后才会删除该消息。而SpringAMQP则允许配置三种确认模式:
- manual:手动ack,需要在业务代码结束后,调用api发送ack。
- auto:自动ack,由spring监测listener代码是否出现异常,没有异常则返回ack;抛出异常则返回nack
- none:关闭ack,MQ假定消费者获取消息后会成功处理,因此消息投递后立即被删除
- 可以利用Spring的retry机制,在消费者出现异常时利用本地重试,设置重试次数,当次数达到了以后,如果消息依然失败,将消息投递到异常交换机,交由人工处理

6. 消息重复消费
- 产生重复消费的原因:
- 网络抖动
- 消费者挂掉

- 只需要给每个消息设置一个唯一id去标识这个消息,在消费者方处理消息的时候,首先根据id查库看消息是否存在,不存在就进行消费,如果存在表示已经消费过数据,就不在进行消费了。
7. 消息堆积
- 当生产者发送消息的速度超过了消费者处理消息的速度,就会导致队列中的消息堆积,直到队列存储消息达到上限。之后发送的消息就会成为死信,可能会被丢弃,这就是消息堆积问题
- 解决:
- 增加更多消费者,提高消费速度
- 在消费者内开启线程池加快消息处理速度
- 扩大队列容积,提高堆积上限(惰性队列)
7.1 惰性队列
惰性队列的特征如下:
- 接收到消息后直接存入磁盘而非内存
- 消费者要消费消息时才会从磁盘中读取并加载到内存
- 支持数百万条的消息存储
创建惰性队列:
1
2
3
4
5
6
7@Bean
public Queue lazyQueue(){
return QueueBuilder
.durable("lazy.queue") //指定队列名称并持久化
.lazy()
.build();
}接收方接收消息:
1
2
3
4@RabbitListener(bindings = @QueueBinding(value = @Queue(name = "lazy.queue", durable = "true", arguments = @Argument(name = "x-queue-mode",value = "lazy")))
public void lazyQueue (String msg){
log.info("接收到 lazy.queue的延迟消息:{}",msg);
}
8.延迟队列
8.1 死信交换机
- 当一个队列中的消息满足下列情况之一时,可以成为死信(dead letter):
- 消费者使用basic.reject或 basic.nack声明消费失败,并且消息的requeue参数设置为false
- 消息是一个过期消息,超时无人消费
- 要投递的队列消息堆积满了,最早的消息可能成为死信
8.2 延迟队列
进入队列的消息会被延迟消费的队列
场景:
- 下单限时支付
- 限时优惠
- 定时发布
延迟队 = 死信交换机 + TTL
声明死信交换机
1
2
3
4
5
6
7
8@Bean
public Queue simpleQueue(){
return QueueBuilder
.durable("simple.queue") //指定队列名称并持久化
.ttl(10000)
.deadLetterExchange("dl.direct")
.build();
}
8.3 死信交换机
- 如果该队列配置了
dead-letter-exchange属性,指定了一个交换机,那么队列中的死信就会投递到这个交换机中,而这个交换机称为死信交换机(Dead Letter Exchange,简称DLX)。
8.4 TTL
Time-To-Live。如果一个队列中的消息TTL结束仍未消费,则会变为死信,ttl超时分为两种情况:
- 消息所在的队列设置了存活时间
- 消息本身设置了存活时间
发送方创建消息
1
2
3
4
5
6
7
8
9
10
11Message msg = MessageBuilder
.withBody(message.getBytes(StandardCharsets.UTF_8)) // 消息体
.setDeliveryMode(MessageDeliveryMode.PERSISTENT) // 持久化
.setExpration("1000") //消息存活时间为1000ms
.build();
//消息ID,需要封装到CorrelationData中
CorrelationData correlationData = new CorrelationDaata(UUID.randomUUID().toString
//发送消息
rabbitTemplate.convertAndSend("ttl.direct", "ttl",message, correlationData);
8.5 延迟队列插件
DelayExchange插件
DelayExchange的本质还是官方的三种交换机,只是添加了延迟功能。因此使用时只需要声明一个交换机,交换机的类型可以是任意类型,然后设定delayed属性为true即可。
消息接收方:
1
2
3
4@RabbitListener(bindings = @QueueBinding(value = @Queue(name = "delay.queue", durable = "true"),exchange = @Exchange(name = "delay.direct",delayed = "true"),key = "delay"))
public void listenDelayedQueue (String msg){
log.info("接收到 delay.queue的延迟消息:{}",msg);
}创建消息:
1
2
3
4
5
6
7
8
9
10
11Message msg = MessageBuilder
.withBody(message.getBytes(StandardCharsets.UTF_8)) // 消息体
.setDeliveryMode(MessageDeliveryMode.PERSISTENT) // 持久化
.setHeader("x-dealy", 10000) //设置超时时间
.build();
//消息ID,需要封装到CorrelationData中
CorrelationData correlationData = new CorrelationDaata(UUID.randomUUID().toString
//发送消息
rabbitTemplate.convertAndSend("ttl.direct", "ttl",message, correlationData);
10. 集群
10.1 普通集群
- 特点:
- 会在集群的各个节点间共享部分数据,包括:交换机、队列元信息(其他MQ中存放的是这个队列的地址)。不包含队列中的消息。
- 当访问集群某节点时,如果队列不在该节点,会从数据所在节点传递到当前节点并返回
- 队列所在节点宕机,队列中的消息就会丢失

10.2 镜像集群
- 特点:
- 交换机、队列、队列中的消息会在各个mq的镜像节点之间同步备份。
- 创建队列的节点被称为该队列的主节点,备份到的其它节点叫做该队列的镜像节点。
- 一个队列的主节点可能是另一个队列的镜像节点
- 所有操作都是主节点完成,然后同步给镜像节点
- 主宕机后,镜像节点会替代成新的主
- 但是这样的话,主从之间是会有数据延时

10.3 仲裁队列
- 特点:
- 与镜像队列一样,都是主从模式,支持主从数据同步
- 使用非常简单,没有复杂的配置
- 主从同步基于Raft协议,强一致