(二) RabbitMQ快速入门, 一篇文章教会你使用RabbitMQ并理解消息发布模型(SpringBoot版)

语言: CN / TW / HK

前言

        RabbitMQ安装完之后呢,该学习怎么使用了。
在这里插入图片描述
        RabbitMQ基于生产者于消费者模型,实现了系统间的解耦
        生产者需要与rabbitmq server建立连接,每一个生产者对应一个虚拟主机,类似于MySQL中库的概念,即一个应用(一个业务)对应一个虚拟主机,使各个应用(业务)之间互不影响。每一个虚拟主机都要跟一个用户进行绑定,这个用户名密码作为虚拟主机的访问权限,所以在开发之前我们需要在web管理界面创建虚拟主机与用户,并将二者绑定,将用户授权。
消费者也需要与rabbitmq server建立连接,从queue(消息队列)中消费消息的前提是连接虚拟主机,有用户名和密码才能成功消费消息。该用户名密码即与虚拟主机绑定的用户名密码。
        消息不一定被生产者放到交换机,由交换机决定放到哪个queue,生产者也可以直接将消息放入queue,这是一种点对点消息发布模型,文章后续会介绍消费模型。




一. 创建用户与虚拟主机

1. 在web界面创建一个虚拟主机,我就取名为rabbitmq了
在这里插入图片描述
        此时该虚拟机的权限为guest,如果操作不失误,按步骤走完,这里会变成你创建的用户名
在这里插入图片描述
2.创建用户,我取名为rabbitmqtest
在这里插入图片描述
3.把创建的虚拟主机与创建的用户绑定
在这里插入图片描述
        然后按照下图给用户分配访问虚拟主机的权限
在这里插入图片描述
到此,已经创建好虚拟主机与用户,且虚拟主机与用户已绑定成功,可以进行开发了。









二. SpringBoot集成RabbitMQ

1. 引入依赖

<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-amqp</artifactId>
</dependency>

2. 编写配置文件(yml格式)

spring:
  application:
    name: spring-boot-rabbitmq
  rabbitmq:
    host: 192.168.132.151
    port: 5672
    username: rabbitmqtest    # 为上述创建的用户名
    password: 123
    virtual-host: /rabbitmq   # 为上述创建的虚拟主机名称

3. 模板对象

        SpringBoot提供了一个模板对象RabbitTemplate,跟RestTemplate、RedisTemplate一样,在SpringBoot启动后便将该对象加载到ioc容器,我们使用的时候自动注入即可。

三. 消息发布模型

1. 直连(点对点)

在这里插入图片描述
        P:生产者,向消息队列发布消息
        红色:消息队列,接收生产者发布的消息
        C:消费者,从消息队列消费消息
1.1 开发生产者



@SpringBootApplication
@RunWith(SpringRunner.class)
public class RabbitMQTest {
   
   
    @Autowired
    private RabbitTemplate rabbitTemplate;
    // 点对点直连
    @Test
    public void test() {
   
   
         // 参数1:消息队列名称  参数2:消息内容
        rabbitTemplate.convertAndSend("hello", "hello rabbitmq");
    }
}

1.2 开发消费者

@Component //需要让当前类被ioc管理
// 代表监听名为hello的消息队列,默认为消息持久化,非独占
@RabbitListener(queuesToDeclare = @Queue("hello")) 
public class HelloConsumer {
   
   
    // 方法名无所谓,随便定义
    //这个注解表明当前方法为消息消费的回调方法,即接收到消息执行此方法
    @RabbitHandler 
    public void receive(String message) {
   
   
        System.out.println("点对点直连消费消息:" + message);
    }
}

        点对点模型会产生的问题:当消费者业务逻辑比较复杂,也就是消费消息过于缓慢,可能产生消息的速度远远大于消费者消费消息的速度,就会导致消息队列的消息大量堆积,消息无法即使处理,如果有多个消费者,每个消费者去处理不同的消息(必须是处理不同消息,否则会出现消息重复处理),效率必然会大大提高。

2. work 工作模型

        为解决点对点模型的问题,引入了工作队列模型(任务队列),工作模型就是将多个消费者绑定到同一个队列,共同消费队列中的消息,消息一旦被消费,就会消失,确保了不会重复消费消息
在这里插入图片描述
        P:生产者,任务的发布者
        C1:消费者1,监听红色队列,消费消息,假设业务逻辑较简单,则消费速度快
        C2:消费者2,监听红色队列,消费消息,假设业务逻辑教复杂,则消费速度慢
2.1 开发生产者




@Autowired
private RabbitTemplate rabbitTemplate;
@Test
public void testWork() {
   
   
    // 利用循环一次多发几条消息,让多个消费者都有消息可消费
    for (int i = 0; i < 10; i++) {
   
   
        // 参数1:消息队列名称    参数2:消息内容
        rabbitTemplate.convertAndSend("work", "work rabbitmq");
    }
}

2.2 开发消费者

@Component
public class WorkConsumer {
   
   
    // 构建消费者1
    // @RabbitListener该注解加在方法上,直接让方法监听队列,消费消息直接运行该方法
    @RabbitListener(queuesToDeclare = @Queue("work"))
    public void receiveOne(String message) {
   
   
        System.out.println("消费者1:" + message);
    }
    // 构建消费者2
    @RabbitListener(queuesToDeclare = @Queue("work"))
    public void receiveTwo(String message) {
   
   
        System.out.println("消费者2:" + message);
    }
}

2.3 默认消费机制
        默认情况是平均分配,将每一个消息发给下一个消费者,官方明确指出这种方式叫做循环消费。这种消费方式依赖于消息自动确认机制。
2.4 消息确认机制
        消息确认机制有自动确认与手动确认,自动确认指的是在消费者接收到消息之后,队列将立马删除当前消息,队列不会管消费者真正的业务逻辑有没有处理完,立马将下一条消息分配给下一个消费者,这也是平均分配的原理。手动确认即在消费者业务逻辑执行完成后进行确认,告诉消息队列我执行完了,此时消息队列再将消息删除
2.5 默认消费机制存在问题
        举个例子,假如一个消费者在消息队列确认接收了5条消息,此时消息队列已经将这5条消息删除,但是该消费者消费消息较慢,在消费第3条消息的时候宕机了,不仅第三条消息会丢失,剩下的两条消息也会丢失。在我们真正的业务中,并不希望有消息丢失。
2.6 能者多劳
        消费者与消息队列之间是通过channel通道传输消息的,上述提到自动确认机制只是消费者接收到消息rabbitmq就认为你已经处理完成,如果消费者处理消息较慢,那一定会产生消息堆积,堆积的消息就会放到通道。能者多劳让处理消息较快的消费者多处理消息,首先需要将通道设置只允许传输一条消息,这样不会有消息堆积,第二需要开启手动确认机制,消费完消息再通知消费者删除当前消息,然后消费下一条,这样不仅能达到能者多劳,还能避免上述消息丢失的问题。






3. 发布订阅(fanout广播)

在这里插入图片描述
        由官方文档提供的图示可知,该模型可以有多个消费者,每个消费者都有自己的queue(临时消息队列),每个消息队列都要绑定到Exchange(交换机),生产者发送消息只能发送给交换机,由交换机决定发送给哪个队列,该模型下,交换机的类型为fanout,即广播模式,也就是将消息发给所有队列,队列所属的消费者都能拿到消息,实现一处通知,处处执行的广播效果。
3.1 开发生产者

@Autowired
private RabbitTemplate rabbitTemplate;
@Test
public void testFanout() {
   
   
    // 参数1为交换机名称;参数2为routingKey,路由模式才有用;参数3为消息体
    rabbitTemplate.convertAndSend("orders", "", "广播fanout模型发送的消息");
}

3.2 开发消费者

@Component
public class FanoutConsumer {
   
   
    // 消费者1
    @RabbitListener(bindings = {
   
   
            @QueueBinding(
                    value = @Queue, // 不给值代表创建临时队列
                    exchange = @Exchange(value = "orders", type = "fanout") // 绑定交换机,类型为广播类型
            )
    })
    public void receiveOne(String message) {
   
   
        System.out.println("消费者1:" + message);
    }
    // 消费者2
    @RabbitListener(bindings = {
   
   
            @QueueBinding(
                    value = @Queue, // 不给值代表创建临时队列
                    exchange = @Exchange(value = "orders", type = "fanout") // 绑定交换机,类型为广播类型
            )
    })
    public void receiveTwo(String message) {
   
   
        System.out.println("消费者2:" + message);
    }
}

3.3 应用场景
        例如订单模块下单了,然后向消息队列发布一个消息,商品模块、用户模块都要消费该消息,商品模块进行库存更新,用户模块进行积分更新等等…

4. Routing 路由模型-direct

        发布订阅模型中,一条消息会被所有订阅的队列消费,但在实际业务场景中,我们往往有定向的消费,即有些消息希望被一些消费者消费,有些消息希望被另一些消费者消费。
        在该模型下,生产者向交换机发送的消息要携带routing key,消息队列与交换机绑定也要指定routing key,此时交换机的模式为direct模式,交换机不再把所有消息发向所有消息队列,只有消息队列的routing key与消息的routing key一致时才可接收到此消息。
在这里插入图片描述

  • 官网给出的例子为日志相关的,C1消费者只接受error级别的日志,将其存入磁盘,C2消费者接收所有级别的日志,将其输出到控制台
  • P为生产者,向交换机发布消息,发送时会指定一个routing key,用于标记消息
  • X为交换机,接收生产者的消息,然后把消息传给与routing key匹配的消息队列
  • C1为消费者,其所在队列只能接收到routing key为error的消息,假设消费逻辑为写入磁盘
  • C2为消费者,其所在队列能接收到routing key为info、error、waring的消息,假设消费逻辑为打印到控制台
    4.1 开发生产者
@Autowired
private RabbitTemplate rabbitTemplate;

@Test
public void testRoute() {
   
   
    // 参数1:交换机名称;参数2:routingKey,对消息标记;参数3:消息体
    rabbitTemplate.convertAndSend("logs", "info", "发送routingKey为info的消息");
}

4.2 开发消费者

@Component
public class RouteConsumer {
   
   
    // 消费者1
    @RabbitListener(bindings = {
   
   
            @QueueBinding(
                    value = @Queue, //创建临时队列
                    exchange = @Exchange(value = "logs", type = "direct"), // 绑定交换机,direct模式
                    key = {
   
   "error"} // 指定routingKey
            )
    })
    public void receiveOne(String message) {
   
   
        System.out.println("消费者1将error的消息写入磁盘: " + message);
    }
    
    // 消费者2
    @RabbitListener(bindings = {
   
   
            @QueueBinding(
                    value = @Queue, //创建临时队列
                    exchange = @Exchange(value = "logs", type = "direct"), // 绑定交换机,direct模式
                    key = {
   
   "info", "error", "warning"} // 指定routingKey
            )
    })
    public void receiveTwo(String message) {
   
   
        System.out.println("消费者2将info,error,warning的日志输出到控制台: " + message);
    }
}

4.3 测试结果
在这里插入图片描述
4.4 存在问题
        不够灵活,拓展性差,上面说了,消息队列与交换机绑定的时候需要指明routing key,像消费者2那样,有三个roouting key,那么我就要指定三个,如果后续业务发生变化,消费者2要消费其他routing kye的消息,那就需要再添加新的key。


5.Topic 动态路由模型-topic

在这里插入图片描述
        topic模型,其实跟direct一样,只不过在渠道与消息队列绑定的时候,routing key使用通配符,这样一来,只要发布的消息满足通配符,就可以被消费。在这种模型下 routing key一般都是由一个或者多个单词组成,由“.”分割,例如sms.pay

通配符:
	*	匹配一个单词
	#	匹配多个单词
例如:
	sms.*	可以匹配sms.pay  sms.user等
	sms.#	可以匹配sms.pay、sms.pay.user

5.1 开发生产者

@Autowired
private RabbitTemplate rabbitTemplate;
@Test
public void testTopic() {
   
   
    rabbitTemplate.convertAndSend("topic", "user.save", "user.save 动态路由消息");
}

5.2 开发消费者

@Component
public class TopicConsumer {
   
   
    // 消费者1
    @RabbitListener(bindings = {
   
   
            @QueueBinding(
                    value = @Queue, //不给值则为创建临时队列
                    exchange = @Exchange(type = "topic", name = "topic"),
                    key = {
   
   "user.#"}
            )
    })
    public void receiveOne(String message) {
   
   
        System.out.println("消费者1,负责消费user相关操作:" + message);
    }
    // 消费者2
    @RabbitListener(bindings = {
   
   
            @QueueBinding(
                    value = @Queue, //不给值则为创建临时队列
                    exchange = @Exchange(type = "topic", name = "topic"),
                    key = {
   
   "order.*"}
            )
    })
    public void receiveTwo(String message) {
   
   
        System.out.println("消费者2,负责消费order相关操作:" + message);
    }
}

总结

        到此,RabbitMQ基础的使用方式与常见的消息发布模型就介绍完了,其使用场景无非就是解耦、异步、削峰。以上只是简单示例用法,大家可以在此基础上继续研究,例如如何实现延时发布消息,类似于淘宝7天后自动确认收货,即7天后向支付系统发送mq消息,进行账户转账确认;还有如何实现消息限流等等。另外rabbitmq高可用集群搭建是必不可少的,等后续再给大家介绍。

分享到: