1. RabbitMq概述
1.1 MQ简介
异步处理
消息发送的时间取决于业务执行的最长的时间

应用解耦
原本是需要订单系统直接调用库存系统
只需要将请求发送给消息队列,其他的就不需要去处理了,节省了处理业务逻辑的时间

流量消峰
某一时刻如果请求特别的大,那就先把它放入消息队列,从而达到流量消峰的作用

1.2 RabbitMq简介
大多应用中,可通过消息服务中间件来提升系统异步通信,扩展解耦能力
消息服务中两个重要概念:
- 消息代理(message broker)** 和 目的地(destination)
当消息发送者发送消息后,将由消息代理接管,消息代理保证消息传递到指定目的地
消息队列主要有两种形式的目的地
点对点式:
发布订阅式:
- 发送者(发布者)发到消息到主题,多个接收者(订阅者)监听(订阅)这个主题,那么就会在消息到达时同时收到消息
JMS(Java Message Service) Java消息服务:
- 基于JVM消息代理的规范,ActiveMQ、HornetMQ是JMS的实现
AMQP(Advanced Message Queuing Protocol)
Spring 支持
spring - jms提供了对JMS的支持
spring - rabbit提供了对AMQP的支持
需要ConnectionFactory的实现来连接消息代理
提供 JmsTemplate、RabbitTemplate 来发送消息
@JmsListener(JMS)、@RabbitListener(AMQP)注解在方法上监听消息代理发布的消息
@EnableJms、@EnableRabbit开启支持
Spring Boot 自动配置
JmsAutoConfiguration
RabbitAutoConfiguration
市面上的MQ产品
- ActiveMQ、RabbitMQ、RocketMQ,kafka
|
JMS(Java Message Service) |
AMQP(Advanced Message Queuing Protocol) |
| 定义 |
Java api |
网络线级协议 |
| 跨语言 |
否 |
是 |
| 跨平台 |
否 |
是 |
| Model |
提供两种消息模型: (1)、Peer-2-Peer (2)、Pub/sub |
提供两种消息模型: (1)、direct exchage: (2)、fanout exchage: (3)、topic exchage: (4)、headers exchage: (5)、system exchage: 本质来讲,后四种和JMS的pub/sub模型没有太大差别,仅是在路由机制上做了更详细的划分 |
| 支持消息类型 |
多种消息类型: TextMessage MapMessage BytesMessage StreamMessage ObjectMessage Message (只有消息头和属性) |
byte[] 当实际应用时,由复杂的消息,可以将消息序列化后发送。 |
| 综合评价 |
JMS定义了Java Api层面的标准;在Java体系中,多个client均可以通过JMS进行交互,不需要应用修改代码,但是对其跨平台的支持较差。 |
AMQP定义了wire-level层的协议标准;天然具有跨平台、跨语言特性。 |
1.3 RabbitMq工作流程

RabbitMQ简介:
RabbitMQ是一由erlang开发的AMQP(Advanved Message Queue Protocol)的开源实现
核心概念
Message
消息,消息是不具名的,它是由消息头和消息体组成,消息体是不透明的,而消息头则由一系列的可选属性组成,这些属性包括 routing - key (路由键),priority(相对于其他消息的优先权),delivery - mode(指出该消息可能需要持久性存储)等
Publisher
消息的生产者,也是一个像交换器发布消息的客户端应用程序
Exchange
交换器,用来接收生产者发送的消息并将这些消息路由给服务器中的队列
Exchange有4种类型:direct(默认)、fanout、topic,和heades,不同类型的 Exchange 转发消息的策略有所区别
Queue
消息队列,用来保存消息直到发送给消费者,他是消息的容器,也是消息的重点,一个消息可以投入一个或多个队列,消息一直在队列里面,等待消费者连接到这个队列将其取走
Binding
绑定,用于消息队列和交换器之间的关联,一个绑定就是基于路由键将交换器和消息队列连接起来的规则,所有可以将交换器理解成一个由绑定构成的路由表
Connection
网路连接,比如一个TCP连接
Channel
信道,多路复用连接中的一个独立的双向数据流通道,信道是建立在真实的TCP连接的内的虚拟连接,AMQP 命令都是通过信息到发送出去的,不管是发布消息,订阅队列还是接收消息,这些动作都是通过队列完成,因为对应操作系统来说建立和销毁 TCP 都是非常昂贵的开销,所以引入了信道的概念,以复用一条TCP连接
Consumer
消息的消费者,表示一个消息队列中取得消息的客户端应用程序
Virtual Host
虚拟主机,表示交换器、消息队列和相关对象。虚拟主机是共享相同的身份认证和加密环境的独立服务器域。每个Virtual host本质上就是一个 mini 版的RabbitMQ 服务器,拥有自己的队列、交换器、绑定和权限机制。Virtual host 是 AMQP 概念的基础,必须在连接时指定,RabbitMQ默认的vhost是/。
Broker
表示消息队列服务器实体

1.4 Docker快速安装RabbitMq
1 2 3 4 5 6 7 8 9
| docker run -d --name rabbitmq -p 5671:5671 -p 5672:5672 -p 4369:4369 -p 25672:25672 -p 15671:15671 -p 15672:15672 rabbitmq:management
4369, 25672 (Erlang发现&集群端口) 5672, 5671 (AMQP端口) 15672 (web管理后台端口) 61613, 61614 (STOMP协议端口) 1883, 8883 (MQTT协议端口) # 自动启动 docker update rabbitmq --restart=always
|
1.5 界面基本使用
RabbitMQ在安装好后,可以访问http://ip地址:15672 ;其自带了guest/guest的用户名和密码;如果需要创建自定义用户;那么也可以登录管理界面后,如下操作:



1.5.1. 用户角色

角色说明:
1、 超级管理员(administrator)
可登陆管理控制台,可查看所有的信息,并且可以对用户,策略(policy)进行操作。
2、 监控者(monitoring)
可登陆管理控制台,同时可以查看rabbitmq节点的相关信息(进程数,内存使用情况,磁盘使用情况等)
3、 策略制定者(policymaker)
可登陆管理控制台, 同时可以对policy进行管理。但无法查看节点的相关信息(上图红框标识的部分)。
4、 普通管理者(management)
仅可登陆管理控制台,无法看到节点信息,也无法对策略进行管理。
5、 其他
无法登陆管理控制台,通常就是普通的生产者和消费者。
1.5.2. Virtual Hosts配置
像mysql拥有数据库的概念并且可以指定用户对库和表等操作的权限。RabbitMQ也有类似的权限管理;在RabbitMQ中可以虚拟消息服务器Virtual Host,每个Virtual Hosts相当于一个相对独立的RabbitMQ服务器,每个VirtualHost之间是相互隔离的。exchange、queue、message不能互通。 相当于mysql的db。Virtual Name一般以/开头。
1.5.2.1. 创建Virtual Hosts

1.5.2.2. 设置Virtual Hosts权限


2. RabbitMq工作模式
RabbitMQ提供了6种模式:简单模式,work模式,Publish/Subscribe发布与订阅模式,Routing路由模式,Topics主题模式,RPC远程调用模式(远程调用,不太算MQ;暂不作介绍);
官网对应模式介绍:https://www.rabbitmq.com/getstarted.html

2.1 RabbitMQ入门(Hello World模式)
2.1.1添加依赖
1 2 3 4 5
| <dependency> <groupId>com.rabbitmq</groupId> <artifactId>amqp-client</artifactId> <version>5.6.0</version> </dependency>
|
2.1.2 编写工厂工具类
1
| RabbitMq要通过生产者和消费者通过建立连接和Broker交互,所以第一件事就是建立连接,建立连接,一般情况都是通过工厂模式来完成。所以首先需要创建连接工厂,我这直接写个工具类了
|
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25
| public class ConnectionUtil {
public static Connection getConnection() throws Exception { ConnectionFactory factory = new ConnectionFactory(); factory.setHost("127.0.0.1"); factory.setPort(5672); factory.setVirtualHost("/"); factory.setUsername("guest"); factory.setPassword("guest"); Connection connection = factory.newConnection(); return connection; }
}
|
2.1.3 编写生产者
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44
| public class Consumer_HelloWorld { public static void main(String[] args) throws IOException, TimeoutException { Connection connection = ConnectionUtil.getConnection(); Channel channel = connection.createChannel();
channel.queueDeclare("hello_world",true,false,false,null);
String body = "hello rabbitmq~~~";
channel.basicPublish("","hello_world",null,body.getBytes());
channel.close(); connection.close();
} }
|


2.1.4 编写消费者
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59
| public class Consumer_HelloWorld { public static void main(String[] args) throws IOException, TimeoutException { ConnectionFactory factory = new ConnectionFactory(); Channel channel = connection.createChannel();
channel.queueDeclare("hello_world",true,false,false,null);
Consumer consumer = new DefaultConsumer(channel){
@Override public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException { System.out.println("consumerTag:"+consumerTag); System.out.println("Exchange:"+envelope.getExchange()); System.out.println("RoutingKey:"+envelope.getRoutingKey()); System.out.println("properties:"+properties); System.out.println("body:"+new String(body)); } }; channel.basicConsume("hello_world",true,consumer);
} }
|

2.1.5 小结
上述的入门案例中中其实使用的是如下的简单模式:

在上图的模型中,有以下概念:
- P:生产者,也就是要发送消息的程序
- C:消费者:消息的接受者,会一直等待消息到来。
- queue:消息队列,图中红色部分。类似一个邮箱,可以缓存消息;生产者向其中投递消息,消费者从其中取出消息。
2.2. Work queues工作队列模式
2.2.1. 模式说明

Work Queues与入门程序的简单模式相比,多了一个或一些消费端,多个消费端共同消费同一个队列中的消息。
应用场景:对于 任务过重或任务较多情况使用工作队列可以提高任务处理的速度。
2.2.2. 代码
Work Queues与入门程序的简单模式的代码是几乎一样的;可以完全复制,并复制多一个消费者进行多个消费者同时消费消息的测试。
1)生产者
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21
| public class Producer_WorkQueues { public static void main(String[] args) throws IOException, TimeoutException {
ConnectionFactory factory = new ConnectionFactory(); Channel channel = connection.createChannel(); channel.queueDeclare("work_queues",true,false,false,null); for (int i = 1; i <= 10; i++) { String body = i+"hello rabbitmq~~~";
channel.basicPublish("","work_queues",null,body.getBytes()); }
channel.close(); connection.close();
} }
|
2)消费者1
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19
| public class Consumer_WorkQueues1 { public static void main(String[] args) throws IOException, TimeoutException {
ConnectionFactory factory = new ConnectionFactory(); Channel channel = connection.createChannel(); channel.queueDeclare("work_queues",true,false,false,null); Consumer consumer = new DefaultConsumer(channel){ @Override public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException { System.out.println("body:"+new String(body)); } }; channel.basicConsume("work_queues",true,consumer); } }
|
3)消费者2
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19
| public class Consumer_WorkQueues2 { public static void main(String[] args) throws IOException, TimeoutException {
ConnectionFactory factory = new ConnectionFactory(); Channel channel = connection.createChannel(); channel.queueDeclare("work_queues",true,false,false,null); Consumer consumer = new DefaultConsumer(channel){ @Override public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException { System.out.println("body:"+new String(body)); } }; channel.basicConsume("work_queues",true,consumer); } }
|
2.2.3. 测试
启动两个消费者,然后再启动生产者发送消息;到IDEA的两个消费者对应的控制台查看是否竞争性的接收到消息


2.2.4. 小结
在一个队列中如果有多个消费者,那么消费者之间对于同一个消息的关系是竞争的关系。
订阅模式类型
订阅模式示例图:

前面2个案例中,只有3个角色:
- P:生产者,也就是要发送消息的程序
- C:消费者:消息的接受者,会一直等待消息到来。
- queue:消息队列,图中红色部分
而在订阅模型中,多了一个exchange角色,而且过程略有变化:
- P:生产者,也就是要发送消息的程序,但是不再发送到队列中,而是发给X(交换机)
- C:消费者,消息的接受者,会一直等待消息到来。
- Queue:消息队列,接收消息、缓存消息。
- Exchange:交换机,图中的X。一方面,接收生产者发送的消息。另一方面,知道如何处理消息,例如递交给某个特别队列、递交给所有队列、或是将消息丢弃。到底如何操作,取决于Exchange的类型。Exchange有常见以下3种类型:
- Fanout:广播,将消息交给所有绑定到交换机的队列
- Direct:定向,把消息交给符合指定routing key 的队列
- Topic:通配符,把消息交给符合routing pattern(路由模式) 的队列
Exchange(交换机)只负责转发消息,不具备存储消息的能力,因此如果没有任何队列与Exchange绑定,或者没有符合路由规则的队列,那么消息会丢失!
2.3. Publish/Subscribe发布与订阅模式
2.3.1. 模式说明

发布订阅模式:
1、每个消费者监听自己的队列。
2、生产者将消息发给broker,由交换机将消息转发到绑定此交换机的每个队列,每个绑定交换机的队列都将接收
到消息
2.3.2. 代码
1)生产者
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62
|
public class Producer_PubSub { public static void main(String[] args) throws IOException, TimeoutException { ConnectionFactory factory = new ConnectionFactory(); factory.setHost("172.16.98.133"); factory.setPort(5672); factory.setVirtualHost("/"); Connection connection = factory.newConnection(); Channel channel = connection.createChannel();
String exchangeName = "test_fanout"; channel.exchangeDeclare(exchangeName, BuiltinExchangeType.FANOUT,true,false,false,null); String queue1Name = "test_fanout_queue1"; String queue2Name = "test_fanout_queue2"; channel.queueDeclare(queue1Name,true,false,false,null); channel.queueDeclare(queue2Name,true,false,false,null);
channel.queueBind(queue1Name,exchangeName,""); channel.queueBind(queue2Name,exchangeName,"");
String body = "日志信息:张三调用了findAll方法...日志级别:info..."; channel.basicPublish(exchangeName,"",null,body.getBytes());
channel.close(); connection.close();
} }
|
默认会创建两个队列,并且每个队列中都有一条消息

2)消费者1
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22
| public class Consumer_PubSub1 { public static void main(String[] args) throws IOException, TimeoutException {
ConnectionFactory factory = new ConnectionFactory(); Channel channel = connection.createChannel(); String queue1Name = "test_fanout_queue1"; String queue2Name = "test_fanout_queue2";
Consumer consumer = new DefaultConsumer(channel){
@Override public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException { System.out.println("body:"+new String(body)); System.out.println("将日志信息打印到控制台....."); } }; channel.basicConsume(queue1Name,true,consumer); } }
|
3)消费者2
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23
| public class Consumer_PubSub1 { public static void main(String[] args) throws IOException, TimeoutException {
ConnectionFactory factory = new ConnectionFactory(); Channel channel = connection.createChannel(); String queue1Name = "test_fanout_queue1"; String queue2Name = "test_fanout_queue2";
Consumer consumer = new DefaultConsumer(channel){
@Override public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException { System.out.println("body:"+new String(body)); System.out.println("将日志信息打印到控制台....."); } }; channel.basicConsume(queue1Name,true,consumer); } }
|
2.3.3. 测试
启动所有消费者,然后使用生产者发送消息;在每个消费者对应的控制台可以查看到生产者发送的所有消息;到达广播的效果。


在执行完测试代码后,其实到RabbitMQ的管理后台找到Exchanges选项卡,点击 fanout_exchange 的交换机,可以查看到如下的绑定:

2.3.4. 小结
交换机需要与队列进行绑定,绑定之后;一个消息可以被多个消费者都收到。
发布订阅模式与工作队列模式的区别
1、工作队列模式不用定义交换机,而发布/订阅模式需要定义交换机。
2、发布/订阅模式的生产方是面向交换机发送消息,工作队列模式的生产方是面向队列发送消息(底层使用默认交换机)。
3、发布/订阅模式需要设置队列和交换机的绑定,工作队列模式不需要设置,实际上工作队列模式会将队列绑 定到默认的交换机 。
2.4. Routing路由模式
2.4.1. 模式说明
路由模式特点:
- 队列与交换机的绑定,不能是任意绑定了,而是要指定一个
RoutingKey(路由key)
- 消息的发送方在 向 Exchange发送消息时,也必须指定消息的
RoutingKey。
- Exchange不再把消息交给每一个绑定的队列,而是根据消息的
Routing Key进行判断,只有队列的Routingkey与消息的 Routing key完全一致,才会接收到消息

图解:
- P:生产者,向Exchange发送消息,发送消息时,会指定一个routing key。
- X:Exchange(交换机),接收生产者的消息,然后把消息递交给 与routing key完全匹配的队列
- C1:消费者,其所在队列指定了需要routing key 为 error 的消息
- C2:消费者,其所在队列指定了需要routing key 为 info、error、 的消息
2.4.2. 代码
在编码上与 Publish/Subscribe发布与订阅模式 的区别是交换机的类型为:Direct,还有队列绑定交换机的时候需要指定routing key。
1)生产者
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59
| public class Producer_Routing { public static void main(String[] args) throws IOException, TimeoutException {
ConnectionFactory factory = new ConnectionFactory(); Channel channel = connection.createChannel();
String exchangeName = "test_direct"; channel.exchangeDeclare(exchangeName, BuiltinExchangeType.DIRECT,true,false,false,null); String queue1Name = "test_direct_queue1"; String queue2Name = "test_direct_queue2";
channel.queueDeclare(queue1Name,true,false,false,null); channel.queueDeclare(queue2Name,true,false,false,null);
channel.queueBind(queue1Name,exchangeName,"error"); channel.queueBind(queue2Name,exchangeName,"info"); channel.queueBind(queue2Name,exchangeName,"error"); channel.queueBind(queue2Name,exchangeName,"warning");
String body = "日志信息:张三调用了delete方法...出错误了。。。日志级别:error..."; channel.basicPublish(exchangeName,"error",null,body.getBytes());
channel.close(); connection.close();
} }
|
2)消费者1
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22
| public class Consumer_Routing1 { public static void main(String[] args) throws IOException, TimeoutException {
ConnectionFactory factory = new ConnectionFactory(); Channel channel = connection.createChannel();
String queue1Name = "test_direct_queue1"; String queue2Name = "test_direct_queue2"; Consumer consumer = new DefaultConsumer(channel){
@Override public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException { System.out.println("body:"+new String(body)); System.out.println("将日志信息打印到控制台....."); } }; channel.basicConsume(queue1Name,true,consumer); } }
|
3)消费者2
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25
| public class Consumer_Routing2 { public static void main(String[] args) throws IOException, TimeoutException {
ConnectionFactory factory = new ConnectionFactory(); Channel channel = connection.createChannel();
String queue1Name = "test_direct_queue1"; String queue2Name = "test_direct_queue2";
Consumer consumer = new DefaultConsumer(channel){
@Override public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
System.out.println("body:"+new String(body)); System.out.println("将日志信息存储到数据库....."); } }; channel.basicConsume(queue2Name,true,consumer); } }
|
2.4.3. 测试
启动所有消费者,然后使用生产者发送消息;在消费者对应的控制台可以查看到生产者发送对应routing key对应队列的消息;到达按照需要接收的效果(当routingKey为error时,两个消费者都会收到消息,当为info或warning时只有第二个会收到消息)。
在执行完测试代码后,其实到RabbitMQ的管理后台找到Exchanges选项卡,点击 direct_exchange 的交换机,可以查看到如下的绑定:

2.4.4. 小结
Routing模式要求队列在绑定交换机时要指定routing key,消息会转发到符合routing key的队列。
2.5. Topics通配符模式
2.5.1. 模式说明
Topic类型与Direct相比,都是可以根据RoutingKey把消息路由到不同的队列。只不过Topic类型Exchange可以让队列在绑定Routing key 的时候使用通配符!
Routingkey 一般都是有一个或多个单词组成,多个单词之间以”.”分割,例如: item.insert
通配符规则:
#:匹配一个或多个词
*:匹配不多不少恰好1个词
举例:
item.#:能够匹配item.insert.abc 或者 item.insert
item.*:只能匹配item.insert

图解:
- 红色Queue:绑定的是
usa.# ,因此凡是以 usa.开头的routing key 都会被匹配到
- 黄色Queue:绑定的是
#.news ,因此凡是以 .news结尾的 routing key 都会被匹配
2.5.2. 代码
1)生产者
使用topic类型的Exchange,发送消息的routing key有3种: item.insert、item.update、item.delete:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56
| package com.itheima.rabbitmq.topic;
import com.itheima.rabbitmq.util.ConnectionUtil; import com.rabbitmq.client.BuiltinExchangeType; import com.rabbitmq.client.Channel; import com.rabbitmq.client.Connection;
public class Producer {
static final String TOPIC_EXCHAGE = "topic_exchange"; static final String TOPIC_QUEUE_1 = "topic_queue_1"; static final String TOPIC_QUEUE_2 = "topic_queue_2";
public static void main(String[] args) throws Exception {
Connection connection = ConnectionUtil.getConnection();
Channel channel = connection.createChannel();
channel.exchangeDeclare(TOPIC_EXCHAGE, BuiltinExchangeType.TOPIC);
String message = "新增了商品。Topic模式;routing key 为 item.insert " ; channel.basicPublish(TOPIC_EXCHAGE, "item.insert", null, message.getBytes()); System.out.println("已发送消息:" + message);
message = "修改了商品。Topic模式;routing key 为 item.update" ; channel.basicPublish(TOPIC_EXCHAGE, "item.update", null, message.getBytes()); System.out.println("已发送消息:" + message);
message = "删除了商品。Topic模式;routing key 为 item.delete" ; channel.basicPublish(TOPIC_EXCHAGE, "item.delete", null, message.getBytes()); System.out.println("已发送消息:" + message);
channel.close(); connection.close(); } }
|
2)消费者1
接收两种类型的消息:更新商品和删除商品
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30
| public class Consumer_Topic1 { public static void main(String[] args) throws IOException, TimeoutException {
ConnectionFactory factory = new ConnectionFactory(); factory.setHost("127.0.0.1"); factory.setPort(5672);
Connection connection = factory.newConnection(); Channel channel = connection.createChannel();
String queue1Name = "test_topic_queue1"; String queue2Name = "test_topic_queue2";
Consumer consumer = new DefaultConsumer(channel){
@Override public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException { System.out.println("body:"+new String(body)); System.out.println("将日志信息存入数据库......."); } }; channel.basicConsume(queue1Name,true,consumer); } }
|
3)消费者2
接收所有类型的消息:新增商品,更新商品和删除商品。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31
| public class Consumer_Topic1 { public static void main(String[] args) throws IOException, TimeoutException {
ConnectionFactory factory = new ConnectionFactory(); factory.setHost("127.0.0.1"); factory.setPort(5672);
Connection connection = factory.newConnection(); Channel channel = connection.createChannel();
String queue1Name = "test_topic_queue1"; String queue2Name = "test_topic_queue2";
Consumer consumer = new DefaultConsumer(channel){
@Override public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException { System.out.println("body:"+new String(body)); System.out.println("将日志信息存入数据库......."); } }; channel.basicConsume(queue2Name,true,consumer); } }
|
2.5.3. 测试
启动所有消费者,然后使用生产者发送消息;在消费者对应的控制台可以查看到生产者发送对应routing key对应队列的消息;到达按照需要接收的效果;并且这些routing key可以使用通配符。
在执行完测试代码后,其实到RabbitMQ的管理后台找到Exchanges选项卡,点击 topic_exchange 的交换机,可以查看到如下的绑定:

2.5.4. 小结
Topic主题模式可以实现 Publish/Subscribe发布与订阅模式 和 Routing路由模式 的功能;只是Topic在配置routing key 的时候可以使用通配符,显得更加灵活。
2.6. 模式总结
RabbitMQ工作模式:
1、简单模式 HelloWorld
一个生产者、一个消费者,不需要设置交换机(使用默认的交换机)
2、工作队列模式 Work Queue
一个生产者、多个消费者(竞争关系),不需要设置交换机(使用默认的交换机)
3、发布订阅模式 Publish/subscribe
需要设置类型为fanout的交换机,并且交换机和队列进行绑定,当发送消息到交换机后,交换机会将消息发送到绑定的队列
4、路由模式 Routing
需要设置类型为direct的交换机,交换机和队列进行绑定,并且指定routing key,当发送消息到交换机后,交换机会根据routing key将消息发送到对应的队列
5、通配符模式 Topic
需要设置类型为topic的交换机,交换机和队列进行绑定,并且指定通配符方式的routing key,当发送消息到交换机后,交换机会根据routing key将消息发送到对应的队列
3. Spring 整合RabbitMQ
3.1. 搭建生产者工程
3.1.1. 创建工程


3.1.2. 添加依赖
修改pom.xml文件内容为如下:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37
| <?xml version="1.0" encoding="UTF-8"?> <project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"> <modelVersion>4.0.0</modelVersion>
<groupId>com.itheima</groupId> <artifactId>spring-rabbitmq-producer</artifactId> <version>1.0-SNAPSHOT</version>
<dependencies> <dependency> <groupId>org.springframework</groupId> <artifactId>spring-context</artifactId> <version>5.1.7.RELEASE</version> </dependency>
<dependency> <groupId>org.springframework.amqp</groupId> <artifactId>spring-rabbit</artifactId> <version>2.1.8.RELEASE</version> </dependency>
<dependency> <groupId>junit</groupId> <artifactId>junit</artifactId> <version>4.12</version> </dependency>
<dependency> <groupId>org.springframework</groupId> <artifactId>spring-test</artifactId> <version>5.1.7.RELEASE</version> </dependency> </dependencies>
</project>
|
3.1.3. 配置整合
- 创建
spring-rabbitmq-producer\src\main\resources\properties\rabbitmq.properties连接参数等配置文件;
1 2 3 4 5
| rabbitmq.host=192.168.12.135 rabbitmq.port=5672 rabbitmq.username=heima rabbitmq.password=heima rabbitmq.virtual-host=/itcast
|
- 创建
spring-rabbitmq-producer\src\main\resources\spring\spring-rabbitmq.xml 整合配置文件;
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62
| <?xml version="1.0" encoding="UTF-8"?> <beans xmlns="http://www.springframework.org/schema/beans" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xmlns:context="http://www.springframework.org/schema/context" xmlns:rabbit="http://www.springframework.org/schema/rabbit" xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans.xsd http://www.springframework.org/schema/context https://www.springframework.org/schema/context/spring-context.xsd http://www.springframework.org/schema/rabbit http://www.springframework.org/schema/rabbit/spring-rabbit.xsd"> <context:property-placeholder location="classpath:properties/rabbitmq.properties"/>
<rabbit:connection-factory id="connectionFactory" host="${rabbitmq.host}" port="${rabbitmq.port}" username="${rabbitmq.username}" password="${rabbitmq.password}" virtual-host="${rabbitmq.virtual-host}"/> <rabbit:admin connection-factory="connectionFactory"/>
<rabbit:queue id="spring_queue" name="spring_queue" auto-declare="true"/>
<rabbit:queue id="spring_fanout_queue_1" name="spring_fanout_queue_1" auto-declare="true"/>
<rabbit:queue id="spring_fanout_queue_2" name="spring_fanout_queue_2" auto-declare="true"/>
<rabbit:fanout-exchange id="spring_fanout_exchange" name="spring_fanout_exchange" auto-declare="true"> <rabbit:bindings> <rabbit:binding queue="spring_fanout_queue_1"/> <rabbit:binding queue="spring_fanout_queue_2"/> </rabbit:bindings> </rabbit:fanout-exchange>
<rabbit:queue id="spring_topic_queue_star" name="spring_topic_queue_star" auto-declare="true"/> <rabbit:queue id="spring_topic_queue_well" name="spring_topic_queue_well" auto-declare="true"/> <rabbit:queue id="spring_topic_queue_well2" name="spring_topic_queue_well2" auto-declare="true"/>
<rabbit:topic-exchange id="spring_topic_exchange" name="spring_topic_exchange" auto-declare="true"> <rabbit:bindings> <rabbit:binding pattern="heima.*" queue="spring_topic_queue_star"/> <rabbit:binding pattern="heima.#" queue="spring_topic_queue_well"/> <rabbit:binding pattern="itcast.#" queue="spring_topic_queue_well2"/> </rabbit:bindings> </rabbit:topic-exchange>
<rabbit:template id="rabbitTemplate" connection-factory="connectionFactory"/> </beans>
|
3.1.4. 发送消息
创建测试文件 spring-rabbitmq-producer\src\test\java\com\itheima\rabbitmq\ProducerTest.java
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53
| @RunWith(SpringJUnit4ClassRunner.class) @ContextConfiguration(locations = "classpath:spring/spring-rabbitmq.xml") public class ProducerTest {
@Autowired private RabbitTemplate rabbitTemplate;
@Test public void queueTest(){ rabbitTemplate.convertAndSend("spring_queue", "只发队列spring_queue的消息。"); }
@Test public void fanoutTest(){
rabbitTemplate.convertAndSend("spring_fanout_exchange", "", "发送到spring_fanout_exchange交换机的广播消息"); }
@Test public void topicTest(){
rabbitTemplate.convertAndSend("spring_topic_exchange", "heima.bj", "发送到spring_topic_exchange交换机heima.bj的消息"); rabbitTemplate.convertAndSend("spring_topic_exchange", "heima.bj.1", "发送到spring_topic_exchange交换机heima.bj.1的消息"); rabbitTemplate.convertAndSend("spring_topic_exchange", "heima.bj.2", "发送到spring_topic_exchange交换机heima.bj.2的消息"); rabbitTemplate.convertAndSend("spring_topic_exchange", "itcast.cn", "发送到spring_topic_exchange交换机itcast.cn的消息"); } }
|
3.2. 搭建消费者工程
3.2.1. 创建工程


3.2.2. 添加依赖
修改pom.xml文件内容为如下:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25
| <?xml version="1.0" encoding="UTF-8"?> <project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"> <modelVersion>4.0.0</modelVersion>
<groupId>com.itheima</groupId> <artifactId>spring-rabbitmq-consumer</artifactId> <version>1.0-SNAPSHOT</version> <dependencies> <dependency> <groupId>org.springframework</groupId> <artifactId>spring-context</artifactId> <version>5.1.7.RELEASE</version> </dependency>
<dependency> <groupId>org.springframework.amqp</groupId> <artifactId>spring-rabbit</artifactId> <version>2.1.8.RELEASE</version> </dependency> </dependencies>
</project>
|
3.2.3. 配置整合
- 创建
spring-rabbitmq-consumer\src\main\resources\properties\rabbitmq.properties连接参数等配置文件;
1 2 3 4 5
| rabbitmq.host=192.168.12.135 rabbitmq.port=5672 rabbitmq.username=heima rabbitmq.password=heima rabbitmq.virtual-host=/itcast
|
- 创建
spring-rabbitmq-consumer\src\main\resources\spring\spring-rabbitmq.xml 整合配置文件;
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37
| <?xml version="1.0" encoding="UTF-8"?> <beans xmlns="http://www.springframework.org/schema/beans" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xmlns:context="http://www.springframework.org/schema/context" xmlns:rabbit="http://www.springframework.org/schema/rabbit" xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans.xsd http://www.springframework.org/schema/context https://www.springframework.org/schema/context/spring-context.xsd http://www.springframework.org/schema/rabbit http://www.springframework.org/schema/rabbit/spring-rabbit.xsd"> <context:property-placeholder location="classpath:properties/rabbitmq.properties"/>
<rabbit:connection-factory id="connectionFactory" host="${rabbitmq.host}" port="${rabbitmq.port}" username="${rabbitmq.username}" password="${rabbitmq.password}" virtual-host="${rabbitmq.virtual-host}"/>
<bean id="springQueueListener" class="com.itheima.rabbitmq.listener.SpringQueueListener"/> <bean id="fanoutListener1" class="com.itheima.rabbitmq.listener.FanoutListener1"/> <bean id="fanoutListener2" class="com.itheima.rabbitmq.listener.FanoutListener2"/> <bean id="topicListenerStar" class="com.itheima.rabbitmq.listener.TopicListenerStar"/> <bean id="topicListenerWell" class="com.itheima.rabbitmq.listener.TopicListenerWell"/> <bean id="topicListenerWell2" class="com.itheima.rabbitmq.listener.TopicListenerWell2"/>
<rabbit:listener-container connection-factory="connectionFactory" auto-declare="true"> <rabbit:listener ref="springQueueListener" queue-names="spring_queue"/> <rabbit:listener ref="fanoutListener1" queue-names="spring_fanout_queue_1"/> <rabbit:listener ref="fanoutListener2" queue-names="spring_fanout_queue_2"/> <rabbit:listener ref="topicListenerStar" queue-names="spring_topic_queue_star"/> <rabbit:listener ref="topicListenerWell" queue-names="spring_topic_queue_well"/> <rabbit:listener ref="topicListenerWell2" queue-names="spring_topic_queue_well2"/> </rabbit:listener-container> </beans>
|
3.2.4. 消息监听器
1)队列监听器
创建 spring-rabbitmq-consumer\src\main\java\com\itheima\rabbitmq\listener\SpringQueueListener.java
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15
| public class SpringQueueListener implements MessageListener { public void onMessage(Message message) { try { String msg = new String(message.getBody(), "utf-8");
System.out.printf("接收路由名称为:%s,路由键为:%s,队列名为:%s的消息:%s \n", message.getMessageProperties().getReceivedExchange(), message.getMessageProperties().getReceivedRoutingKey(), message.getMessageProperties().getConsumerQueue(), msg); } catch (Exception e) { e.printStackTrace(); } } }
|
2)广播监听器1
创建 spring-rabbitmq-consumer\src\main\java\com\itheima\rabbitmq\listener\FanoutListener1.java
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16
| public class FanoutListener1 implements MessageListener { public void onMessage(Message message) { try { String msg = new String(message.getBody(), "utf-8");
System.out.printf("广播监听器1:接收路由名称为:%s,路由键为:%s,队列名为:%s的消息:%s \n", message.getMessageProperties().getReceivedExchange(), message.getMessageProperties().getReceivedRoutingKey(), message.getMessageProperties().getConsumerQueue(), msg); } catch (Exception e) { e.printStackTrace(); } } }
|
3)广播监听器2
创建 spring-rabbitmq-consumer\src\main\java\com\itheima\rabbitmq\listener\FanoutListener2.java
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15
| public class FanoutListener2 implements MessageListener { public void onMessage(Message message) { try { String msg = new String(message.getBody(), "utf-8");
System.out.printf("广播监听器2:接收路由名称为:%s,路由键为:%s,队列名为:%s的消息:%s \n", message.getMessageProperties().getReceivedExchange(), message.getMessageProperties().getReceivedRoutingKey(), message.getMessageProperties().getConsumerQueue(), msg); } catch (Exception e) { e.printStackTrace(); } } }
|
4)星号通配符监听器
创建 spring-rabbitmq-consumer\src\main\java\com\itheima\rabbitmq\listener\TopicListenerStar.java
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16
| public class TopicListenerStar implements MessageListener { public void onMessage(Message message) { try { String msg = new String(message.getBody(), "utf-8");
System.out.printf("通配符*监听器:接收路由名称为:%s,路由键为:%s,队列名为:%s的消息:%s \n", message.getMessageProperties().getReceivedExchange(), message.getMessageProperties().getReceivedRoutingKey(), message.getMessageProperties().getConsumerQueue(), msg); } catch (Exception e) { e.printStackTrace(); } } }
|
5)井号通配符监听器
创建 spring-rabbitmq-consumer\src\main\java\com\itheima\rabbitmq\listener\TopicListenerWell.java
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16
| public class TopicListenerWell implements MessageListener { public void onMessage(Message message) { try { String msg = new String(message.getBody(), "utf-8");
System.out.printf("通配符#监听器:接收路由名称为:%s,路由键为:%s,队列名为:%s的消息:%s \n", message.getMessageProperties().getReceivedExchange(), message.getMessageProperties().getReceivedRoutingKey(), message.getMessageProperties().getConsumerQueue(), msg); } catch (Exception e) { e.printStackTrace(); } } }
|
6)井号通配符监听器2
创建 spring-rabbitmq-consumer\src\main\java\com\itheima\rabbitmq\listener\TopicListenerWell2.java
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16
| public class TopicListenerWell2 implements MessageListener { public void onMessage(Message message) { try { String msg = new String(message.getBody(), "utf-8");
System.out.printf("通配符#监听器2:接收路由名称为:%s,路由键为:%s,队列名为:%s的消息:%s \n", message.getMessageProperties().getReceivedExchange(), message.getMessageProperties().getReceivedRoutingKey(), message.getMessageProperties().getConsumerQueue(), msg); } catch (Exception e) { e.printStackTrace(); } } }
|
4. Spring Boot整合RabbitMQ
4.1. 简介
在Spring项目中,可以使用Spring-Rabbit去操作RabbitMQ
https://github.com/spring-projects/spring-amqp
尤其是在spring boot项目中只需要引入对应的amqp启动器依赖即可,方便的使用RabbitTemplate发送消息,使用注解接收消息。
一般在开发过程中:
生产者工程:
application.yml文件配置RabbitMQ相关信息;
在生产者工程中编写配置类,用于创建交换机和队列,并进行绑定
注入RabbitTemplate对象,通过RabbitTemplate对象发送消息到交换机
消费者工程:
application.yml文件配置RabbitMQ相关信息
创建消息处理类,用于接收队列中的消息并进行处理
4.2. 搭建生产者工程
4.2.1. 创建工程
创建生产者工程springboot-rabbitmq-producer


4.2.2. 添加依赖
修改pom.xml文件内容为如下:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25
| <?xml version="1.0" encoding="UTF-8"?> <project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"> <modelVersion>4.0.0</modelVersion> <parent> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-parent</artifactId> <version>2.1.4.RELEASE</version> </parent> <groupId>com.itheima</groupId> <artifactId>springboot-rabbitmq-producer</artifactId> <version>1.0-SNAPSHOT</version>
<dependencies> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-amqp</artifactId> </dependency> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-test</artifactId> </dependency> </dependencies> </project>
|
4.2.3. 启动类
1 2 3 4 5 6 7 8 9 10 11 12
| package com.itheima.rabbitmq;
import org.springframework.boot.SpringApplication; import org.springframework.boot.autoconfigure.SpringBootApplication;
@SpringBootApplication public class ProducerApplication { public static void main(String[] args) { SpringApplication.run(ProducerApplication.class); } }
|
4.2.4. 配置RabbitMQ
1)配置文件
创建application.yml,内容如下:
1 2 3 4 5 6 7
| spring: rabbitmq: host: localhost port: 5672 virtual-host: /itcast username: heima password: heima
|
2)绑定交换机和队列
创建RabbitMQ队列与交换机绑定的配置类com.itheima.rabbitmq.config.RabbitMQConfig
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34
| package com.itheima.rabbitmq.config;
import org.springframework.amqp.core.*; import org.springframework.beans.factory.annotation.Qualifier; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration;
@Configuration public class RabbitMQConfig { public static final String ITEM_TOPIC_EXCHANGE = "item_topic_exchange"; public static final String ITEM_QUEUE = "item_queue";
@Bean("itemTopicExchange") public Exchange topicExchange(){ return ExchangeBuilder.topicExchange(ITEM_TOPIC_EXCHANGE).durable(true).build(); }
@Bean("itemQueue") public Queue itemQueue(){ return QueueBuilder.durable(ITEM_QUEUE).build(); }
@Bean public Binding itemQueueExchange(@Qualifier("itemQueue") Queue queue, @Qualifier("itemTopicExchange") Exchange exchange){ return BindingBuilder.bind(queue).to(exchange).with("item.#").noargs(); }
}
|
4.3. 搭建消费者工程
4.3.1. 创建工程
创建消费者工程springboot-rabbitmq-consumer


4.3.2. 添加依赖
修改pom.xml文件内容为如下:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22
| <?xml version="1.0" encoding="UTF-8"?> <project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"> <modelVersion>4.0.0</modelVersion> <parent> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-parent</artifactId> <version>2.1.4.RELEASE</version> </parent> <groupId>com.itheima</groupId> <artifactId>springboot-rabbitmq-consumer</artifactId> <version>1.0-SNAPSHOT</version>
<dependencies> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-amqp</artifactId> </dependency> </dependencies>
</project>
|
4.3.3. 启动类
1 2 3 4 5 6 7 8 9 10 11
| package com.itheima.rabbitmq;
import org.springframework.boot.SpringApplication; import org.springframework.boot.autoconfigure.SpringBootApplication;
@SpringBootApplication public class ConsumerApplication { public static void main(String[] args) { SpringApplication.run(ConsumerApplication.class); } }
|
4.3.4. 配置RabbitMQ
创建application.yml,内容如下:
1 2 3 4 5 6 7
| spring: rabbitmq: host: localhost port: 5672 virtual-host: /itcast username: heima password: heima
|
4.3.5. 消息监听处理类
编写消息监听器com.itheima.rabbitmq.listener.MyListener
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18
| package com.itheima.rabbitmq.listener;
import org.springframework.amqp.rabbit.annotation.RabbitListener; import org.springframework.stereotype.Component;
@Component public class MyListener {
@RabbitListener(queues = "item_queue") public void myListener1(String message){ System.out.println("消费者接收到的消息为:" + message); } }
|
4.4. 测试
在生产者工程springboot-rabbitmq-producer中创建测试类,发送消息:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24
| package com.itheima.rabbitmq;
import com.itheima.rabbitmq.config.RabbitMQConfig; import org.junit.Test; import org.junit.runner.RunWith; import org.springframework.amqp.rabbit.core.RabbitTemplate; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.boot.test.context.SpringBootTest; import org.springframework.test.context.junit4.SpringRunner;
@RunWith(SpringRunner.class) @SpringBootTest public class RabbitMQTest {
@Autowired private RabbitTemplate rabbitTemplate;
@Test public void test(){ rabbitTemplate.convertAndSend(RabbitMQConfig.ITEM_TOPIC_EXCHANGE, "item.insert", "商品新增,routing key 为item.insert"); rabbitTemplate.convertAndSend(RabbitMQConfig.ITEM_TOPIC_EXCHANGE, "item.update", "商品修改,routing key 为item.update"); rabbitTemplate.convertAndSend(RabbitMQConfig.ITEM_TOPIC_EXCHANGE, "item.delete", "商品删除,routing key 为item.delete"); } }
|
先运行上述测试程序(交换机和队列才能先被声明和绑定),然后启动消费者;在消费者工程springboot-rabbitmq-consumer中控制台查看是否接收到对应消息。
另外;也可以在RabbitMQ的管理控制台中查看到交换机与队列的绑定:
