一、什么是RocketMQ
RocketMQ 是一个分布式消息中间件,其具有低延迟、高性能和可靠性、万亿级容量、灵活的可扩展性特性。它主要有四部分组成,分别为 name servers,brokers,producers and consumers。
每部分都可以进行水平扩展,而不会出现单点问题。
NameServer Cluster:名称服务集群,提供轻量级的服务发现与路由服务,每个名称服务器记录了全部的 broker 的路由信息,并且提供相应的读写服务,支持快速存储扩展。
Broker Cluster:broker 集群,broker 通过提供轻量级的主题和队列机制来维护消息存储。它支持推和拉两种模型,包含容错机制(2 个副本或 3 个副本),并提供了强大的平滑峰值,提供积累数以亿计的消息并保证其在原始时间顺序的被消费能力。
此外,broker 也提供灾难恢复、丰富的度量统计和警报机制,所有这些能力在传统的消息传递系统里面都是没有的。
Producer Cluster:生产者集群,提供分布式部署,分布式的生产者发送消息到 broker 集群,具体选择哪一个 broker 机器是通过一定的负载均衡策略来决定的,发送消息中支持故障快速恢复,并且具有较低的延迟时间
Consumer Cluster:消费者集群,消费者在推和拉模型中支持分布式部署。它还支持集群消费和消息广播。它提供了实时消息订阅机制,可以满足大多数消费者的需求。
broker 在启动时候会去链接 NameServer,然后注册 topic 信息到 NameServer,NameServer 维护了所有 topic 的信息和对应的 broker 路由信息,broker 与 NameServer 之间是有心跳检查的,NameServer 发现 broker 挂了后,会从注册信息里面删除该 broker,这类似 zookeeper 实现的服务注册;producer 则需要配置 nameserver 的地址,然后定时从 NameServer 获取对应 topic 的路由信息(这个 topic 的消息应该路由到那个 broker)。
同时 producer 与 NameServer,proudcer 与 broker 有心跳检查;同理 Consumer 需要配置 NameServer 的地址,然后定时从 NameServer 获取对应 topic 的路由信息(应该从那个 broker 的消息队列获取消息),同时 Consumer 与 NameServer,Consumer 与 broker 有心跳检查。
二、RocketMQ概念
1. MQ
Message Queue消息队列,既然是队列,就要实现数据结构中队列的基本特征,比如先进先出,入队、出队操作等。
RocketMQ就是把内存中使用的那个队列,变成一个独立的、大家都可以用的队列系统。
2. Topic
一个业务事件,是整个MQ领域最核心的概念,无论是生产还是消费都是针对Topic进行操作。
如果MQ是个大的队列,只有一个队列可以用太浪费了吧,来分一分,分解成很多个小的独立的队列。RocketMQ变成一个管理队列的系统,而分解下来的若干个小的队列通过什么来区分呢?
就是通过topic。
比如我的业务定义topic:tp_im_event。你的业务定义topic:tp_cargo_event,那就是两个小队列了,我的业务用我的队列,你的项目用你的队列。Topic就是队列的名字。
3. Queue
既然Topic是队列的名字,那么queue就表示真实操作的队列了。一开始的时候一个Topic就对应一个queue,多好,一个是名字、一个是现实。可是用着用着就悲催了,为啥?消息操作太多了,全都放在一个小队列上。为了提高效率,咋整??RocketMQ是这样做的,一个Topic绑定的是一组queue,这样每个queue分摊部分压力,性能就上去了。
读队列个数:可以用来读取数据的队列个数
写队列个数:可以用来写入数据的队列个数
queue:真实存储数据用的队列。
4. Message
队列存储的是消息!Message!尽量小,别发个文件啊什么的大东西,后面真心扛不住(超过特定大小还会报错)。
5. Tag
一个queue里都是消息,如何对这些消息进行归类呢?为了进一步细化消息,有了Tag的概念。可以通过Tag对相同消息进行归类,这样用户就可以只订阅一部分的消息了(只订阅部分Tag)
比如:有一个Topic叫做‘发货’,下游消费者希望可以根据货源进行不同的处理,可以通过‘tag=北京’以及‘tag=上海’来区分不同的发货源。下游消费者,可以单独订阅‘上海’的货物,或者‘tag=上海|江苏|浙江’来订阅这三个地区的货物,还可以‘tag=*’来订阅全国的货物。
6. Key
发送了某个消息,但是希望在后台很方便的搜索到,就要通过key了。可以根据key搜索到所有相关的Message。可以认为RocketMQ内部维护了一个非常大的HashMap,key就是这个key,value就是Message,如果出现Hash冲突就用链表来报错对应关系。
7. Producer
生产者:针对某一个Topic制造数据,把数据塞到queue里。(发消息的)
8. Producer Group
管理消息的时候,我们肯定会遇见这个问题,某个消息谁发的?RocketMQ把发送者的身份抽象成了Producer Group,就是[发送组]。
简单点:这个东西命名成项目名就行,相同Producer Group保持相同业务行为。
9. Consumer
消费者:把queue里面的消息拿出来用
消费行为:如何处理通过Topic+Tag定位的消息。
10. Consumer Group
一个RocketMQ集群是如何区分消费者是谁的呢?就是通过消费组,相同消费组的机器,MQ认为消费行为是一致的。业务上一定要保证相同消费组有相同的消费行为。对于不同的消费组名字,RocketMQ就认为是个不同消费者了。如果修改了消费组的名字,那就是新的消费者,就会按照新的消费组的消费进度处理消费。
消息那么多,项目都重启无数次了,RocketMQ是如何记录消息消费到什么地方了呢?
也是通过消费组,RocketMQ内部会维护一个关系,记录Consumer Group和消费进度之间的联系。所以,如果把Consumer Group的名字改掉是可能重新消费之前的所有数据的(视初始消费位置而定)
11. 消息延迟/积压
消息队列主要的功能是模块结偶,同步转异步和削峰,必然会出现生产非常快但是消费慢这种事情,比如生产的速度是100000/s但是消费速度是1/s,这个时候就叫做消息积压或者消费延迟(Delay)。理论上RockeMQ对于这种场景有比较好的适应能力,原理大致这样:正常的生产消费都是操作内存数据,所以比较快。但是如果积压非常多,内存明显扛不住了,则降级为生产消费的是磁盘数据,直接操作磁盘。磁盘肯定比内存的速度慢很多啦。
这个时候整个集群的处理能力就拉低了。所以最好生产和消费能力不要相差太多,即便相差很多,积压也应该在有限的时间内处理完毕。
目前比较容易出现消息积压的情况有:
新消费组上线(消费历史消息)
消费能力弱
生产洪峰(比如for循环发消息,job发消息)
由于RocketMQ开源版本没有多租户隔离,所以公共集群使用的过程中会有相互影响发生,鉴于此大家在上线前还是要合理评估自己的系统能力。
12. InstanceName
上面说的Producer Group和Consumer Group都是逻辑概念。如果需要连接多集群,就需要物理上进行区分(Instance Name)。
一个Instance Name对应一个连接,默认的值是本机ip@进程号。连接多集群的时候务必修改这个值。
三、MQ的使用场景
消息队列中间件是分布式系统中重要的组件,主要解决应用解耦,异步消息,流量削锋等问题,实现高性能,高可用,可伸缩和最终一致性架构。
1. 异步处理
场景说明:用户注册后,需要发注册邮件和注册短信。传统的做法有两种 (1) 串行的方式;(2) 并行方式
注册邮件,发送短信写入消息队列后,直接返回,缩短了响应时间,提高了吞吐量。
2. 应用解耦
场景说明:用户下单后,订单系统需要通知库存系统。传统的做法是,订单系统调用库存系统的接口。
传统模式的缺点:假如库存系统无法访问,则订单减库存将失败,从而导致订单失败,订单系统与库存系统耦合。
引入消息队列后,
订单系统:用户下单后,订单系统完成持久化处理,将消息写入消息队列,返回用户订单下单成功。
库存系统:订阅下单的消息,采用拉/推的方式,获取下单信息,库存系统根据下单信息,进行库存操作。
假如:在下单时库存系统不能正常使用。也不影响正常下单,因为下单后,订单系统写入消息队列就不再关心其他的后续操作了。实现订单系统与库存系统的应用解耦。
3. 流量削峰
流量削锋也是消息队列中的常用场景,一般在秒杀或团抢活动中使用广泛。
应用场景:秒杀活动,一般会因为流量过大,导致流量暴增,应用挂掉。为解决这个问题,一般需要在应用前端加入消息队列。
a、可以控制活动的人数。
b、可以缓解短时间内高流量压垮应用。
用户的请求,服务器接收后,首先写入消息队列。假如消息队列长度超过最大数量,则直接抛弃用户请求或跳转到错误页面。
秒杀业务根据消息队列中的请求信息,再做后续处理
4. 日志处理
日志处理是指将消息队列用在日志处理中,比如Kafka的应用,解决大量日志传输的问题。
日志采集客户端,负责日志数据采集,定时写受写入Kafka队列。
Kafka消息队列,负责日志数据的接收,存储和转发。
日志处理应用:订阅并消费kafka队列中的日志数据 。
5. 消息通讯
消息通讯是指,消息队列一般都内置了高效的通信机制,因此也可以用在纯的消息通讯。比如实现点对点消息队列,或者聊天室等。