面试-RabbitMQ

1. 同步和异步调用

1.1 同步

  • 优点:
    • 时效性较强,可以立即得到结果
  • 缺点:
    • 耦合度高
    • 性能和吞吐能力下降
    • 有额外的资源消耗
    • 有级联失败问题

1.2 异步

  • 优点:

    • 吞吐量提升:无需等待订阅者处理完成,响应更快速

    • 故障隔离:服务没有直接调用,不存在级联失败问题

    • 调用间没有阻塞,不会造成无效的资源占用

    • 耦合度极低,每个服务都可以灵活插拔,可替换

    • 流量削峰:不管发布事件的流量波动多大,都由Broker接收,订阅者可以按照自己的速度去处理事件

  • 缺点:

    • 架构复杂了,业务没有明显的流程线,不好管理
    • 需要依赖于Broker的可靠、安全、性能

2. RabbitMQ角色

image-20240504074954754

  • publisher:生产者
  • consumer:消费者
  • exchange:交换机,负责消息路由
  • queue:队列,存储消息
  • virtualHost:虚拟主机,隔离不同租户的exchange、queue、消息的隔离

3. RabbitMQ的6中模式

  • SpringAMQP是基于RabbitMQ封装的一套模板,并且还利用SpringBoot对其实现了自动装配,使用起来非常方便。
  • 特点:
    • 自动声明队列、交换机及其绑定关系
    • 基于注解的监听器模式,异步接收消息
    • 封装了RabbitTemplate工具,用于发送消息

3.1 BasicQueue

  • 简单模式的中没有交换机,只有队列。生产者发送消息,消费者接收消息,完全是一对一的关系。

    image-20240504083159474

3.2 WorkQueue

  • 多个消费者绑定到一个队列,共同消费队列中的消息

  • 这些消费者是竞争关系,也就是一条消息只能被其中一个消费者消费。

  • image-20240504075738054

  • 如果不希望消息被均分给每个消费者,而是根据每个消费者实际能力进行消费,修改配置文件

    1
    2
    3
    4
    5
    spring:
    rabbitmq:
    listener:
    simple:
    prefetch: 1 # 每次只能获取一条消息,处理完成才能获取下一个消息

    通过增加prefetch配置控制消费者的预取数量。

3.3 发布订阅

  • 有如下几个角色:

    • Publisher:生产者,也就是要发送消息的程序,但是不再发送到队列中,而是发给X(交换机)
    • Exchange:交换机,图中的exchange。一方面,接收生产者发送的消息。另一方面,知道如何处理消息,例如递交给某个特别队列、递交给所有队列、或是将消息丢弃。到底如何操作,取决于Exchange的类型。Exchange有以下3种类型:
      • Fanout:广播,将消息交给所有绑定到交换机的队列
      • Direct:路由定向,把消息交给符合指定routing key 的队列
      • Topic:通配符,把消息交给符合routing pattern(路由模式) 的队列
    • Consumer:消费者,与以前一样,订阅队列,没有变化
    • Queue:消息队列也与以前一样,接收消息、缓存消息。

    image-20240504083459143

  • Exchange(交换机)只负责转发消息,不具备存储消息的能力,因此如果没有任何队列与Exchange绑定,或者没有符合路由规则的队列,那么消息会丢失!

3.4 Fanout

  • Fanout模式中,一条消息,会被所有订阅的队列都消费。

image-20240504083917675

  • 声明交换机

    1
    2
    3
    4
    @Bean
    public FanoutExchange fanoutExchange(){
    return new FanoutExchange("itcast.fanout");
    }
  • 可以有多个队列

    1
    2
    3
    4
    @Bean
    public Queue fanoutQueue1(){
    return new Queue("fanout.queue1");
    }
  • 每个队列都要绑定到Exchange(交换机)

    1
    2
    3
    4
    @Bean
    public Binding bindingQueue1(Queue fanoutQueue1, FanoutExchange fanoutExchange){
    return BindingBuilder.bind(fanoutQueue1).to(fanoutExchange);
    }
  • 生产者发送的消息,只能发送到交换机,交换机来决定要发给哪个队列,生产者无法决定

    1
    2
    3
    4
    5
    6
    7
    8
    @Test
    public void testFanoutExchange() {
    // 队列名称
    String exchangeName = "itcast.fanout";
    // 消息
    String message = "hello, everyone!";
    rabbitTemplate.convertAndSend(exchangeName, "", message);
    }
  • 交换机把消息发送给绑定过的所有队列

  • 订阅队列的消费者都能拿到消息

    1
    2
    3
    4
    @RabbitListener(queues = "fanout.queue1")
    public void listenFanoutQueue1(String msg) {
    System.out.println("消费者1接收到Fanout消息:【" + msg + "】");
    }

3.5 Direct

  • 也称为路由模式

image-20240504084445243

  • 队列与交换机的绑定,不能是任意绑定了,而是要指定一个RoutingKey(路由key)

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    @RabbitListener(bindings = @QueueBinding(value = @Queue(name ="direct.queue1"),exchange = @Exchange(name = "itcast.direct", type = ExchangeTypes.DIRECT),key = {"red", "blue"}
    ))
    public void listenDirectQueue1(String msg){
    System.out.println("消费者接收到direct.queue1的消息:【" + msg + "】");
    }

    @RabbitListener(bindings = @QueueBinding(value = @Queue(name = "direct.queue2"),exchange = @Exchange(name = "itcast.direct", type = ExchangeTypes.DIRECT),key = {"red", "yellow"}
    ))
    public void listenDirectQueue2(String msg){
    System.out.println("消费者接收到direct.queue2的消息:【" + msg + "】");
    }
  • 消息的发送方在向 Exchange发送消息时,也必须指定消息的 RoutingKey。

    1
    2
    3
    4
    5
    6
    7
    @Test
    public void testSend2DirectExchange() {
    String exchangeName = "itcast.direct";
    String message = "hello i am direct exchange blue";
    rabbitTemplate.convertAndSend(exchangeName, "blue", message);//只有消费者1能接收到消息
    }

  • Exchange不再把消息交给每一个绑定的队列,而是根据消息的Routing Key进行判断,只有队列的Routingkey与消息的 Routing key完全一致,才会接收到消息。(类似于对暗号)。

  • 这里注意一个队列可以绑定多个BindingKey,如果两个队列的BindingKey相同,那么交换机发送的时候,会两者都发。

3.6 Topic

image-20240504090126542

  • 也成为通配符模式

  • Topic类型的Exchange与Direct相比,都是可以根据RoutingKey把消息路由到不同的队列。只不过Topic类型Exchange可以让队列在绑定Routing key 的时候使用通配符!

  • Routingkey 一般都是有一个或多个单词组成,多个单词之间以”.”分割,

    • 例如: item.insert
  • 通配符规则:

    • #:匹配一个或多个词
    • *:匹配不多不少恰好1个词
  • 发送消息:

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    /**
    * topicExchange
    */
    @Test
    public void testSendTopicExchange() {
    // 交换机名称
    String exchangeName = "itcast.topic";
    // 消息
    String message = "喜报!孙悟空大战哥斯拉,胜!";
    // 发送消息
    rabbitTemplate.convertAndSend(exchangeName, "china.news", message);
    }

  • 绑定对列和交换机:

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    @RabbitListener(bindings = @QueueBinding(
    value = @Queue(name = "topic.queue1"),
    exchange = @Exchange(name = "itcast.topic", type = ExchangeTypes.TOPIC),
    key = "china.#"
    ))
    public void listenTopicQueue1(String msg){
    System.out.println("消费者接收到topic.queue1的消息:【" + msg + "】");
    }

    @RabbitListener(bindings = @QueueBinding(
    value = @Queue(name = "topic.queue2"),
    exchange = @Exchange(name = "itcast.topic", type = ExchangeTypes.TOPIC),
    key = "#.news"
    ))
    public void listenTopicQueue2(String msg){
    System.out.println("消费者接收到topic.queue2的消息:【" + msg + "】");
    }

4. 消息转换器

  • Spring会把你发送的消息序列化为字节发送给MQ,接收消息的时候,还会把字节反序列化为Java对象。

  • 转换的时候使用的JDK序列化,存在一下问题:

    • 数据体积过大
    • 有安全漏洞
    • 可读性差
  • 可以使用JSON方式来做序列化和反序列化

  • 发送方MQConfig中增加:

    1
    2
    3
    4
    @Bean
    public MessageConverter jsonMessageConverter(){
    return new Jackson2JsonMessageConverter();
    }
  • 接收方Config中增加

    1
    2
    3
    4
    @Bean
    public MessageConverter messageConverter() {
    return new Jackson2JsonMessageConverter();
    }

5. 如何保证消息不丢失

  • MQ可以用来异步发送通知
  • 在整个的发送流程中:
    • 发送方生产消息
    • 发送方将消息发送给交换机
    • 交换机将消息转发给队列
    • 消费方从队列中读取队列
    • 消费方消费信息
  • 因此在上面流程中每个都过程都会经历消息丢失。
  • image-20240504091201001
  • RabbitMQ提供了publisher confirm机制来避免消息发送到MQ过程中丢失。消息发送到MQ以后,会返回一个结果给发送者,表示消息是否处理成功
  • image-20240504091228269

5.1 消息失败如何处理

  • 可以通过回调方法触发重试
  • 记录日志
  • 保存到数据库然后定时重发,成功发送后即刻删除表中的数据(专门使用一张表记录失败的,定时扫描然后发送)

5.2 消息持久化

  • MQ默认是内存存储消息,开启持久化功能可以确保缓存在MQ中的消息不丢失。

  • 因此可以在消息转发的各个阶段将消息持久化

    • 交换机持久化

      1
      2
      3
      4
      5
      @Bean
      public DirectExchange simpleExchange(){
      // 三个参数:交换机名称、是否持久化、当没有queue与其绑定时是否自动删除
      return new DirectExchange("simple.direct", true, false);
      }
    • 队列持久化

      1
      2
      3
      4
      5
      @Bean
      public Queue simpleQueue(){
      // 使用QueueBuilder构建队列,durable就是持久化的
      return QueueBuilder.durable("simple.queue").build();
      }
    • 消息持久化(SpringAMQP中的的消息默认是持久的,可以通过MessageProperties中的DeliveryMode来指定)

      1
      2
      3
      4
      Message msg = MessageBuilder
      .withBody(message.getBytes(StandardCharsets.UTF_8)) // 消息体
      .setDeliveryMode(MessageDeliveryMode.PERSISTENT) // 持久化
      .build();

5.3 消费者确认

  • RabbitMQ支持消费者确认机制,即:消费者处理消息后可以向MQ发送ack回执,MQ收到ack回执后才会删除该消息。而SpringAMQP则允许配置三种确认模式:
    • manual:手动ack,需要在业务代码结束后,调用api发送ack。
    • auto:自动ack,由spring监测listener代码是否出现异常,没有异常则返回ack;抛出异常则返回nack
    • none:关闭ack,MQ假定消费者获取消息后会成功处理,因此消息投递后立即被删除
  • 可以利用Spring的retry机制,在消费者出现异常时利用本地重试,设置重试次数,当次数达到了以后,如果消息依然失败,将消息投递到异常交换机,交由人工处理

image-20240504092026512

6. 消息重复消费

  • 产生重复消费的原因:
    • 网络抖动
    • 消费者挂掉
  • image-20240504092144544
  • 只需要给每个消息设置一个唯一id去标识这个消息,在消费者方处理消息的时候,首先根据id查库看消息是否存在,不存在就进行消费,如果存在表示已经消费过数据,就不在进行消费了。

7. 消息堆积

  • 当生产者发送消息的速度超过了消费者处理消息的速度,就会导致队列中的消息堆积,直到队列存储消息达到上限。之后发送的消息就会成为死信,可能会被丢弃,这就是消息堆积问题
  • 解决:
    • 增加更多消费者,提高消费速度
    • 在消费者内开启线程池加快消息处理速度
    • 扩大队列容积,提高堆积上限(惰性队列)

7.1 惰性队列

  • 惰性队列的特征如下:

    • 接收到消息后直接存入磁盘而非内存
    • 消费者要消费消息时才会从磁盘中读取并加载到内存
    • 支持数百万条的消息存储
  • 创建惰性队列:

    1
    2
    3
    4
    5
    6
    7
    @Bean
    public Queue lazyQueue(){
    return QueueBuilder
    .durable("lazy.queue") //指定队列名称并持久化
    .lazy()
    .build();
    }
  • 接收方接收消息:

    1
    2
    3
    4
    @RabbitListener(bindings = @QueueBinding(value = @Queue(name = "lazy.queue", durable = "true", arguments = @Argument(name = "x-queue-mode",value = "lazy")))
    public void lazyQueue (String msg){
    log.info("接收到 lazy.queue的延迟消息:{}",msg);
    }

8.延迟队列

8.1 死信交换机

  • 当一个队列中的消息满足下列情况之一时,可以成为死信(dead letter):
    • 消费者使用basic.reject或 basic.nack声明消费失败,并且消息的requeue参数设置为false
    • 消息是一个过期消息,超时无人消费
    • 要投递的队列消息堆积满了,最早的消息可能成为死信

8.2 延迟队列

  • 进入队列的消息会被延迟消费的队列

  • 场景:

    • 下单限时支付
    • 限时优惠
    • 定时发布
  • 延迟队 = 死信交换机 + TTL

  • 声明死信交换机

    1
    2
    3
    4
    5
    6
    7
    8
    @Bean
    public Queue simpleQueue(){
    return QueueBuilder
    .durable("simple.queue") //指定队列名称并持久化
    .ttl(10000)
    .deadLetterExchange("dl.direct")
    .build();
    }

8.3 死信交换机

  • 如果该队列配置了dead-letter-exchange属性,指定了一个交换机,那么队列中的死信就会投递到这个交换机中,而这个交换机称为死信交换机(Dead Letter Exchange,简称DLX)。

8.4 TTL

  • Time-To-Live。如果一个队列中的消息TTL结束仍未消费,则会变为死信,ttl超时分为两种情况:

    • 消息所在的队列设置了存活时间
    • 消息本身设置了存活时间
  • 发送方创建消息

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    Message msg = MessageBuilder
    .withBody(message.getBytes(StandardCharsets.UTF_8)) // 消息体
    .setDeliveryMode(MessageDeliveryMode.PERSISTENT) // 持久化
    .setExpration("1000") //消息存活时间为1000ms
    .build();

    //消息ID,需要封装到CorrelationData中
    CorrelationData correlationData = new CorrelationDaata(UUID.randomUUID().toString

    //发送消息
    rabbitTemplate.convertAndSend("ttl.direct", "ttl",message, correlationData);

8.5 延迟队列插件

  • DelayExchange插件

  • https://www.rabbitmq.com/community-plugins.html

  • DelayExchange的本质还是官方的三种交换机,只是添加了延迟功能。因此使用时只需要声明一个交换机,交换机的类型可以是任意类型,然后设定delayed属性为true即可。

  • 消息接收方:

    1
    2
    3
    4
    @RabbitListener(bindings = @QueueBinding(value = @Queue(name = "delay.queue", durable = "true"),exchange = @Exchange(name = "delay.direct",delayed = "true"),key = "delay"))
    public void listenDelayedQueue (String msg){
    log.info("接收到 delay.queue的延迟消息:{}",msg);
    }
  • 创建消息:

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    Message msg = MessageBuilder
    .withBody(message.getBytes(StandardCharsets.UTF_8)) // 消息体
    .setDeliveryMode(MessageDeliveryMode.PERSISTENT) // 持久化
    .setHeader("x-dealy", 10000) //设置超时时间
    .build();

    //消息ID,需要封装到CorrelationData中
    CorrelationData correlationData = new CorrelationDaata(UUID.randomUUID().toString

    //发送消息
    rabbitTemplate.convertAndSend("ttl.direct", "ttl",message, correlationData);

10. 集群

10.1 普通集群

  • 特点:
    • 会在集群的各个节点间共享部分数据,包括:交换机、队列元信息(其他MQ中存放的是这个队列的地址)。不包含队列中的消息。
    • 当访问集群某节点时,如果队列不在该节点,会从数据所在节点传递到当前节点并返回
    • 队列所在节点宕机,队列中的消息就会丢失

image-20240504095204856

10.2 镜像集群

  • 特点:
    • 交换机、队列、队列中的消息会在各个mq的镜像节点之间同步备份。
    • 创建队列的节点被称为该队列的主节点,备份到的其它节点叫做该队列的镜像节点。
    • 一个队列的主节点可能是另一个队列的镜像节点
    • 所有操作都是主节点完成,然后同步给镜像节点
    • 主宕机后,镜像节点会替代成新的主
  • 但是这样的话,主从之间是会有数据延时

image-20240504095309382

10.3 仲裁队列

  • 特点:
    • 与镜像队列一样,都是主从模式,支持主从数据同步
    • 使用非常简单,没有复杂的配置
    • 主从同步基于Raft协议,强一致

面试-RabbitMQ
https://baijianglai.cn/面试-RabbitMQ/6a1379042a62/
作者
Lai Baijiang
发布于
2024年5月4日
许可协议