消息中间件
消息中间件
还是本分区惯例,旨在快速集成,如需了解概念性知识点,请查阅分区知识点-RabbitMq
docker安装RabbitMQ
用于demo测试,在本机安装RabbitMQ,这里使用docker 进行安装
#查看版本
docker search rabbitmq
jingxc@JingxcdeMacBook-Pro ~ % docker search rabbitmq
NAME DESCRIPTION STARS OFFICIAL AUTOMATED
rabbitmq RabbitMQ is an open source multi-protocol me… 4763 [OK]
bitnami/rabbitmq Bitnami Docker Image for RabbitMQ 97 [OK]
masstransit/rabbitmq 11
bitnami/rabbitmq-exporter 2
exozet/rabbitmq-delay-management deprecated 1
circleci/rabbitmq-delayed https://github.com/circleci/rabbitmq-delayed… 1
nasqueron/rabbitmqadmin RabbitMQ management plugin CLI tool Lightwei… 1 [OK]
# 下载合适的版本,这里下载了最新的版本,下载其他的版本可用docker pull rabbitmq:v
docker pull rabbitmq
# 启用
docker run -di --name rabbit-mq -p 5672:5672 -p 15672:15672 rabbitmq
## 由于调试用启用这两个接口就可以了
rabbitmq各端口作用
- client端通连接端口:5672
- 页面管理访问端口: 15672
- 服务间通信端口:25672
启动后访问http://localhost:15672就可以看到管理页面了,
注意
rabbitmq:3.8版本开始,管理插件包含在RabbitMQ发行版中。与其他任何插件一样,必须先启用它,然后才能使用它。这是使用rabbitmq-plugins完成的:
执行此命令即可:rabbitmq-plugins enable rabbitmq_management
插件激活后,无需重新启动节点。
3.问题解决 进入rabbitmq容器
docker exec -it rabbitmq /bin/bash
执行rabbitmq-plugins enable rabbitmq_management
输入用户名密码,默认用户名guest/guest;登录管理界面。
警告
如果在queue页面看不到Message的具体信息
需要做如下配置:
#1. 进入容器内部
jingxc@JingxcdeMacBook-Pro ~ % docker exec -it rabbit-mq /bin/bash
#2.进如容器后进到该目录下
root@b4078072717a:/# cd /etc/rabbitmq/conf.d/
root@b4078072717a:/etc/rabbitmq/conf.d# ls
10-defaults.conf 20-management_agent.disable_metrics_collector.conf
#3. 执行命令
root@b4078072717a:/etc/rabbitmq/conf.d# echo management_agent.disable_metrics_collector = false > management_agent.disable_metrics_collector.conf
root@b4078072717a:/etc/rabbitmq/conf.d# echo management.disable_stats = false > management_agent.disable_metrics_collector.conf
root@b4078072717a:/etc/rabbitmq/conf.d# echo management.enable_queue_totals = true > management_agent.disable_metrics_collector.conf
root@b4078072717a:/etc/rabbitmq/conf.d# exit
exit
#4.重启
jingxc@JingxcdeMacBook-Pro ~ % docker restart rabbit-mq
rabbit-mq
jingxc@JingxcdeMacBook-Pro ~ %
maven依赖
选取版本,可访问https://mvnrepository.com/
<!--我这里选用了一个用的人数最多的版本-->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-amqp</artifactId>
<version>2.7.3</version>
</dependency>
配置文件
MQ创建用户
#进入容器
jingxc@JingxcdeMacBook-Pro ~ % docker exec -it rabbit-mq /bin/bash
root@b4078072717a:/#
## 创建用户
root@b4078072717a:/# rabbitmqctl add_user root 100uu100UU
Adding user "root" ...
Done. Don't forget to grant the user permissions to some virtual hosts! See 'rabbitmqctl help set_permissions' to learn more.
## 授予虚拟机权限
root@b4078072717a:/# rabbitmqctl set_permissions -p / root ".*" '.*' '.*'
Setting permissions for user "root" in vhost "/" ...
root@b4078072717a:/# exit
消息发送者
spring:
# rabbit-mq配置
rabbitmq:
host: 127.0.0.1
port: 5672
username: root
password: 100uu100UU
virtual-host: /
# 开启回退模式 消息从 exchange 到 queue 投递失败有一个 returnCallback 退回模式。
publisher-returns: true
# 开启发布确认 消息从 producer 到 rabbitmq broker有一个 confirmCallback 确认模式。
publisher-confirm-type: correlated
listener:
simple:
# auto:⾃动模式 (根据侦听器检测是正常返回、还是抛出异常来发出 ack/nack),默认,manual:手动模式,none:自动模式(默认开启)
acknowledge-mode: auto
retry:
#开启重试
enabled: true
#最大重试次数
max-attempts: 3
#重试间隔
max-interval: 1000ms
消息接收者
spring:
# rabbit-mq配置
rabbitmq:
host: 127.0.0.1
port: 5672
username: root
password: 100uu100UU
listener:
simple:
#手动确认消息 ack
acknowledge-mode: manual
retry:
enabled: true
max-attempts: 3
max-interval: 1000ms
消息确认
发送消息确认:用来确认生产者 producer 将消息发送到 broker ,broker 上的交换机 exchange 再投递给队列 queue的过程中,消息是否成功投递。
消息从 producer 到 rabbitmq broker有一个 confirmCallback 确认模式。
消息从 exchange 到 queue 投递失败有一个 returnCallback 退回模式。
确认消息
消费端确认(在确保每个消息被正确消费的情况,此时才可以将broker 删除这个消息)
消费端消息机制默认是自动确认的,只要消息接收到,客户端会自动确认,服务端就会移除这个消息,但是该情况下存在问题:
- 当服务器突然宕机的情况下,此时的消费者接收到消息,但是并没有签收确认,这个时候这条消息就会丢失
所以我们这时候可以将自动消费设置成手动签收消息acknowledge-mode: manual
@Log4j
@Component
public class ConfirmCallbackServiceImpl implements RabbitTemplate.ConfirmCallback {
@Override
public void confirm(CorrelationData correlationData, boolean ack, String cause) {
if (!ack) {
log.error("消息发送异常!");
// 发送异常处理
} else {
Map<String, Object> msg = new HashMap<>();
msg.put("correlationData", correlationData);
msg.put("ack", ack);
msg.put("cause", cause);
log.info(JSON.toJSONString(msg));
msg = null;
}
}
}
实现接口 ConfirmCallback ,重写其confirm()方法,方法内有三个参数correlationData、ack、cause。
- correlationData:对象内部只有一个 id 属性,用来表示当前消息的唯一性。
- ack:消息投递到broker 的状态,true表示成功。
- cause:表示投递失败的原因。 但消息被 broker 接收到只能表示已经到达 MQ服务器,并不能保证消息一定会被投递到目标 queue 里。所以接下来需要用到 returnCallback 。
消息退回
如果消息未能投递到目标 queue 里将触发回调 returnCallback ,一旦向 queue 投递消息未成功,这里一般会记录下当前消息的详细投递数据,方便后续做重发或者补偿等操作。
@Log4j
@Component
public class ReturnCallbackServiceImpl implements RabbitTemplate.ReturnCallback {
@Override
public void returnedMessage(Message message, int replyCode, String replyText, String exchange, String routingKey) {
Map<String, Object> msg = new HashMap<>();
msg.put("message", message);
msg.put("replyCode", replyCode);
msg.put("replyText", replyText);
msg.put("exchange", exchange);
msg.put("routingKey", routingKey);
log.info(JSON.toJSONString(msg));
msg = null;
}
}
实现接口ReturnCallback,重写 returnedMessage() 方法,方法有五个参数:
- message(消息体)
- replyCode(响应code)
- replyText(响应内容)
- exchange(交换机)
- routingKey(队列)
Work queues(工作模式)
不需要交换机绑定,可以有多个消费者消费消息默认情况下,消息的分发平均分配到每个不同的消费者上* 可以进行“能者多劳”的设置, 即:那个消费者线程处理的快,就可在获得同一队列更多的消息进行处理
rabbitmq有一个默认的Exchange,而每个队列都会默认绑定它。所以work模式使用的是默认Exchange,它是一个direct类型的交换器,如下图所示。
默认Exchange使用得是默认队列名称作为路由key(routing key)
配置work模式 config类,项目运行自动创建队列queue
服务端
开始把全部文件复制一下,后面就只复制方法了,有一些自己封装的东西,去掉就可以了,也可以去主页下载代码
config文件
package top.jingxc.server.config;
import org.springframework.amqp.core.Queue;
import org.springframework.context.annotation.Bean;
import org.springframework.stereotype.Component;
@Component
public class RabbitQueueConfig {
public final static String TEST_WORK_QUEUE = "TEST_WORK_QUEUE";
/**
* work queue 模型
*/
@Bean
Queue workQueue() {
return new Queue(TEST_WORK_QUEUE);
}
}
package top.jingxc.server.service.impl;
import com.alibaba.fastjson.JSON;
import org.springframework.amqp.core.MessageDeliveryMode;
import org.springframework.amqp.core.Queue;
import org.springframework.amqp.rabbit.connection.CorrelationData;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;
import top.jingxc.server.aop.OperationLogger;
import top.jingxc.server.config.ConstantCommon;
import top.jingxc.server.config.ReturnResult;
import top.jingxc.server.config.ReturnResultSuccess;
import top.jingxc.server.service.RabbitMQService;
import java.util.HashMap;
import java.util.Map;
@Service
public class RabbitMQServiceImpl implements RabbitMQService {
@Autowired
private RabbitTemplate rabbitTemplate;
@Autowired
private ConfirmCallbackServiceImpl confirmCallbackServiceImpl;
@Autowired
private ReturnCallbackServiceImpl returnCallbackServiceImpl;
@Autowired
private Queue workQueue;
@Override
@OperationLogger
public ReturnResult work() {
for (int i = 0; i < 5; i++) {
String gameId = "217";
String channelId = "2000100000";
Map<String, Object> msg = new HashMap<>();
msg.put("gameId", gameId);
msg.put("channelId", channelId);
msg.put("orderId", i);
CorrelationData correlationData = new CorrelationData(gameId + "-" + channelId + "-" + System.currentTimeMillis());
rabbitTemplate.setConfirmCallback(confirmCallbackServiceImpl);
rabbitTemplate.setReturnCallback(returnCallbackServiceImpl);
rabbitTemplate.convertAndSend(workQueue.getName(), JSON.toJSONString(msg), message -> {
//MessageProperties 封装消息的一些属性,属性比较多,我这里只设置以下消息的持久化,PERSISTENT-持久化 NON_PERSISTENT-非持久化
message.getMessageProperties().setDeliveryMode(MessageDeliveryMode.PERSISTENT);
return message;
}, correlationData);
}
return ReturnResultSuccess.builder().code(ConstantCommon.RETURN_CODE_200).msg("success").data("")
.count(ConstantCommon.RETURN_COUNT_1).build();
}
}
部分日志:
2023-05-11 16:06:46,224 [http-nio-8096-exec-1] [top.jingxc.server.aop.SysLogAspect] [INFO] - ******进入方法*********
{"args":[],"paramNames":[],"methodName":"work","clazzName":"top.jingxc.server.service.impl.RabbitMQServiceImpl"}
***************************************************************************************
2023-05-11 16:06:46.251 INFO 19206 --- [nio-8096-exec-1] o.s.a.r.c.CachingConnectionFactory : Attempting to connect to: [127.0.0.1:5672]
2023-05-11 16:06:46.380 INFO 19206 --- [nio-8096-exec-1] o.s.a.r.c.CachingConnectionFactory : Created new connection: rabbitConnectionFactory#4db46344:0/SimpleConnection@76e7bd4b [delegate=amqp://root@127.0.0.1:5672/, localPort= 59729]
2023-05-11 16:06:46,604 [http-nio-8096-exec-1] [top.jingxc.server.aop.SysLogAspect] [INFO] - ******返回方法*********
{"ret":{"code":200,"count":1,"data":"","msg":"success"},"methodName":"work","clazzName":"top.jingxc.server.service.impl.RabbitMQServiceImpl"}
***************************************************************************************
2023-05-11 16:06:46,621 [rabbitConnectionFactory1] [top.jingxc.server.service.impl.ConfirmCallbackServiceImpl] [INFO] - {"correlationData":{"future":{"cancelled":false,"done":true},"id":"217-2000100000-1683792406240"},"ack":true}
2023-05-11 16:06:46,630 [rabbitConnectionFactory2] [top.jingxc.server.service.impl.ConfirmCallbackServiceImpl] [INFO] - {"correlationData":{"future":{"cancelled":false,"done":true},"id":"217-2000100000-1683792406493"},"ack":true}
2023-05-11 16:06:46,639 [rabbitConnectionFactory5] [top.jingxc.server.service.impl.ConfirmCallbackServiceImpl] [INFO] - {"correlationData":{"future":{"cancelled":false,"done":true},"id":"217-2000100000-1683792406526"},"ack":true}
2023-05-11 16:06:46,640 [rabbitConnectionFactory4] [top.jingxc.server.service.impl.ConfirmCallbackServiceImpl] [INFO] - {"correlationData":{"future":{"cancelled":false,"done":true},"id":"217-2000100000-1683792406519"},"ack":true}
2023-05-11 16:06:46,643 [rabbitConnectionFactory3] [top.jingxc.server.service.impl.ConfirmCallbackServiceImpl] [INFO] - {"correlationData":{"future":{"cancelled":false,"done":true},"id":"217-2000100000-1683792406501"},"ack":true}
消费端
package top.jingxc.server.service.impl;
import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.JSONObject;
import com.rabbitmq.client.Channel;
import lombok.extern.log4j.Log4j;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.Queue;
import org.springframework.amqp.rabbit.annotation.RabbitHandler;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Service;
import top.jingxc.server.service.RabbitMQService;
import java.io.IOException;
@Log4j
@Service
public class RabbitMQServiceImpl implements RabbitMQService {
@RabbitListener(queuesToDeclare = @Queue(value = "TEST_WORK_QUEUE"))
@RabbitHandler
public void work(String msg, Channel channel, Message message) throws IOException {
try {
log.info("收到消息:" + msg);
JSONObject jsonObject = JSON.parseObject(msg);
/**
* basicAck:表示成功确认,使用此回执方法后,消息会被rabbitmq broker 删除。
* void basicAck(long deliveryTag, boolean multiple)
* deliveryTag:表示消息投递序号,每次消费消息或者消息重新投递后,deliveryTag都会增加。手动消息确认模式下,我们可以对指定deliveryTag的消息进行ack、nack、reject等操作。
* multiple:是否批量确认,值为 true 则会一次性 ack所有小于当前消息 deliveryTag 的消息。
* */
channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
log.info("deliveryTag:" + message.getMessageProperties().getDeliveryTag());
log.info("redelivered" + message.getMessageProperties().getRedelivered());
//TODO 具体业务
} catch (Exception e) {
if (message.getMessageProperties().getRedelivered()) {
log.info("消息已重复处理失败,拒绝再次接收!");
/**
* basicReject:拒绝消息,与basicNack区别在于不能进行批量操作,其他用法很相似。
* deliveryTag:表示消息投递序号。
* requeue:值为 true 消息将重新入队列。
*/
channel.basicReject(message.getMessageProperties().getDeliveryTag(), false);
// 拒绝消息,requeue=false 表示不再重新入队,如果配置了死信队列则进入死信队列
} else {
log.info("消息即将再次返回队列处理!");
/**
* deliveryTag:表示消息投递序号。
* multiple:是否批量确认。
* requeue:值为 true 消息将重新入队列。
*/
channel.basicNack(message.getMessageProperties().getDeliveryTag(), false, true);
// requeue为是否重新回到队列,true重新入队
}
}
}
}
注意
我这里是放在实现层里加了@Service了,如果单独写的时候,需要加上@Component
部分日志
2023-05-11 16:26:01,833 [org.springframework.amqp.rabbit.RabbitListenerEndpointContainer#0-1] [top.jingxc.server.service.impl.RabbitMQServiceImpl] [INFO] - 收到消息:{"gameId":"217","orderId":0,"channelId":"2000100000"}
2023-05-11 16:26:01,838 [org.springframework.amqp.rabbit.RabbitListenerEndpointContainer#0-1] [top.jingxc.server.service.impl.RabbitMQServiceImpl] [INFO] - deliveryTag:6
2023-05-11 16:26:01,839 [org.springframework.amqp.rabbit.RabbitListenerEndpointContainer#0-1] [top.jingxc.server.service.impl.RabbitMQServiceImpl] [INFO] - redeliveredfalse
2023-05-11 16:26:01,840 [org.springframework.amqp.rabbit.RabbitListenerEndpointContainer#0-1] [top.jingxc.server.service.impl.RabbitMQServiceImpl] [INFO] - 收到消息:{"gameId":"217","orderId":1,"channelId":"2000100000"}
2023-05-11 16:26:01,841 [org.springframework.amqp.rabbit.RabbitListenerEndpointContainer#0-1] [top.jingxc.server.service.impl.RabbitMQServiceImpl] [INFO] - deliveryTag:7
2023-05-11 16:26:01,841 [org.springframework.amqp.rabbit.RabbitListenerEndpointContainer#0-1] [top.jingxc.server.service.impl.RabbitMQServiceImpl] [INFO] - redeliveredfalse
2023-05-11 16:26:01,842 [org.springframework.amqp.rabbit.RabbitListenerEndpointContainer#0-1] [top.jingxc.server.service.impl.RabbitMQServiceImpl] [INFO] - 收到消息:{"gameId":"217","orderId":2,"channelId":"2000100000"}
2023-05-11 16:26:01,842 [org.springframework.amqp.rabbit.RabbitListenerEndpointContainer#0-1] [top.jingxc.server.service.impl.RabbitMQServiceImpl] [INFO] - deliveryTag:8
2023-05-11 16:26:01,842 [org.springframework.amqp.rabbit.RabbitListenerEndpointContainer#0-1] [top.jingxc.server.service.impl.RabbitMQServiceImpl] [INFO] - redeliveredfalse
2023-05-11 16:26:01,842 [org.springframework.amqp.rabbit.RabbitListenerEndpointContainer#0-1] [top.jingxc.server.service.impl.RabbitMQServiceImpl] [INFO] - 收到消息:{"gameId":"217","orderId":3,"channelId":"2000100000"}
2023-05-11 16:26:01,842 [org.springframework.amqp.rabbit.RabbitListenerEndpointContainer#0-1] [top.jingxc.server.service.impl.RabbitMQServiceImpl] [INFO] - deliveryTag:9
2023-05-11 16:26:01,843 [org.springframework.amqp.rabbit.RabbitListenerEndpointContainer#0-1] [top.jingxc.server.service.impl.RabbitMQServiceImpl] [INFO] - redeliveredfalse
2023-05-11 16:26:01,843 [org.springframework.amqp.rabbit.RabbitListenerEndpointContainer#0-1] [top.jingxc.server.service.impl.RabbitMQServiceImpl] [INFO] - 收到消息:{"gameId":"217","orderId":4,"channelId":"2000100000"}
2023-05-11 16:26:01,843 [org.springframework.amqp.rabbit.RabbitListenerEndpointContainer#0-1] [top.jingxc.server.service.impl.RabbitMQServiceImpl] [INFO] - deliveryTag:10
2023-05-11 16:26:01,843 [org.springframework.amqp.rabbit.RabbitListenerEndpointContainer#0-1] [top.jingxc.server.service.impl.RabbitMQServiceImpl] [INFO] - redeliveredfalse
Publish/Subscribe(发布订阅模式)
声明交换机 声明队列 队列绑定交换机,一个交换机可以有多个队列queue共同消费同一批消息,分享不共有
FanoutConfig文件
@Component
public class RabbitQueueConfig {
public final static String TEST_WORK_QUEUE = "TEST_WORK_QUEUE";
public final static String TEST_FANOUT_SEND_MESSAGES_QUEUE = "TEST_FANOUT_SEND_MESSAGES_QUEUE";
public final static String TEST_FANOUT_SEND_MESSAGES_EXCHANGE = "TEST_FANOUT_SEND_MESSAGES_EXCHANGE";
/**
* work queue 模型
*/
@Bean
Queue workQueue() {
return new Queue(TEST_WORK_QUEUE);
}
/**
* 声明fanout 队列
*/
@Bean
public Queue fanoutQueue() {
return new Queue(TEST_FANOUT_SEND_MESSAGES_QUEUE);
}
/**
* 声明fanout交换机
*/
@Bean
public FanoutExchange fanoutExchange() {
return new FanoutExchange(TEST_FANOUT_SEND_MESSAGES_EXCHANGE);
}
@Bean
Binding bindingFanoutExchangeFanoutQueue() {
return BindingBuilder.bind(fanoutQueue()).to(fanoutExchange());
}
}
如果有多个队列可以接着绑定
@Bean
Binding bindingFanoutExchangeFanoutQueue2() {
return BindingBuilder.bind(fanoutQueue2()).to(fanoutExchange());
}
服务端
@Override
@OperationLogger
public ReturnResult fanout() {
for (int i = 0; i < 5; i++) {
String gameId = "217";
String channelId = "2000100000";
Map<String, Object> msg = new HashMap<>();
msg.put("gameId", gameId);
msg.put("channelId", channelId);
msg.put("orderId", i);
CorrelationData correlationData = new CorrelationData(gameId + "-" + channelId + "-" + System.currentTimeMillis());
rabbitTemplate.setConfirmCallback(confirmCallbackServiceImpl);
rabbitTemplate.setReturnCallback(returnCallbackServiceImpl);
rabbitTemplate.convertAndSend(rabbitQueueConfig.TEST_FANOUT_SEND_MESSAGES_EXCHANGE,
rabbitQueueConfig.TEST_FANOUT_SEND_MESSAGES_QUEUE, JSON.toJSONString(msg), message -> {
message.getMessageProperties().setDeliveryMode(MessageDeliveryMode.PERSISTENT);
return message;
}, correlationData);
}
return ReturnResultSuccess.builder().code(ConstantCommon.RETURN_CODE_200).msg("success").data("")
.count(ConstantCommon.RETURN_COUNT_1).build();
}
提示
这里将消息做了字符串转换JSON.toJSONString(msg)
可以直接传对象,只要服务端和消费端一致即可
消费端
@RabbitListener(
bindings = {
@QueueBinding(
value = @Queue(value = "TEST_FANOUT_SEND_MESSAGES_QUEUE"),
exchange = @Exchange(value = "TEST_FANOUT_SEND_MESSAGES_EXCHANGE", type = "fanout")//绑定交换机
)
}
)
public void fanout(String msg, Channel channel, Message message) throws IOException {
try {
log.info("收到消息:" + msg);
JSONObject jsonObject = JSON.parseObject(msg);
/**
* basicAck:表示成功确认,使用此回执方法后,消息会被rabbitmq broker 删除。
* void basicAck(long deliveryTag, boolean multiple)
* deliveryTag:表示消息投递序号,每次消费消息或者消息重新投递后,deliveryTag都会增加。手动消息确认模式下,我们可以对指定deliveryTag的消息进行ack、nack、reject等操作。
* multiple:是否批量确认,值为 true 则会一次性 ack所有小于当前消息 deliveryTag 的消息。
* */
channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
log.info("deliveryTag:" + message.getMessageProperties().getDeliveryTag());
log.info("redelivered: " + message.getMessageProperties().getRedelivered());
//TODO 具体业务
} catch (Exception e) {
if (message.getMessageProperties().getRedelivered()) {
System.out.println("消息已重复处理失败,拒绝再次接收!");
// 拒绝消息,requeue=false 表示不再重新入队,如果配置了死信队列则进入死信队列
/**
* basicReject:拒绝消息,与basicNack区别在于不能进行批量操作,其他用法很相似。
* deliveryTag:表示消息投递序号。
* requeue:值为 true 消息将重新入队列。
*/
channel.basicReject(message.getMessageProperties().getDeliveryTag(), false);
} else {
System.out.println("消息即将再次返回队列处理!");
// requeue为是否重新回到队列,true重新入队
/**
* deliveryTag:表示消息投递序号。
* multiple:是否批量确认。
* requeue:值为 true 消息将重新入队列。
*/
channel.basicNack(message.getMessageProperties().getDeliveryTag(), false, true);
}
}
}
Routing路由模式
路由模式: 一个生产者,多个消费者。需要设置类型为direct的交换机,交换机和队列进行绑定,并且指定routing key,当发送消息到交换机后,交换机会根据routing key将消息发送到对应的队列
directConfig文件
package top.jingxc.server.config;
import org.springframework.amqp.core.*;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.context.annotation.Bean;
import org.springframework.stereotype.Component;
@Component
public class RabbitQueueConfig {
public final static String TEST_WORK_QUEUE = "TEST_WORK_QUEUE";
public final static String TEST_FANOUT_SEND_MESSAGES_QUEUE = "TEST_FANOUT_SEND_MESSAGES_QUEUE";
public final static String TEST_FANOUT_SEND_MESSAGES_EXCHANGE = "TEST_FANOUT_SEND_MESSAGES_EXCHANGE";
public final static String TEST_ROUTING_QUEUE_A = "TEST_ROUTING_QUEUE_A";
public final static String TEST_ROUTING_QUEUE_B = "TEST_ROUTING_QUEUE_B";
public final static String TEST_ROUTING_EXCHANGE = "TEST_ROUTING_EXCHANGE";
/**
* work queue 模型
*/
@Bean
Queue workQueue() {
return new Queue(TEST_WORK_QUEUE);
}
/**
* 声明fanout 队列
*/
@Bean
public Queue fanoutQueue() {
return new Queue(TEST_FANOUT_SEND_MESSAGES_QUEUE);
}
@Bean
public Queue routingQueueA() {
return new Queue(TEST_ROUTING_QUEUE_A);
}
@Bean
public Queue routingQueueB() {
return new Queue(TEST_ROUTING_QUEUE_B);
}
/**
* 声明fanout交换机
*/
@Bean("fanoutExchange")
public FanoutExchange fanoutExchange() {
return new FanoutExchange(TEST_FANOUT_SEND_MESSAGES_EXCHANGE);
}
@Bean("routingExchange")
public DirectExchange routingExchange() {
// 创建direct类型交换机,表示与此交换机会将消息发送给 routing_key 完全相同的队列
return new DirectExchange(TEST_ROUTING_EXCHANGE);
}
@Bean
Binding bindingFanoutExchangeFanoutQueue() {
return BindingBuilder.bind(fanoutQueue()).to(fanoutExchange());
}
@Bean
public Binding bindExchangeQueueA(Queue routingQueueA, @Qualifier("routingExchange") DirectExchange routingExchange) {
// 绑定direct交换机,并设置 routing_key 为 routing_second_queue_routing_key
return BindingBuilder.bind(routingQueueA).to(routingExchange).with("217");
}
@Bean
public Binding bindExchangeQueueB(Queue routingQueueB, @Qualifier("routingExchange") DirectExchange routingExchange) {
// 队列二绑定direct交换机,并设置 routing_key 为 routing_second_queue_routing_key
return BindingBuilder.bind(routingQueueB).to(routingExchange).with("218");
}
}
服务端
@Override
@OperationLogger
public ReturnResult routing(String gameId) {
String channelId = "2000100000";
Map<String, Object> msg = new HashMap<>();
msg.put("gameId", gameId);
msg.put("channelId", channelId);
CorrelationData correlationData = new CorrelationData(gameId + "-" + channelId + "-" + System.currentTimeMillis());
rabbitTemplate.setConfirmCallback(confirmCallbackServiceImpl);
rabbitTemplate.setReturnCallback(returnCallbackServiceImpl);
String routingKey = gameId;
rabbitTemplate.convertAndSend(rabbitQueueConfig.TEST_ROUTING_EXCHANGE, routingKey, JSON.toJSONString(msg), message -> {
message.getMessageProperties().setDeliveryMode(MessageDeliveryMode.PERSISTENT);
return message;
}, correlationData);
return ReturnResultSuccess.builder().code(ConstantCommon.RETURN_CODE_200).msg("success").data("")
.count(ConstantCommon.RETURN_COUNT_1).build();
}
消费端
为了观察效果,多配置了几个消费端
/**
* rouing 路由模式 同一路由下 接收相同的数据。
*/
@RabbitListener(bindings = {
@QueueBinding(
value = @Queue(value = "TEST_ROUTING_QUEUE_A"),
exchange = @Exchange(value = "TEST_ROUTING_EXCHANGE", type = "direct"),
key = {"217"}
)
})
public void routing0(String msg, Channel channel, Message message) throws IOException {
try {
log.info("收到消息:" + msg);
JSONObject jsonObject = JSON.parseObject(msg);
channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
log.info("进入routing0" + msg);
//TODO 具体业务
} catch (Exception e) {
if (message.getMessageProperties().getRedelivered()) {
System.out.println("消息已重复处理失败,拒绝再次接收!");
// 拒绝消息,requeue=false 表示不再重新入队,如果配置了死信队列则进入死信队列
channel.basicReject(message.getMessageProperties().getDeliveryTag(), false);
} else {
System.out.println("消息即将再次返回队列处理!");
// requeue为是否重新回到队列,true重新入队
channel.basicNack(message.getMessageProperties().getDeliveryTag(), false, true);
}
}
}
/**
* rouing 路由模式 同一路由下 接收相同的数据。
*/
@RabbitListener(bindings = {
@QueueBinding(
value = @Queue(value = "TEST_ROUTING_QUEUE_B"),
exchange = @Exchange(value = "TEST_ROUTING_EXCHANGE", type = "direct"),
key = {"217"}
)
})
public void routing1(String msg, Channel channel, Message message) throws IOException {
try {
log.info("收到消息:" + msg);
JSONObject jsonObject = JSON.parseObject(msg);
channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
log.info("进入routing1" + msg);
//TODO 具体业务
} catch (Exception e) {
if (message.getMessageProperties().getRedelivered()) {
System.out.println("消息已重复处理失败,拒绝再次接收!");
// 拒绝消息,requeue=false 表示不再重新入队,如果配置了死信队列则进入死信队列
channel.basicReject(message.getMessageProperties().getDeliveryTag(), false);
} else {
System.out.println("消息即将再次返回队列处理!");
// requeue为是否重新回到队列,true重新入队
channel.basicNack(message.getMessageProperties().getDeliveryTag(), false, true);
}
}
}
@RabbitListener(bindings = {
@QueueBinding(
value = @Queue(value = "TEST_ROUTING_QUEUE_B"),
exchange = @Exchange(value = "TEST_ROUTING_EXCHANGE", type = "direct"),
key = {"217", "218"}
)
})
public void routing2(String msg, Channel channel, Message message) throws IOException {
try {
log.info("收到消息:" + msg);
JSONObject jsonObject = JSON.parseObject(msg);
channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
log.info("进入routing2" + msg);
//TODO 具体业务
} catch (Exception e) {
if (message.getMessageProperties().getRedelivered()) {
System.out.println("消息已重复处理失败,拒绝再次接收!");
// 拒绝消息,requeue=false 表示不再重新入队,如果配置了死信队列则进入死信队列
channel.basicReject(message.getMessageProperties().getDeliveryTag(), false);
} else {
System.out.println("消息即将再次返回队列处理!");
// requeue为是否重新回到队列,true重新入队
channel.basicNack(message.getMessageProperties().getDeliveryTag(), false, true);
}
}
}
Topic主题模式
topicConfig文件
package top.jingxc.server.config;
import org.springframework.amqp.core.*;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.context.annotation.Bean;
import org.springframework.stereotype.Component;
@Component
public class RabbitQueueConfig {
public final static String TEST_WORK_QUEUE = "TEST_WORK_QUEUE";
public final static String TEST_FANOUT_SEND_MESSAGES_QUEUE = "TEST_FANOUT_SEND_MESSAGES_QUEUE";
public final static String TEST_FANOUT_SEND_MESSAGES_EXCHANGE = "TEST_FANOUT_SEND_MESSAGES_EXCHANGE";
public final static String TEST_ROUTING_QUEUE_A = "TEST_ROUTING_QUEUE_A";
public final static String TEST_ROUTING_QUEUE_B = "TEST_ROUTING_QUEUE_B";
public final static String TEST_ROUTING_EXCHANGE = "TEST_ROUTING_EXCHANGE";
public final static String TEST_TOPIC_QUEUE_A = "TEST_TOPIC_QUEUE_A";
public final static String TEST_TOPIC_QUEUE_B = "TEST_TOPIC_QUEUE_B";
public final static String TEST_TOPIC_EXCHANGE = "TEST_TOPIC_EXCHANGE";
/**
* work queue 模型
*/
@Bean
Queue workQueue() {
return new Queue(TEST_WORK_QUEUE);
}
/**
* 声明fanout 队列
*/
@Bean
public Queue fanoutQueue() {
return new Queue(TEST_FANOUT_SEND_MESSAGES_QUEUE);
}
@Bean
public Queue routingQueueA() {
return new Queue(TEST_ROUTING_QUEUE_A);
}
@Bean
public Queue routingQueueB() {
return new Queue(TEST_ROUTING_QUEUE_B);
}
@Bean
public Queue topicQueueA() {
return new Queue(TEST_TOPIC_QUEUE_A);
}
@Bean
public Queue topicQueueB() {
return new Queue(TEST_TOPIC_QUEUE_B);
}
/**
* 声明fanout交换机
*/
@Bean("fanoutExchange")
public FanoutExchange fanoutExchange() {
return new FanoutExchange(TEST_FANOUT_SEND_MESSAGES_EXCHANGE);
}
@Bean("routingExchange")
public DirectExchange routingExchange() {
// 创建direct类型交换机,表示与此交换机会将消息发送给 routing_key 完全相同的队列
return new DirectExchange(TEST_ROUTING_EXCHANGE);
}
@Bean("topicExchange")
public TopicExchange topicExchange() {
return new TopicExchange(TEST_TOPIC_EXCHANGE);
}
@Bean
Binding bindingFanoutExchangeFanoutQueue() {
return BindingBuilder.bind(fanoutQueue()).to(fanoutExchange());
}
@Bean
public Binding bindExchangeQueueA(Queue routingQueueA, @Qualifier("routingExchange") DirectExchange routingExchange) {
// 绑定direct交换机,并设置 routing_key 为 routing_second_queue_routing_key
return BindingBuilder.bind(routingQueueA).to(routingExchange).with("217");
}
@Bean
public Binding bindExchangeQueueB(Queue routingQueueB, @Qualifier("routingExchange") DirectExchange routingExchange) {
// 队列二绑定direct交换机,并设置 routing_key 为 routing_second_queue_routing_key
return BindingBuilder.bind(routingQueueB).to(routingExchange).with("218");
}
@Bean
public Binding topicExchangeBindingA(Queue topicQueueA, @Qualifier("topicExchange") TopicExchange topicExchange) {
return BindingBuilder.bind(topicQueueA).to(topicExchange).with("218.#");
}
@Bean
public Binding topicExchangeBindingB(Queue topicQueueB, @Qualifier("topicExchange") TopicExchange topicExchange) {
return BindingBuilder.bind(topicQueueB).to(topicExchange).with("*.2000100006");
}
}
服务端
@Override
@OperationLogger
public ReturnResult topic(String gameId, String channelId) {
Map<String, Object> msg = new HashMap<>();
msg.put("gameId", gameId);
msg.put("channelId", channelId);
CorrelationData correlationData = new CorrelationData(gameId + "-" + channelId + "-" + System.currentTimeMillis());
rabbitTemplate.setConfirmCallback(confirmCallbackServiceImpl);
rabbitTemplate.setReturnCallback(returnCallbackServiceImpl);
String routingKey = gameId + "." + channelId;
rabbitTemplate.convertAndSend(rabbitQueueConfig.TEST_TOPIC_EXCHANGE, routingKey, JSON.toJSONString(msg), message -> {
message.getMessageProperties().setDeliveryMode(MessageDeliveryMode.PERSISTENT);
return message;
}, correlationData);
return ReturnResultSuccess.builder().code(ConstantCommon.RETURN_CODE_200).msg("success").data("")
.count(ConstantCommon.RETURN_COUNT_1).build();
}
消费端
/**
* Topic 模式star
*/
@RabbitListener(bindings = {
@QueueBinding(
value = @Queue(value = "TEST_TOPIC_QUEUE_A"),
exchange = @Exchange(value = "TEST_TOPIC_EXCHANGE", type = "topic"),//绑定交换机 //默认直连 direct
key = {"218.#"}
)
})
public void topic1(String msg, Channel channel, Message message) throws IOException {
try {
log.info("topic1收到消息:" + msg);
channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
//TODO 具体业务
} catch (Exception e) {
if (message.getMessageProperties().getRedelivered()) {
System.out.println("消息已重复处理失败,拒绝再次接收!");
// 拒绝消息,requeue=false 表示不再重新入队,如果配置了死信队列则进入死信队列
channel.basicReject(message.getMessageProperties().getDeliveryTag(), false);
} else {
System.out.println("消息即将再次返回队列处理!");
// requeue为是否重新回到队列,true重新入队
channel.basicNack(message.getMessageProperties().getDeliveryTag(), false, true);
}
}
}
@RabbitListener(bindings = {
@QueueBinding(
value = @Queue(value = "TEST_TOPIC_QUEUE_B"),
exchange = @Exchange(value = "TEST_TOPIC_EXCHANGE", type = "topic"),//绑定交换机 //默认直连 direct
key = {"*.2000100006"}
)
})
public void topic2(String msg, Channel channel, Message message) throws IOException {
try {
log.info("topic2收到消息:" + msg);
channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
//TODO 具体业务
} catch (Exception e) {
if (message.getMessageProperties().getRedelivered()) {
System.out.println("消息已重复处理失败,拒绝再次接收!");
// 拒绝消息,requeue=false 表示不再重新入队,如果配置了死信队列则进入死信队列
channel.basicReject(message.getMessageProperties().getDeliveryTag(), false);
} else {
System.out.println("消息即将再次返回队列处理!");
// requeue为是否重新回到队列,true重新入队
channel.basicNack(message.getMessageProperties().getDeliveryTag(), false, true);
}
}
}
消费回执
消费消息有三种回执方法,我们来分析一下每种方法的含义。
basicAck
basicAck:表示成功确认,使用此回执方法后,消息会被rabbitmq broker 删除。
void basicAck(long deliveryTag, boolean multiple)
- deliveryTag:表示消息投递序号,每次消费消息或者消息重新投递后,deliveryTag都会增加。手动消息确认模式下, 我们可以对指定deliveryTag的消息进行ack、nack、reject等操作。
- multiple:是否批量确认,值为 true 则会一次性 ack所有小于当前消息 deliveryTag 的消息。
举个栗子: 假设我先发送三条消息deliveryTag分别是5、6、7,可它们都没有被确认, 当我发第四条消息此时deliveryTag为8,multiple设置为 true,会将5、6、7、8的消息全部进行确认。
basicNack
basicNack :表示失败确认,一般在消费消息业务异常时用到此方法,可以将消息重新投递入队列。
void basicNack(long deliveryTag, boolean multiple, boolean requeue)
- deliveryTag:表示消息投递序号。
- multiple:是否批量确认。
- requeue:值为 true 消息将重新入队列。
basicReject
basicReject:拒绝消息,与basicNack区别在于不能进行批量操作,其他用法很相似。
void basicReject(long deliveryTag, boolean requeue)
- deliveryTag:表示消息投递序号。
- requeue:值为 true 消息将重新入队列。
- 本文作者: 景兴春
- 本文链接: https://www.jingxc.top/spring/rabbit-mq.html
- 版权声明: 本博客所有文章除特别声明外,均采用 Apache License 2.0 许可协议。转载请注明出处!