ActiveMQ高级篇-集群与大厂面试题
1. ActiveMQ的多节点集群
面试题:引入消息中间件后如何保证其高可用
是什么:基于zookeeper和LevelDB搭建ActiveMQ集群。集群仅提供主备方式的高可用集群功能,避免单点故障。
1.1 zookeeper+replicated-leveldb-store的主从集群
三种集群方式对比
- 基于shareFileSystem共享文件系统(KahaDB)
- 基于JDBC
- 基于可复制的LevelDB

1 2
| LevelDB,5.6版本之后推出了LecelDB的持久化引擎,它使用了自定义的索引代替常用的BTree索引,其持久化性能高于KahaDB,虽然默认的持久化方式还是KahaDB,但是LevelDB可能会是趋势。 在5.9版本还提供了基于LevelDB和Zookeeper的数据复制方式,作为Master-Slave方式的首选数据复制方案。
|
1.2 本次案例采用ZK+Replicated LevelDB Store
1. ShareFileSystem

2. 是什么
1 2 3 4 5 6
| 从ActiveMQ5.9开始,ActiveMQ的集群实现方式取消了传统的Masster-Slave方式. 增加了基于Zookeeper+LevelDB的Master-Slave实现方式,从5.9版本后也是官网的推荐 基于Zookeeper和LevelDB搭建ActiveMQ集群,集群仅提供主备方式的高可用集群功能,避免单点故障.
http://activemq.apache.org/replicated-leveldb-store
|
3. 官网集群原理图
官网集群原理图:http://activemq.apache.org/replicated-leveldb-store
解释:
1 2 3 4 5 6 7 8 9 10 11 12 13 14
| 使用Zookeeper集群注册所有的ActiveMQ Broker但只有其中一个Broker可以提供服务,它将被视为Master,其他的Broker处于待机状态被视为Slave。
如果Master因故障而不能提供服务,Zookeeper会从Slave中选举出一个Broker充当Master。Slave连接Master并同步他们的存储状态,Slave不接受客户端连接。所有的存储操作都将被复制到连接至Maste的Slaves。
如果Master宕机得到了最新更新的Slave会变成Master。故障节点在恢复后会重新加入到集群中并连接Master进入Slave模式。 所有需要同步的消息操作都将等待存储状态被复制到其他法定节点的操作完成才能完成。
所以,如给你配置了replicas=3,name法定大小是(3/2)+1 = 2。Master将会存储更新然后等待(2-1)=1个Slave存储和更新完成,才汇报success,至于为什么是2-1,阳哥的zookeeper讲解过自行复习。
有一个ode要作为观察者存在。当一个新的Master被选中,你需要至少保障一个法定mode在线以能够找到拥有最新状态的ode,这个ode才可以成为新的Master。 因此,推荐运行至少3个replica nodes以防止一个node失败后服务中断。
|
4. 部署规划和步骤
1.环境和版本
2.关闭防火墙并保证各个服务器能够ping通
3.要求具备zk集群并可以成功启动
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49
| **配置集群教程** 1,创建我们自己的文件夹存放Zookeeper mkdir /my_zookeeper 2,下载Zookeeper wget -P /my_zookeeper/ https: 3,解压 tar -zxvf apache-zookeeper-3.5.6-bin.tar.g 4,修改配置文件 文件名必须叫这个zoo.cfg cp zoo_sample.cfg zoo.cfg 设置一下自定义的数据文件夹(注意要手动创建这个目录,后面会用到),,注意最后一定要有/结尾,没有/会报错这是坑 dataDir=/my_zookeeper/apache-zookeeper-3.5.6-bin/data/ 在zoo.cfg件最后面追加集群服务器 server.1=192.168.10.130:2888:3888 server.2=192.168.10.132:2888:3888 server.3=192.168.10.133:2888:3888 server.1=leantaot-zk-01:2888:3888 1是一个数字,标识这个是第几号服务器 leantaot-zk-01是这个服务器的IP地址(或者是与IP地址做了映射的主机名) 2888第一个端口用来集群成员的信息交换,标识这个服务器与集群中的leader服务器交换信息的端口 3888是在leader挂掉时专门用来进行选举leader所用的端口 6.把刚刚配置好的Zookeeper整个文件夹远程推送到其他服务器的/my_zookeeper文件夹内 scp -r /my_zookeeper/ root@192.168.10.132:/ scp -r /my_zookeeper/ root@192.168.10.133:/ 7.创建myid文件, id 与 zoo.cfg 中的序号对应 在192.168.10.130机器上执行 echo 1 > /my_zookeeper/apache-zookeeper-3.5.6-bin/data/zookeeper_server.pid echo 1 > /my_zookeeper/apache-zookeeper-3.5.6-bin/data/myid 在192.168.10.132机器上执行 echo 2 > /my_zookeeper/apache-zookeeper-3.5.6-bin/data/zookeeper_server.pid echo 2 > /my_zookeeper/apache-zookeeper-3.5.6-bin/data/myid 在192.168.10.133机器上执行 echo 3 > /my_zookeeper/apache-zookeeper-3.5.6-bin/data/zookeeper_server.pid echo 3 > /my_zookeeper/apache-zookeeper-3.5.6-bin/data/myid 8.bin目录下启动Zookeeper做测试 ./zkServer.sh start 输出 ZooKeeper JMX enabled by default Using config: /my_zookeeper/apache-zookeeper-3.5.6-bin/bin/../conf/zoo.cfg Starting zookeeper ... already running as process 1.
|
就是一台电脑复制三份ActiveMQ
就是ActiveMQ后台管理页面的访问端口
配置文件里面的BrokerName要全部一致
持久化配置(必须)
1 2 3 4 5 6 7 8 9 10 11
| <persistenceAdapter> <replicatedLevelDB directory="${activemq.data}/leveldb" replicas="3" bind="tcp://0.0.0.0:62621" zkAddress="192.168.10.130:2181,192.168.10.132:2181,192.168.10.133:2181" hostname="192.168.10.130" zkPath="/activemq/leveldb-stores" /> </persistenceAdapter>
|

真实的三台机器不用管
- 10.按顺序启动3个ActiveMQ节点,到这步前提是zk集群已经成功启动运行
先启动Zk 在启动ActiveMQ
3台Zk连接任意一台验证三台ActiveMQ是否注册上了Zookeeper
使用zkCli.sh连接一台Zookeeper
/my_zookeeper/apache-zookeeper-3.5.6-bin/bin/zkCli.sh -server 192.168.10.130:2181
查看Master


5. 集群可用性测试
集群需要使用(failover:(tcp://192.168.10.130:61616,tcp://192.168.10.132:61616,tcp://192.168.10.133:61616))配置多个ActiveMQ
2. 高级特性和大厂常考重点
2.1 引入消息队列之后该如何保证其高可用性
zookeeper+Replicated LevelDB
2.2 异步投递Async Sends
官网:http://activemq.apache.org/async-sends
对于一个Slow Consumer,使用同步发送消息可能出现Producer堵塞的情况,慢消费者适合使用异步发送
是什么
1 2 3 4 5 6 7 8 9 10 11 12 13 14
| ActiveMQ支持同步,异步两种发送的模式将消息发送到broker,模式的选择对发送延时有巨大的影响。producer能达到怎么样的产出率(产出率=发送数据总量/时间)主要受发送延时的影响,使用异步发送可以显著提高发送的性能。
ActiveMQ默认使用异步发送的模式:除非明确指定使用同步发送的方式或者在未使用事务的前提下发送持久化的消息,这两种情况都是同步发送的。
如果你没有使用事务且发送的是持久化的消息,每一次发送都是同步发送的且会阻塞producer知道broker返回一个确认,表示消息已经被安全的持久化到磁盘。确认机制提供了消息安全的保障,但同时会阻塞客户端带来了很大的延时。
很多高性能的应用,允许在失败的情况下有少量的数据丢失。如果你的应用满足这个特点,你可以使用异步发送来提高生产率,即使发送的是持久化的消息。 异步发送 它可以最大化producer端的发送效率。我们通常在发送消息量比较密集的情况下使用异步发送,它可以很大的提升Producer性能;不过这也带来了额外的问题, 就是需要消耗更多的Client端内存同时也会导致broker端性能消耗增加; 此外它不能有效的确保消息的发送成功。在userAsyncSend=true的情况下客户端需要容忍消息丢失的可能。
|
官网配置

异步消息如何确定发送成功?
1 2 3 4 5 6 7 8 9 10
| 异步发送丢失消息的场景是:生产者设置userAsyncSend=true,使用producer.send(msg)持续发送消息。 如果消息不阻塞,生产者会认为所有send的消息均被成功发送至MQ。 如果MQ突然宕机,此时生产者端内存中尚未被发送至MQ的消息都会丢失。 所以,正确的异步发送方法是需要接收回调的。 同步发送和异步发送的区别就在此, 同步大宋等send不阻塞了就表示一定发送成功了, 异步发送需要客户端回执并由客户端再判断一次是否发送成功
|
JmsProduce_AsyncSend
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48
| package com.demo;
import org.apache.activemq.ActiveMQConnectionFactory; import org.apache.activemq.ActiveMQMessageProducer; import org.apache.activemq.AsyncCallback;
import javax.jms.*; import java.util.UUID;
public class Producer { private static final String ACTIVEMQ_URL = "tcp://192.168.10.130:61616"; private static final String ACTIVEMQ_QUEUE_NAME = "Queue-异步投递回调";
public static void main(String[] args) throws JMSException { ActiveMQConnectionFactory activeMQConnectionFactory = new ActiveMQConnectionFactory(); activeMQConnectionFactory.setBrokerURL(ACTIVEMQ_URL); activeMQConnectionFactory.setUseAsyncSend(true); Connection connection = activeMQConnectionFactory.createConnection(); connection.start();
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); Queue queue = session.createQueue(ACTIVEMQ_QUEUE_NAME); ActiveMQMessageProducer activeMQMessageProducer = (ActiveMQMessageProducer) session.createProducer(queue);
for (int i = 0; i < 3; i++) { TextMessage textMessage = session.createTextMessage("message-" + i); textMessage.setJMSMessageID(UUID.randomUUID().toString() + "----orderAtguigu"); String textMessageId = textMessage.getJMSMessageID(); activeMQMessageProducer.send(textMessage, new AsyncCallback() { @Override public void onSuccess() { System.out.println(textMessageId + "发送成功"); }
@Override public void onException(JMSException exception) { System.out.println(textMessageId + "发送失败"); } }); } activeMQMessageProducer.close(); session.close(); connection.close(); } }
|
2.3 延迟投递和定时投递
官网:http://activemq.apache.org/delay-and-schedule-message-delivery.html
四大属性

案例演示
要在activemq.xml中配置schedulerSupport属性为true
Java代码里面封装的辅助消息类型:ScheduledMessage
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44
| package com.demo;
import org.apache.activemq.ActiveMQConnectionFactory; import org.apache.activemq.ActiveMQMessageProducer; import org.apache.activemq.AsyncCallback; import org.apache.activemq.ScheduledMessage; import org.springframework.scheduling.annotation.Scheduled;
import javax.jms.*; import java.util.UUID;
public class Producer_延迟投递 { private static final String ACTIVEMQ_URL = "tcp://192.168.10.130:61616"; private static final String ACTIVEMQ_QUEUE_NAME = "Queue-延迟投递";
public static void main(String[] args) throws JMSException { ActiveMQConnectionFactory activeMQConnectionFactory = new ActiveMQConnectionFactory(); activeMQConnectionFactory.setBrokerURL(ACTIVEMQ_URL); Connection connection = activeMQConnectionFactory.createConnection(); connection.start(); Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); Queue queue = session.createQueue(ACTIVEMQ_QUEUE_NAME); MessageProducer messageProducer = session.createProducer(queue); long delay = 3 * 1000; long period = 4 * 1000; int repeat = 5;
for (int i = 0; i < 3; i++) { TextMessage textMessage = session.createTextMessage("message-延时投递" + i); textMessage.setLongProperty(ScheduledMessage.AMQ_SCHEDULED_DELAY, delay); textMessage.setLongProperty(ScheduledMessage.AMQ_SCHEDULED_PERIOD, period); textMessage.setIntProperty(ScheduledMessage.AMQ_SCHEDULED_REPEAT, repeat); messageProducer.send(textMessage); } messageProducer.close(); session.close(); connection.close(); } }
|
2.4 分发策略
2.5 ActiveMQ消息重试机制
面试题
1 2 3 4 5 6 7 8 9 10 11 12
| 具体哪些情况会引发消息重发 1:Client用了transactions且再session中调用了rollback 2:Client用了transactions且再调用commit之前关闭或者没有commit 3:Client再CLIENT_ACKNOWLEDGE的传递模式下,session中调用了recover 请说说消息重发时间间隔和重发次数 间隔:1 次数:6 每秒发6次 有毒消息Poison ACK 一个消息被redelivedred超过默认的最大重发次数(默认6次)时,消费的回个MQ发一个“poison ack”表示这个消息有毒,告诉broker不要再发了。这个时候broker会把这个消息放到DLQ(私信队列)。
|
官网http://activemq.apache.org/redelivery-policy
官网有举例代码我就不写了,很简单,创建类就好了
重发整合Spring

属性说明

2.6 死信队列
官网:http://activemq.apache.org/message-redelivery-and-dlq-handling.html
是什么
ActiveMQ中引入了“死信队列”(Dead Letter Queue)的概念。即一条消息再被重发了多次后(默认为重发6次 redeliveyCounter==6),讲会被ActiveMQ移入“死信队列”。开发人员可以在这个Queue中查看处理出错的消息,进行人工干预。

死信队列的使用:处理失败的消息

1 2 3 4 5 6 7 8 9
| 一般生产环境中在使用MQ的时候设计两个队列:一个是核心业务队列,一个是死信队列。 核心业务队列,就是如上图专用用来订单系统发送订单消息的,然后另外一个死信队列就是用来处理异常的。 假如第三方物流系统故障了此时无法请求,那么仓储系统每次消费到一条订单消息,尝试通知发货和配送都会遇到对方的接口报错。此时仓储系统 就可以把这个消息拒绝访问或者标志位处理失败,一旦标志这条消息处理失败了之后,MQ就会把这条消息转入提前设置好的一个死信队列中。 然后你会看到的就是,在第三方物流系统故障期间,所有订单消息全部处理失败,全部会转入死信队列。然后你的仓储系统得专门有一个后台线程, 重新执行发货和配送得通知逻辑
|
activemq死信队列的配置介绍
SharedDeadLetterStrategy
1 2 3 4 5
| 将所有的DeadLetter保存在一个共享的队列中,这是ActiveMQ broker端默认的策略。 共享队列默认为“ActiveMQ.DLQ”,可以通过“deadLetterQueue”属性来设定 <deadLetterStrategy> <shareDeadLetterStrategy deadLetterQueue="DLQ_QUEUE"/> </deadLetterStrategy>
|
IndividualDeadLetterStrategy

2.7 如果保证消息不被重复消费呢?幂等性问题
1 2 3 4 5 6
| 网络延迟传输中,会造成进行MQ重试中,在重试过程中,可能造成重复消费。 如果消息是做数据库的插入操作,给这个消息做一个唯一主键,那么就会出现重复消费的情况,就会出现主键冲突,避免数据库出现脏数据。 如果上面两种情况还不行,准备一个第三服务方来来做消息记录。以Redis为例,给消息分配一个全局id,只要消费过该消息,将<id,message>以K-V形式写入Redis,那么消费者开始消费前,先去redis种查询有没有消费记录即可
|