1.什么是RabbitMQ?
RabbitMQ是一款开源的,Erlang编写的,基于AMQP协议的消息中间件
2.Rabbitmq 的使用场景
3.RabbitMQ基本概念
由Exchange、Queue、RoutingKey三个才能决定一个从Exchange到Queue的 唯一的线路。
4. MQ优缺点
优点:
应用解耦
生产者(客户端)发送消息到消息队列中去,接受者(服务端)处理消息,需要消费的系统直接去消息队列取消息进行消费即可,而不需要和其他系统有耦合,这显然也提高了系统的扩展性。
异步提速
将用户的请求数据存储到消息队列之后就立即返回结果。随后,系统再对消息进行消费。
削峰限流
先将短时间内高并发产生的事务消息存储在消息队列中,然后后端服务再慢慢根据自己的能力去消费这些消息,这样就避免直接把后端服务打垮掉。
缺点
系统可用性降低
系统引入的外部依赖越多,系统稳定性越差。一旦 MQ 宕机,就会对业务造成影响。
系统复杂度提高
MQ 的加入大大增加了系统的复杂度,以前系统间是同步的远程调用,现在是通过 MQ 进行异步调用。
一致性问题
A 系统处理完业务,通过 MQ 给 B、C、D 三个系统发消息数据,如果 B 系统、C 系统处理成功,D 系统处理失败,则会造成数据处理的不一致。
5.Kafka、ActiveMQ、RabbitMQ、RocketMQ 有 什么优缺点?
综上,各种对比之后,有如下建议:
一般的业务系统要引入 MQ,最早大家都用 ActiveMQ,但是现在确实大家用 的不多了,没经过大规模吞吐量场景的验证,社区也不是很活跃,所以大家还是 算了吧,我个人不推荐用这个了;
后来大家开始用 RabbitMQ,但是确实 erlang 语言阻止了大量的 Java 工程师 去深入研究和掌控它,对公司而言,几乎处于不可控的状态,但是确实人家是开 源的,比较稳定的支持,活跃度也高;
不过现在确实越来越多的公司会去用 RocketMQ,确实很不错,毕竟是阿里出 品,但社区可能有突然黄掉的风险(目前 RocketMQ 已捐给 Apache,但 GitHub 上的活跃度其实不算高)对自己公司技术实力有绝对自信的,推荐用 RocketMQ,否则回去老老实实用 RabbitMQ 吧,人家有活跃的开源社区,绝 对不会黄。
所以中小型公司,技术实力较为一般,技术挑战不是特别高,用 RabbitMQ 是 不错的选择;大型公司,基础架构研发实力较强,用 RocketMQ 是很好的选 择。
如果是大数据领域的实时计算、日志采集等场景,用 Kafka 是业内标准的,绝 对没问题,社区活跃度很高,绝对不会黄,何况几乎是全世界这个领域的事实性 规范。
6.RabbitMQ的工作模式
1.simple模式(即最简单的收发模式)
特点:
一个生产者对应一个消费者,通过队列进行消息传递。
该模式使用 direct 交换机,direct 交换机是 RabbitMQ 默认交换机。
2. 工作队列模式(Work Queue)
与简单模式相比,工作队列模式(Work Queue)多了一些消费者,该模式也使用 direct 交换机,应用于处理消息较多的情况。特点如下:
一个队列对应多个消费者。
一条消息只会被一个消费者消费。
消息队列默认采用 轮询 的方式将消息平均发送给消费者。
3. 发布订阅模式(Publish/Subscribe)
在开发过程中,有一些消息需要不同消费者进行不同的处理,如电商网站的同一条促销信息需要短信发送、邮件发送、站内信发送等。此时可以使用发布订阅模式(Publish/Subscribe),特点如下:
4. 路由模式(Routing)
使用发布订阅模式时,所有消息都会发送到绑定的队列中,但很多时候,不是所有消息都无差别的发布到所有队列中。比如电商网站的促销活动,618大促可能会发布到所有队列;而一些小的促销活动为了节约成本,只发布到站内信队列。此时需要使用 路由模式 (Routing)完成这一需求。特点如下:
5. 通配符模式(Topics)
通配符模式(Topics)是在路由模式的基础上,给队列绑定带通配符的路由关键字,只要消息的 RoutingKey 能实现通配符匹配,就会将消息转发到该队列。通配符模式比路由模式更灵活,通配符模式使用 topic 交换机。 能按照通配符规则将消息发送给指定队列
通配符规则如下:
7.如何保证RabbitMQ消息的顺序性?
拆分多个 queue,每个 queue 一个 consumer,就是多一些 queue 而已,确 实是麻烦点;或者就一个 queue 但是对应一个 consumer,然后这个 consumer 内部用内存队列做排队,然后分发给底层不同的 worker 来处理。
8.如何保证消息不被重复消费?或者说,如何保证消息消费时的幂等性?
1.什么情况会导致消息被重复消费呢
生产者:生产者可能会重复推送一条数据到 MQ 中,比如 Controller 接口被重复调用了 2 次,没有做接口幂等性导致的;
MQ:在消费者消费完准备响应 ack 消息消费成功时,MQ 突然挂了,导致 MQ 以为消费者还未消费该条数据,MQ 恢复后再次推送了该条消息,导致了重复消费。
消费者:消费者已经消费完消息,正准备但是还未响应给ack消息到时,此时消费者挂了,服务重启后 MQ 以为消费者还没有消费该消息,再次推送了该条消息。
2.解决方案
2.1使用数据库唯一键约束
缺点:局限性很大,仅仅只能用在我们数据新增场景,并且性能也比较低
2.2使用乐观锁
假设是更新订单状态,在发送的消息的时候带上修改字段的版本号
缺点:如果说更新字段比较多,并且更新场景比较多,可能会导致数据库字段增加并且还有可能出现多条消息同时在队列中此时他们修改字段版本号一致,排在后续的消息无法被消费
2.3 简单的消息去重,插入消费记录,增加数据库判断
优点:很多场景下的确能起到不错的效果
缺点:
这个消费者的代码执行需要1秒,重复消息在执行期间(假设100毫秒)内到达(例如生产者快速重发,Broker重启等),增加校验的地方是不是还是没数据(因为上一条消息还没消费完,没有记录)
那么就会穿透掉检查的挡板,最后导致重复的消息消费逻辑进入到非幂等安全的业务代码中,从而引发重复消费的问题
2.4并发消息去重基于消息幂等表
缺点:如果说第一次消息投递异常没有消费成功,并且没有将消息状态给置为成功或者没有删除消息表记录,此时延时消费每次执行下列都是一直处于消费中,最后消费就会被视为消费失败而被投递到死信Topic中
方案:插入的消息表必须要带一个最长消费过期时间,例如10分钟.
上述方案只需要一个存储的中心媒介,那我们可以选择更灵活的存储中心媒介,比如Redis。使用Redis有两个好处:
3.总结
9.如何保证RabbitMQ消息的可靠传输?
一、可靠传输
本篇文章主要讲 RabbitMQ 如何保证消息的可靠传输,所以在讲RabbitMQ的实现之前,我们需要先来搞懂一个问题,就是什么是消息的可靠传输。
在 RabbitMQ 中,一个消息从产生到被消费大致需要经过三个步骤,即生产者生产消息,消息投递到 RabbitMQ,RabbitMQ 再将消息推送消费者(或者是消费者拉取),最终消费者将这条消息成功消费。
所以消息丢失也可以划分为三种情况——生产者、消息队列、消费者,如下图所示:
所以消息的可靠传输,就是确保消息能够百分百从生产者发送到服务器,再从服务器发送到消费者。接下来我们就针对这三种情况分别讨论解决方案。
二、生产者投递消息失败
1. 事务机制
使用 RabbitMQ 的事务功能,此时可以选择用 RabbitMQ 提供的事务功能,就是生产者发送数据之前开启 RabbitMQ 事务 channel.txSelect,然后发送消息,如果消息没有成功被 RabbitMQ 接收到,那么生产者会收到异常报错,此时就可以回滚事务 channel.txRollback,然后重试发送消息;如果收到了消息,那么可以提交事务 channel.txCommit。
// 开启事务
channel.txSelect();
try {
// 发送消息
} catch(Exception e) {
channel.txRollback();
// 重发消息
}
// 提交事务
channel.txCommit();
事务机制可以基本确保生产者投递消息成功,但是这种方式有比较大的缺点,基本上 RabbitMQ 事务机制(同步)一搞,基本上吞吐量会下来,因为太耗性能了。
针对上述问题,如果还要确保 RabbitMQ 生产者的消息正确投递,可以开启 confirm 模式,在生产者端设置开启 confirm 模式之后,你每次写的消息都会分配一个唯一的 id,然后如果写入了 RabbitMQ 中,RabbitMQ 会给你回传一个 ACK 消息,告诉你说这个消息 ok 了。
如果 RabbitMQ 没能处理这个消息,会回调你一个 nack 接口,告诉你这个消息接收失败,你可以重试。而且你可以结合这个机制自己在内存里维护每个消息 id 的状态,如果超过一定时间还没接收到这个消息的回调,那么你可以重发。
try {
channel.confirmSelect(); //将信道置为 publisher confirm 模式
// 之后正常发送消息
channel.basicPushlish("exchange", "routingKey", null,
"publisher confirm test".getBytes());
if (!channel.waitFormConfirms()) {
System.out.println("Send message failed");
// do something else...
} catch (InterruptedException e) {
e.printStackTrace();
}
}
事务机制和 confirm 机制最大的不同在于,事务机制是同步的,你提交一个事务之后会阻塞在那儿,但是 confirm 机制是异步的,你发送个消息之后就可以发送下一个消息,然后那个消息 RabbitMQ 接收之后会异步回调你一个接口通知你这个消息接收到了。
所以一般生产者到 RabbitMQ 这块避免数据丢失,会采用 confirm 机制更多些。
三、消息队列自身丢失
就是 RabbitMQ 自己弄丢了数据,这个你必须开启 RabbitMQ 的持久化,就是消息写入之后会持久化到磁盘,哪怕是 RabbitMQ 自己挂了,恢复之后会自动读取之前存储的数据,一般数据不会丢。除非极其罕见的是,RabbitMQ 还没持久化,自己就挂了,可能导致少量数据丢失,但是这个概率较小。
设置持久化有两个步骤:
这样就可以保证 RabbitMQ 持久化 queue 的元数据,但是不会持久化 queue 里的数据。
就是将消息设置为持久化,此时 RabbitMQ 就会将消息持久化到磁盘上去。
必须要同时设置这两个持久化才行,RabbitMQ 哪怕是挂了,再次重启,也会从磁盘上重启恢复 queue,恢复这个 queue 里的数据。
同时持久化也可以跟生产者那边的 confirm 机制配合起来,只有消息被持久化到磁盘后,才会通知生产者 ack 了,所以哪怕是在持久化到磁盘之前,RabbitMQ 挂了,数据丢了,生产者收不到 ACK,你也是可以自己重发的。
注意,哪怕是你给 RabbitMQ 开启了持久化机制,也有一种可能,就是这个消息写到了 RabbitMQ 中,但是还没来得及持久化到磁盘上,结果不巧,此时 RabbitMQ 挂了,就会导致内存中的数据丢失。
四、消费者宕机
对于消费者端所产生的情况就是:消费者成功接收到消息,但是还未将消息处理完毕就宕机了。针对这种情况,可以利用 RabbitMQ 提供的 消息确认机制。
为了保证消息从队列可靠地到达消费者,RabbitMQ 提供了消息确认机制(message acknowledgement)。消费者在订阅队列时,可以指定 autoAck 参数:
所以对于消费者可能发生宕机地情况,我们可以将 autoAck 参数置为 false,消费者就有足够的时间处理这条消息,不用担心处理消息过程中消费者进程挂掉后消息丢失的问题,因为 RabbitMQ 会一直等待并且持有这条消息,直到消费者显示调用 Basic.Ack 命令为止。
当 autoAck 参数设置为 false,对于 RabbitMQ 服务端而言,队列中的消息分成了两个部分:一部分是等待投递给消费者的消息,另一部分是已经投递给消费者,但是还没有收到消费者确认信号的消息。如果 RabbitMQ 一直没有收到消费者的确认信号,并且消费此消息的消费者已经断开连接,则 RabbitMQ 会安排该消息重新进入队列,等待投递给下一个消费者,当然也可能还是原来的那个消费者。