RabbitMQ

Jingxc大约 11 分钟java后端java后端RabbitMQ

RabbitMQ

为什么使用MQ

流量削峰:解决高并发问题


例如秒杀活动,可能会在短时间内产生大量请求同时打到服务端,如果后端对每个请求都进行数据库读写操作,定会造成服务器压力过大,产生服务异常甚至不可用。我们可以通过使用MQ实现流量缓冲,将所有请求先放入消息队列中,服务端每次处理业务先从消息队列获取,从而实现流量削峰,解决高并发问题。

应用解耦:提升系统可用性


例如电商应用中有订单系统、库存系统、物流系统、支付系统,当用户创建订单后,先后调用库存系统、物流系统、支付系统,任何一个子系统出了故障,都会造成下单失败。引入消息队列后,系统间耦合调用的问题会减少,任何一个子系统出现故障都不会影响用户下单,子系统故障恢复后,会继续处理消息,提升系统可用性。

异步处理:提升响应速度


当用户在客户端提交了一个同步请求,后端处理需要耗时很久才能响应,这对用户体验来说无疑是致命的。如果说用户并不关心请求是否处理完成,对于一些耗时的非事务性的业务处理,可以使用消息队列异步请求的方式,将请求信息放入消息队列,直接返回客户端响应,后端监听队列自行处理,提升响应速度。

图示
图示

工作原理

工作原理
工作原理
  • 生产者(Producer)通过信道(Channel)把消息发送给交换机(Exchange),当创建交换机时需要指定类型(四种类型:直接Direct,扇出Fanout ,主题Topic ,消息头Headers );
  • 交换机(Exchange)接收消息并且负责对消息进行路由,交换机根据消息的属性来把消息分发到不同的队列(Queue)上;
  • 消息(Message)会一直留在队列里直到被消费者(Consumer)消费。

核心概念

  • 生产者(Producer):发送消息的应用。
  • 消费者(Consumer):接收消息的应用。
  • 队列(Queue):存储消息的缓存。
  • 消息(Message):由生产者通过RabbitMQ发送给消费者的信息。
  • 连接(Connection):连接RabbitMQ和应用服务器的TCP连接。
  • 信道(Channel):连接里的一个虚拟通道,通过消息队列发送或者接收消息时,都是通过信道进行的。如果每一次访问RabbitMQ都建立一个Connection,在消息量很大时,建立TCP Connection将消耗很大,效率也较低,Channel是Connection内部建的逻辑链接,每个channel之间是完全隔离的,极大的减少了创建TCP Connection的消耗
  • 交换机(Exchange):交换机负责从生产者那里接收消息,并根据交换类型分发到对应的消息队列里。根据分发规则,匹配查询表中的routing key,分发消息到Queue中去,常用的类型有: direct (point-to-point), topic(publish-subscribe) and fanout(multicast)
  • 绑定(Binding):绑定是队列和交换机的一个关联连接。
  • 路由键(Routing Key):路由键是供交换机查看并根据键来决定如何分发消息到队列的一个键,路由键可以说是消息的目的地址。
  • 代理(Broker):接收和分发消息的应用,RabbitMQ Server就是Message Broker。
  • 虚拟主机(Virtual host):出于多租户和安全因素设计的,把AMQP的基本组件划分到一个虚拟的分组中,类似于网络中的namespace概念。当多个不同的用户使用同一个RabbitMQ server提供的服务时,可以划分出多个vhost,每个用户在自己的vhost创建exchange/queue 等。

高级应用

死信队列


死信队列(DLX,Dead-Letter-Exchange),利用DLX,当消息在一个队列中变成无法被消费的消息(dead message)之后,它能被重新publish到另一个Exchange,这个Exchange就是DLX。

消息变成死信的几种情况:

  1. 消息被拒绝(channel.basicReject/channel.basicNack)并且request=false;
  2. 消息在队列的存活时间超过设置的生存时间(TTL)时间;
  3. 队列达到最大长度(队列满了,无法再添加数据到队列中)。

DLX也是一个正常的Exchange,和一般的Exchange没有区别,它能在任何的队列上被指定,实际上就是设置某个队列的属性。

死信队列的设置:

  1. 首先,需要设置死信队列的Exchange和queue,然后进行绑定;
  2. 然后,我们进行正常声明交换机、队列、绑定,只不过我们需要在队列机上一个参数即可:arguments.put(“x-dead-letter-exchange”,”dlx.exchange”);这样消息在过期、被拒绝、队列在达到最大长度时,消息就可以直接路由到死信队列。
死信队列
死信队列

延迟队列


  1. 基于rabbitmq_delayed_message_exchange插件,实现延迟队列效果。它是一种新的交换类型,该类型消息支持延迟投递机制消息传递后并不会立即投递到目标队列中,而是存储在mnesia(一个分布式数据系统)表中,当达到投递时间时,才投递到目标队列中。使用延迟队列,可以有效解决定时任务带来的系统压力以及业务处理时效性等问题。

  2. RabbitMQ本身没有延迟队列,需要靠TTL和DLX模拟出延迟的效果

TTL(Time To Live)

RabbitMQ可以针对Queue和Message设置 x-message-tt,来控制消息的生存时间,如果超时,则消息变为dead letter

RabbitMQ针对队列中的消息过期时间有两种方法可以设置。

A: 通过队列属性设置,队列中所有消息都有相同的过期时间。

B: 对消息进行单独设置,每条消息TTL可以不同。

如果同时使用,则消息的过期时间以两者之间TTL较小的那个数值为准。消息在队列的生存时间一旦超过设置的TTL值,就成为dead letter

DLX (Dead-Letter-Exchange)

RabbitMQ的Queue可以配置x-dead-letter-exchange 和x-dead-letter-routing-key(可选)两个参数,如果队列内出现了dead letter,则按照这两个参数重新路由。

x-dead-letter-exchange:出现dead letter之后将dead letter重新发送到指定exchange

x-dead-letter-routing-key:指定routing-key发送

利用DLX,当消息在一个队列中变成死信后,它能被重新publish到另一个Exchange。这时候消息就可以重新被消费。

队列幂等性


对于一个资源,不管你请求一次还是请求多次,对该资源本身造成的影响应该是相同的,不能因为重复相同的请求而对该资源重复造成影响。注意关注的是请求操作对资源本身造成的影响,而不是请求资源返回的结果。 MQ消费者的幂等性一般使用全局ID或者写个唯一标识(比如流水号/时间戳/UUID/订单号)来判断该消息是否已消费过,也可以利用redis执行setnx命令,天然具有幂等性,从而实现不重复消费(推荐使用redis)。

优先级队列


优先级队列,也就是具有高优先级的队列,优先级高的消息具备优先被消费的特权。通过队列的 x-max-priority 参数设置队列的最大优先级,之后在发送消息时通过 priority 属性再设置当前消息的优先级。优先级应在 0 和 255 之间,推荐1 ~ 10。

  • 优先级默认最低为0,最高为队列设置的最大优先级;
  • 对于单条消息来谈优先级是没有什么意义的。假如消费者的消费速度大于生产者的速度且Broker中没有消息堆积的情况下,对发送的消息设置优先级就没有什么意义,因为生产者刚发完一个消息就被消费者消费了,相当于Broker中至多只有一条消息。

惰性队列


惰性队列会尽可能地将消息存入磁盘中,而在消费者消费消息时才会被加载到内存中,它支持更多的消息存储。 队列具备两种模式:default 和 lazy。默认的为 default 模式,在队列声明的时候可以通过“x-queue-mode”参数来设置队列的模式,取值为“default”和“lazy”。

工作模式

Simple(简单模式)

一个消费者消费一个生产者生产的信息

Work queues(工作模式)

一个生产者生产信息,多个消费者进行消费,但是一条消息只能消费一次

一对多模式,一个生产者,多个消费者,一个队列,每个消费者从队列中获取唯一的消息。与入门程序的简单模式相比,多了一个或一些消费端,多个消费端共同消费同一个队列中的消息。

工作模式
工作模式
  • 在一个队列中如果有多个消费者,那么消费者之间对于同一个消息的关系是竞争的关系
  • 工作模式对于任务过重或任务较多情况使用工作队列可以提高任务处理的速度。例如:短信服务部署多个,只需要有一个节点成功发送即可。

Publish/Subscribe(发布订阅模式)

生产者首先投递消息到交换机,订阅了这个交换机的队列就会收到生产者投递的消息

在订阅模型中,多了一个 Exchange 角色,而且过程略有变化:

生产者,也就是要发送消息的程序,但是不再发送到队列中,而是发给X(交换机)

消费者,消息的接收者,会一直等待消息到来

消息队列,接收消息、缓存消息

交换机一方面,接收生产者发送的消息。另一方面,知道如何处理消息,例如递交给某个特别队列、递交给所有队列、或是将消息丢弃。到底如何操作,取决于Exchange的类型。Exchange有常见以下3种类型:

  • Fanout:广播,将消息交给所有绑定到交换机的队列
  • Direct:定向,把消息交给符合指定routing key 的队列
  • Topic:通配符,把消息交给符合routing pattern(路由模式) 的队列
  • 交换机:只负责转发消息,不具备存储消息的能力,因此如果没有任何队列与 Exchange 绑定,或者没有符合路由规则的队列,那么消息会丢失!
发布订阅模式
发布订阅模式

a、交换机需要与队列进行绑定,绑定之后;一个消息可以被多个消费者都收到。

b、发布订阅模式与工作队列模式的区别:

工作队列模式不用定义交换机,而发布/订阅模式需要定义交换机

发布/订阅模式的生产方是面向交换机发送消息,工作队列模式的生产方是面向队列发送消息(底层使用默认交换机)

发布/订阅模式需要设置队列和交换机的绑定,工作队列模式不需要设置,实际上工作队列模式会将队列绑 定到默认的交换机

Routing(路由模式)

生产者生产消息投递到direct交换机中,扇出交换机会根据消息携带的routing Key匹配相应的队列

  • 队列与交换机的绑定,不能是任意绑定了,而是要指定一个 RoutingKey(路由key)
  • 消息的发送方在向 Exchange 发送消息时,也必须指定消息的 RoutingKey
  • Exchange 不再把消息交给每一个绑定的队列,而是根据消息的 Routing Key 进行判断,只有队列的Routingkey 与消息的 Routing key 完全一致,才会接收到消息

Topics(主题模式)

生产者生产消息投递到topic交换机中,上面是完全匹配路由键,而主题模式是模糊匹配,只要有合适规则的路由就会投递给消费者

  • Topic 类型与 Direct 相比,都是可以根据 RoutingKey 把消息路由到不同的队列。只不过 Topic 类型Exchange 可以让队列在绑定 Routing key 的时候使用通配符!
  • Routingkey 一般都是有一个或多个单词组成,多个单词之间以”.”分割,例如: item.insert
  • 通配符规则:# 匹配一个或多个词,* 匹配不多不少恰好1个词,例如:item.# 能够匹配 item.insert.abc 或者 item.insert,item.* 只能匹配 item.insert
路由/主题
路由/主题

交换机类型

  1. direct Exchange(直接交换机)

匹配路由键,只有完全匹配消息才会被转发

  1. Fanout Excange(扇出交换机)

将消息发送至所有的队列

  1. Topic Exchange(主题交换机)

将路由按模式匹配,此时队列需要绑定要一个模式上。符号“#”匹配一个或多个词,符号“匹配不多不少一个词。因此“abc.#”能够匹配到“abc.def.ghi”,但是“abc.” 只会匹配到“abc.def”。

  1. Header Exchange

在绑定Exchange和Queue的时候指定一组键值对,header为键,根据请求消息中携带的header进行路由

上次编辑于:
贡献者: Jingxc