消息中间件

Jingxc大约 17 分钟java后端java后端RabbitMQ

消息中间件

还是本分区惯例,旨在快速集成,如需了解概念性知识点,请查阅分区知识点-RabbitMq

docker安装RabbitMQ

用于demo测试,在本机安装RabbitMQ,这里使用docker 进行安装

#查看版本
docker search rabbitmq

jingxc@JingxcdeMacBook-Pro ~ % docker search rabbitmq
NAME                                      DESCRIPTION                                     STARS     OFFICIAL   AUTOMATED
rabbitmq                                  RabbitMQ is an open source multi-protocol me…   4763      [OK]       
bitnami/rabbitmq                          Bitnami Docker Image for RabbitMQ               97                   [OK]
masstransit/rabbitmq                                                                      11                   
bitnami/rabbitmq-exporter                                                                 2                    
exozet/rabbitmq-delay-management          deprecated                                      1                    
circleci/rabbitmq-delayed                 https://github.com/circleci/rabbitmq-delayed…   1                    
nasqueron/rabbitmqadmin                   RabbitMQ management plugin CLI tool Lightwei…   1                    [OK]

# 下载合适的版本,这里下载了最新的版本,下载其他的版本可用docker pull rabbitmq:v
docker pull rabbitmq

# 启用
docker run -di --name rabbit-mq -p 5672:5672 -p 15672:15672 rabbitmq
## 由于调试用启用这两个接口就可以了

rabbitmq各端口作用

  • client端通连接端口:5672
  • 页面管理访问端口: 15672
  • 服务间通信端口:25672

启动后访问http://localhost:15672就可以看到管理页面了,

注意

rabbitmq:3.8版本开始,管理插件包含在RabbitMQ发行版中。与其他任何插件一样,必须先启用它,然后才能使用它。这是使用rabbitmq-plugins完成的:

执行此命令即可:rabbitmq-plugins enable rabbitmq_management

插件激活后,无需重新启动节点。

3.问题解决 进入rabbitmq容器

docker exec -it rabbitmq /bin/bash

执行rabbitmq-plugins enable rabbitmq_management

输入用户名密码,默认用户名guest/guest;登录管理界面。

警告

如果在queue页面看不到Message的具体信息

需要做如下配置:

#1. 进入容器内部
jingxc@JingxcdeMacBook-Pro ~ % docker exec -it rabbit-mq /bin/bash

#2.进如容器后进到该目录下
root@b4078072717a:/# cd /etc/rabbitmq/conf.d/
root@b4078072717a:/etc/rabbitmq/conf.d# ls
10-defaults.conf  20-management_agent.disable_metrics_collector.conf

#3. 执行命令
root@b4078072717a:/etc/rabbitmq/conf.d# echo management_agent.disable_metrics_collector = false > management_agent.disable_metrics_collector.conf
root@b4078072717a:/etc/rabbitmq/conf.d# echo management.disable_stats = false > management_agent.disable_metrics_collector.conf
root@b4078072717a:/etc/rabbitmq/conf.d# echo management.enable_queue_totals = true > management_agent.disable_metrics_collector.conf
root@b4078072717a:/etc/rabbitmq/conf.d# exit
exit

#4.重启
jingxc@JingxcdeMacBook-Pro ~ % docker restart rabbit-mq
rabbit-mq
jingxc@JingxcdeMacBook-Pro ~ % 

maven依赖

选取版本,可访问https://mvnrepository.com/open in new window

<!--我这里选用了一个用的人数最多的版本-->
<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-amqp</artifactId>
    <version>2.7.3</version>
</dependency>

配置文件

MQ创建用户

#进入容器
jingxc@JingxcdeMacBook-Pro ~ % docker exec -it rabbit-mq /bin/bash 
root@b4078072717a:/#

## 创建用户
root@b4078072717a:/# rabbitmqctl add_user root 100uu100UU
Adding user "root" ...
Done. Don't forget to grant the user permissions to some virtual hosts! See 'rabbitmqctl help set_permissions' to learn more.

## 授予虚拟机权限
root@b4078072717a:/# rabbitmqctl set_permissions -p / root ".*" '.*' '.*'
Setting permissions for user "root" in vhost "/" ...
root@b4078072717a:/# exit

消息发送者


spring:
  # rabbit-mq配置
  rabbitmq:
    host: 127.0.0.1
    port: 5672
    username: root
    password: 100uu100UU
    virtual-host: /
    # 开启回退模式 消息从 exchange 到 queue 投递失败有一个 returnCallback 退回模式。
    publisher-returns: true
    # 开启发布确认 消息从 producer 到 rabbitmq broker有一个 confirmCallback 确认模式。
    publisher-confirm-type: correlated
    listener:
      simple:
        # auto:⾃动模式 (根据侦听器检测是正常返回、还是抛出异常来发出 ack/nack),默认,manual:手动模式,none:自动模式(默认开启)
        acknowledge-mode: auto
        retry:
          #开启重试
          enabled: true
          #最大重试次数
          max-attempts: 3
          #重试间隔
          max-interval: 1000ms

消息接收者


spring:
  # rabbit-mq配置
  rabbitmq:
    host: 127.0.0.1
    port: 5672
    username: root
    password: 100uu100UU
    listener:
      simple:
        #手动确认消息 ack
        acknowledge-mode: manual
        retry:
          enabled: true
          max-attempts: 3
          max-interval: 1000ms

消息确认

发送消息确认:用来确认生产者 producer 将消息发送到 broker ,broker 上的交换机 exchange 再投递给队列 queue的过程中,消息是否成功投递。

消息从 producer 到 rabbitmq broker有一个 confirmCallback 确认模式。

消息从 exchange 到 queue 投递失败有一个 returnCallback 退回模式。

确认消息

消费端确认(在确保每个消息被正确消费的情况,此时才可以将broker 删除这个消息)

消费端消息机制默认是自动确认的,只要消息接收到,客户端会自动确认,服务端就会移除这个消息,但是该情况下存在问题:

  • 当服务器突然宕机的情况下,此时的消费者接收到消息,但是并没有签收确认,这个时候这条消息就会丢失

所以我们这时候可以将自动消费设置成手动签收消息acknowledge-mode: manual

@Log4j
@Component
public class ConfirmCallbackServiceImpl implements RabbitTemplate.ConfirmCallback {

    @Override
    public void confirm(CorrelationData correlationData, boolean ack, String cause) {

        if (!ack) {
            log.error("消息发送异常!");
            // 发送异常处理
        } else {
            Map<String, Object> msg = new HashMap<>();
            msg.put("correlationData", correlationData);
            msg.put("ack", ack);
            msg.put("cause", cause);
            log.info(JSON.toJSONString(msg));
            msg = null;
        }
    }
}

实现接口 ConfirmCallback ,重写其confirm()方法,方法内有三个参数correlationData、ack、cause。

  • correlationData:对象内部只有一个 id 属性,用来表示当前消息的唯一性。
  • ack:消息投递到broker 的状态,true表示成功。
  • cause:表示投递失败的原因。 但消息被 broker 接收到只能表示已经到达 MQ服务器,并不能保证消息一定会被投递到目标 queue 里。所以接下来需要用到 returnCallback 。

消息退回


如果消息未能投递到目标 queue 里将触发回调 returnCallback ,一旦向 queue 投递消息未成功,这里一般会记录下当前消息的详细投递数据,方便后续做重发或者补偿等操作。

@Log4j
@Component
public class ReturnCallbackServiceImpl implements RabbitTemplate.ReturnCallback {

    @Override
    public void returnedMessage(Message message, int replyCode, String replyText, String exchange, String routingKey) {

        Map<String, Object> msg = new HashMap<>();
        msg.put("message", message);
        msg.put("replyCode", replyCode);
        msg.put("replyText", replyText);
        msg.put("exchange", exchange);
        msg.put("routingKey", routingKey);
        
        log.info(JSON.toJSONString(msg));

        msg = null;

    }
}

实现接口ReturnCallback,重写 returnedMessage() 方法,方法有五个参数:

  • message(消息体)
  • replyCode(响应code)
  • replyText(响应内容)
  • exchange(交换机)
  • routingKey(队列)

Work queues(工作模式)

不需要交换机绑定,可以有多个消费者消费消息默认情况下,消息的分发平均分配到每个不同的消费者上* 可以进行“能者多劳”的设置, 即:那个消费者线程处理的快,就可在获得同一队列更多的消息进行处理

rabbitmq有一个默认的Exchange,而每个队列都会默认绑定它。所以work模式使用的是默认Exchange,它是一个direct类型的交换器,如下图所示。

默认Exchange使用得是默认队列名称作为路由key(routing key)

配置work模式 config类,项目运行自动创建队列queue

服务端

开始把全部文件复制一下,后面就只复制方法了,有一些自己封装的东西,去掉就可以了,也可以去主页下载代码

config文件

package top.jingxc.server.config;

import org.springframework.amqp.core.Queue;
import org.springframework.context.annotation.Bean;
import org.springframework.stereotype.Component;

@Component
public class RabbitQueueConfig {

    public final static String TEST_WORK_QUEUE = "TEST_WORK_QUEUE";

    /**
     * work queue 模型
     */
    @Bean
    Queue workQueue() {
        return new Queue(TEST_WORK_QUEUE);
    }
}
package top.jingxc.server.service.impl;

import com.alibaba.fastjson.JSON;
import org.springframework.amqp.core.MessageDeliveryMode;
import org.springframework.amqp.core.Queue;
import org.springframework.amqp.rabbit.connection.CorrelationData;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;
import top.jingxc.server.aop.OperationLogger;
import top.jingxc.server.config.ConstantCommon;
import top.jingxc.server.config.ReturnResult;
import top.jingxc.server.config.ReturnResultSuccess;
import top.jingxc.server.service.RabbitMQService;

import java.util.HashMap;
import java.util.Map;

@Service
public class RabbitMQServiceImpl implements RabbitMQService {

    @Autowired
    private RabbitTemplate rabbitTemplate;
    @Autowired
    private ConfirmCallbackServiceImpl confirmCallbackServiceImpl;
    @Autowired
    private ReturnCallbackServiceImpl returnCallbackServiceImpl;
    @Autowired
    private Queue workQueue;

    @Override
    @OperationLogger
    public ReturnResult work() {

        for (int i = 0; i < 5; i++) {

            String gameId = "217";
            String channelId = "2000100000";

            Map<String, Object> msg = new HashMap<>();
            msg.put("gameId", gameId);
            msg.put("channelId", channelId);
            msg.put("orderId", i);

            CorrelationData correlationData = new CorrelationData(gameId + "-" + channelId + "-" + System.currentTimeMillis());
            rabbitTemplate.setConfirmCallback(confirmCallbackServiceImpl);
            rabbitTemplate.setReturnCallback(returnCallbackServiceImpl);

            rabbitTemplate.convertAndSend(workQueue.getName(), JSON.toJSONString(msg), message -> {
                //MessageProperties 封装消息的一些属性,属性比较多,我这里只设置以下消息的持久化,PERSISTENT-持久化  NON_PERSISTENT-非持久化
                message.getMessageProperties().setDeliveryMode(MessageDeliveryMode.PERSISTENT);
                return message;
            }, correlationData);
        }
        return ReturnResultSuccess.builder().code(ConstantCommon.RETURN_CODE_200).msg("success").data("")
                .count(ConstantCommon.RETURN_COUNT_1).build();
    }
}

部分日志:

2023-05-11 16:06:46,224  [http-nio-8096-exec-1] [top.jingxc.server.aop.SysLogAspect] [INFO] - ******进入方法*********
{"args":[],"paramNames":[],"methodName":"work","clazzName":"top.jingxc.server.service.impl.RabbitMQServiceImpl"}
***************************************************************************************
2023-05-11 16:06:46.251  INFO 19206 --- [nio-8096-exec-1] o.s.a.r.c.CachingConnectionFactory       : Attempting to connect to: [127.0.0.1:5672]
2023-05-11 16:06:46.380  INFO 19206 --- [nio-8096-exec-1] o.s.a.r.c.CachingConnectionFactory       : Created new connection: rabbitConnectionFactory#4db46344:0/SimpleConnection@76e7bd4b [delegate=amqp://root@127.0.0.1:5672/, localPort= 59729]
2023-05-11 16:06:46,604  [http-nio-8096-exec-1] [top.jingxc.server.aop.SysLogAspect] [INFO] - ******返回方法*********
{"ret":{"code":200,"count":1,"data":"","msg":"success"},"methodName":"work","clazzName":"top.jingxc.server.service.impl.RabbitMQServiceImpl"}
***************************************************************************************
2023-05-11 16:06:46,621  [rabbitConnectionFactory1] [top.jingxc.server.service.impl.ConfirmCallbackServiceImpl] [INFO] - {"correlationData":{"future":{"cancelled":false,"done":true},"id":"217-2000100000-1683792406240"},"ack":true}
2023-05-11 16:06:46,630  [rabbitConnectionFactory2] [top.jingxc.server.service.impl.ConfirmCallbackServiceImpl] [INFO] - {"correlationData":{"future":{"cancelled":false,"done":true},"id":"217-2000100000-1683792406493"},"ack":true}
2023-05-11 16:06:46,639  [rabbitConnectionFactory5] [top.jingxc.server.service.impl.ConfirmCallbackServiceImpl] [INFO] - {"correlationData":{"future":{"cancelled":false,"done":true},"id":"217-2000100000-1683792406526"},"ack":true}
2023-05-11 16:06:46,640  [rabbitConnectionFactory4] [top.jingxc.server.service.impl.ConfirmCallbackServiceImpl] [INFO] - {"correlationData":{"future":{"cancelled":false,"done":true},"id":"217-2000100000-1683792406519"},"ack":true}
2023-05-11 16:06:46,643  [rabbitConnectionFactory3] [top.jingxc.server.service.impl.ConfirmCallbackServiceImpl] [INFO] - {"correlationData":{"future":{"cancelled":false,"done":true},"id":"217-2000100000-1683792406501"},"ack":true}

消费端


package top.jingxc.server.service.impl;

import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.JSONObject;
import com.rabbitmq.client.Channel;
import lombok.extern.log4j.Log4j;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.Queue;
import org.springframework.amqp.rabbit.annotation.RabbitHandler;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Service;
import top.jingxc.server.service.RabbitMQService;

import java.io.IOException;

@Log4j
@Service
public class RabbitMQServiceImpl implements RabbitMQService {

    @RabbitListener(queuesToDeclare = @Queue(value = "TEST_WORK_QUEUE"))
    @RabbitHandler
    public void work(String msg, Channel channel, Message message) throws IOException {
        try {
            log.info("收到消息:" + msg);
            JSONObject jsonObject = JSON.parseObject(msg);

            /**
             * basicAck:表示成功确认,使用此回执方法后,消息会被rabbitmq broker 删除。
             * void basicAck(long deliveryTag, boolean multiple)
             * deliveryTag:表示消息投递序号,每次消费消息或者消息重新投递后,deliveryTag都会增加。手动消息确认模式下,我们可以对指定deliveryTag的消息进行ack、nack、reject等操作。
             * multiple:是否批量确认,值为 true 则会一次性 ack所有小于当前消息 deliveryTag 的消息。
             * */
            channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);

            log.info("deliveryTag:" + message.getMessageProperties().getDeliveryTag());
            log.info("redelivered" + message.getMessageProperties().getRedelivered());
            //TODO 具体业务
        } catch (Exception e) {
            if (message.getMessageProperties().getRedelivered()) {
                log.info("消息已重复处理失败,拒绝再次接收!");
                /**
                 * basicReject:拒绝消息,与basicNack区别在于不能进行批量操作,其他用法很相似。
                 * deliveryTag:表示消息投递序号。
                 * requeue:值为 true 消息将重新入队列。
                 */
                channel.basicReject(message.getMessageProperties().getDeliveryTag(), false);
                // 拒绝消息,requeue=false 表示不再重新入队,如果配置了死信队列则进入死信队列
            } else {
                log.info("消息即将再次返回队列处理!");
                /**
                 * deliveryTag:表示消息投递序号。
                 * multiple:是否批量确认。
                 * requeue:值为 true 消息将重新入队列。
                 */
                channel.basicNack(message.getMessageProperties().getDeliveryTag(), false, true);
                // requeue为是否重新回到队列,true重新入队
            }
        }
    }
}

注意

我这里是放在实现层里加了@Service了,如果单独写的时候,需要加上@Component

部分日志

2023-05-11 16:26:01,833  [org.springframework.amqp.rabbit.RabbitListenerEndpointContainer#0-1] [top.jingxc.server.service.impl.RabbitMQServiceImpl] [INFO] - 收到消息:{"gameId":"217","orderId":0,"channelId":"2000100000"}
2023-05-11 16:26:01,838  [org.springframework.amqp.rabbit.RabbitListenerEndpointContainer#0-1] [top.jingxc.server.service.impl.RabbitMQServiceImpl] [INFO] - deliveryTag:6
2023-05-11 16:26:01,839  [org.springframework.amqp.rabbit.RabbitListenerEndpointContainer#0-1] [top.jingxc.server.service.impl.RabbitMQServiceImpl] [INFO] - redeliveredfalse
2023-05-11 16:26:01,840  [org.springframework.amqp.rabbit.RabbitListenerEndpointContainer#0-1] [top.jingxc.server.service.impl.RabbitMQServiceImpl] [INFO] - 收到消息:{"gameId":"217","orderId":1,"channelId":"2000100000"}
2023-05-11 16:26:01,841  [org.springframework.amqp.rabbit.RabbitListenerEndpointContainer#0-1] [top.jingxc.server.service.impl.RabbitMQServiceImpl] [INFO] - deliveryTag:7
2023-05-11 16:26:01,841  [org.springframework.amqp.rabbit.RabbitListenerEndpointContainer#0-1] [top.jingxc.server.service.impl.RabbitMQServiceImpl] [INFO] - redeliveredfalse
2023-05-11 16:26:01,842  [org.springframework.amqp.rabbit.RabbitListenerEndpointContainer#0-1] [top.jingxc.server.service.impl.RabbitMQServiceImpl] [INFO] - 收到消息:{"gameId":"217","orderId":2,"channelId":"2000100000"}
2023-05-11 16:26:01,842  [org.springframework.amqp.rabbit.RabbitListenerEndpointContainer#0-1] [top.jingxc.server.service.impl.RabbitMQServiceImpl] [INFO] - deliveryTag:8
2023-05-11 16:26:01,842  [org.springframework.amqp.rabbit.RabbitListenerEndpointContainer#0-1] [top.jingxc.server.service.impl.RabbitMQServiceImpl] [INFO] - redeliveredfalse
2023-05-11 16:26:01,842  [org.springframework.amqp.rabbit.RabbitListenerEndpointContainer#0-1] [top.jingxc.server.service.impl.RabbitMQServiceImpl] [INFO] - 收到消息:{"gameId":"217","orderId":3,"channelId":"2000100000"}
2023-05-11 16:26:01,842  [org.springframework.amqp.rabbit.RabbitListenerEndpointContainer#0-1] [top.jingxc.server.service.impl.RabbitMQServiceImpl] [INFO] - deliveryTag:9
2023-05-11 16:26:01,843  [org.springframework.amqp.rabbit.RabbitListenerEndpointContainer#0-1] [top.jingxc.server.service.impl.RabbitMQServiceImpl] [INFO] - redeliveredfalse
2023-05-11 16:26:01,843  [org.springframework.amqp.rabbit.RabbitListenerEndpointContainer#0-1] [top.jingxc.server.service.impl.RabbitMQServiceImpl] [INFO] - 收到消息:{"gameId":"217","orderId":4,"channelId":"2000100000"}
2023-05-11 16:26:01,843  [org.springframework.amqp.rabbit.RabbitListenerEndpointContainer#0-1] [top.jingxc.server.service.impl.RabbitMQServiceImpl] [INFO] - deliveryTag:10
2023-05-11 16:26:01,843  [org.springframework.amqp.rabbit.RabbitListenerEndpointContainer#0-1] [top.jingxc.server.service.impl.RabbitMQServiceImpl] [INFO] - redeliveredfalse

Publish/Subscribe(发布订阅模式)

声明交换机 声明队列 队列绑定交换机,一个交换机可以有多个队列queue共同消费同一批消息,分享不共有

FanoutConfig文件


@Component
public class RabbitQueueConfig {

    public final static String TEST_WORK_QUEUE = "TEST_WORK_QUEUE";
    public final static String TEST_FANOUT_SEND_MESSAGES_QUEUE = "TEST_FANOUT_SEND_MESSAGES_QUEUE";
    public final static String TEST_FANOUT_SEND_MESSAGES_EXCHANGE = "TEST_FANOUT_SEND_MESSAGES_EXCHANGE";

    /**
     * work queue 模型
     */
    @Bean
    Queue workQueue() {
        return new Queue(TEST_WORK_QUEUE);
    }

    /**
     * 声明fanout 队列
     */
    @Bean
    public Queue fanoutQueue() {
        return new Queue(TEST_FANOUT_SEND_MESSAGES_QUEUE);
    }

    /**
     * 声明fanout交换机
     */
    @Bean
    public FanoutExchange fanoutExchange() {
        return new FanoutExchange(TEST_FANOUT_SEND_MESSAGES_EXCHANGE);
    }

    @Bean
    Binding bindingFanoutExchangeFanoutQueue() {
        return BindingBuilder.bind(fanoutQueue()).to(fanoutExchange());
    }
}

如果有多个队列可以接着绑定

@Bean
Binding bindingFanoutExchangeFanoutQueue2() {
    return BindingBuilder.bind(fanoutQueue2()).to(fanoutExchange());
}

服务端

@Override
@OperationLogger
public ReturnResult fanout() {
    for (int i = 0; i < 5; i++) {
        String gameId = "217";
        String channelId = "2000100000";

        Map<String, Object> msg = new HashMap<>();
        msg.put("gameId", gameId);
        msg.put("channelId", channelId);
        msg.put("orderId", i);

        CorrelationData correlationData = new CorrelationData(gameId + "-" + channelId + "-" + System.currentTimeMillis());

        rabbitTemplate.setConfirmCallback(confirmCallbackServiceImpl);
        rabbitTemplate.setReturnCallback(returnCallbackServiceImpl);
        rabbitTemplate.convertAndSend(rabbitQueueConfig.TEST_FANOUT_SEND_MESSAGES_EXCHANGE, 
        rabbitQueueConfig.TEST_FANOUT_SEND_MESSAGES_QUEUE, JSON.toJSONString(msg), message -> {
            message.getMessageProperties().setDeliveryMode(MessageDeliveryMode.PERSISTENT);
            return message;
        }, correlationData);
    }
    return ReturnResultSuccess.builder().code(ConstantCommon.RETURN_CODE_200).msg("success").data("")
            .count(ConstantCommon.RETURN_COUNT_1).build();
}

提示

这里将消息做了字符串转换JSON.toJSONString(msg)可以直接传对象,只要服务端和消费端一致即可

消费端


@RabbitListener(
    bindings = {
        @QueueBinding(
                value = @Queue(value = "TEST_FANOUT_SEND_MESSAGES_QUEUE"),
                exchange = @Exchange(value = "TEST_FANOUT_SEND_MESSAGES_EXCHANGE", type = "fanout")//绑定交换机
        )
    }
)
public void fanout(String msg, Channel channel, Message message) throws IOException {
    try {
        log.info("收到消息:" + msg);
        JSONObject jsonObject = JSON.parseObject(msg);

        /**
         * basicAck:表示成功确认,使用此回执方法后,消息会被rabbitmq broker 删除。
         * void basicAck(long deliveryTag, boolean multiple)
         * deliveryTag:表示消息投递序号,每次消费消息或者消息重新投递后,deliveryTag都会增加。手动消息确认模式下,我们可以对指定deliveryTag的消息进行ack、nack、reject等操作。
         * multiple:是否批量确认,值为 true 则会一次性 ack所有小于当前消息 deliveryTag 的消息。
         * */
        channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
        log.info("deliveryTag:" + message.getMessageProperties().getDeliveryTag());
        log.info("redelivered: " + message.getMessageProperties().getRedelivered());
        //TODO 具体业务
    } catch (Exception e) {
        if (message.getMessageProperties().getRedelivered()) {
            System.out.println("消息已重复处理失败,拒绝再次接收!");
            // 拒绝消息,requeue=false 表示不再重新入队,如果配置了死信队列则进入死信队列
            /**
             * basicReject:拒绝消息,与basicNack区别在于不能进行批量操作,其他用法很相似。
             * deliveryTag:表示消息投递序号。
             * requeue:值为 true 消息将重新入队列。
             */
            channel.basicReject(message.getMessageProperties().getDeliveryTag(), false);
        } else {
            System.out.println("消息即将再次返回队列处理!");
            // requeue为是否重新回到队列,true重新入队
            /**
             * deliveryTag:表示消息投递序号。
             * multiple:是否批量确认。
             * requeue:值为 true 消息将重新入队列。
             */
            channel.basicNack(message.getMessageProperties().getDeliveryTag(), false, true);
        }
    }
}

Routing路由模式

路由模式: 一个生产者,多个消费者。需要设置类型为direct的交换机,交换机和队列进行绑定,并且指定routing key,当发送消息到交换机后,交换机会根据routing key将消息发送到对应的队列

directConfig文件

package top.jingxc.server.config;

import org.springframework.amqp.core.*;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.context.annotation.Bean;
import org.springframework.stereotype.Component;

@Component
public class RabbitQueueConfig {

    public final static String TEST_WORK_QUEUE = "TEST_WORK_QUEUE";
    public final static String TEST_FANOUT_SEND_MESSAGES_QUEUE = "TEST_FANOUT_SEND_MESSAGES_QUEUE";
    public final static String TEST_FANOUT_SEND_MESSAGES_EXCHANGE = "TEST_FANOUT_SEND_MESSAGES_EXCHANGE";

    public final static String TEST_ROUTING_QUEUE_A = "TEST_ROUTING_QUEUE_A";
    public final static String TEST_ROUTING_QUEUE_B = "TEST_ROUTING_QUEUE_B";
    public final static String TEST_ROUTING_EXCHANGE = "TEST_ROUTING_EXCHANGE";

    /**
     * work queue 模型
     */
    @Bean
    Queue workQueue() {
        return new Queue(TEST_WORK_QUEUE);
    }

    /**
     * 声明fanout 队列
     */
    @Bean
    public Queue fanoutQueue() {
        return new Queue(TEST_FANOUT_SEND_MESSAGES_QUEUE);
    }

    @Bean
    public Queue routingQueueA() {
        return new Queue(TEST_ROUTING_QUEUE_A);
    }

    @Bean
    public Queue routingQueueB() {
        return new Queue(TEST_ROUTING_QUEUE_B);
    }

    /**
     * 声明fanout交换机
     */
    @Bean("fanoutExchange")
    public FanoutExchange fanoutExchange() {
        return new FanoutExchange(TEST_FANOUT_SEND_MESSAGES_EXCHANGE);
    }

    @Bean("routingExchange")
    public DirectExchange routingExchange() {
        // 创建direct类型交换机,表示与此交换机会将消息发送给 routing_key 完全相同的队列
        return new DirectExchange(TEST_ROUTING_EXCHANGE);
    }

    @Bean
    Binding bindingFanoutExchangeFanoutQueue() {
        return BindingBuilder.bind(fanoutQueue()).to(fanoutExchange());
    }

    @Bean
    public Binding bindExchangeQueueA(Queue routingQueueA, @Qualifier("routingExchange") DirectExchange routingExchange) {
        // 绑定direct交换机,并设置 routing_key 为 routing_second_queue_routing_key
        return BindingBuilder.bind(routingQueueA).to(routingExchange).with("217");
    }

    @Bean
    public Binding bindExchangeQueueB(Queue routingQueueB, @Qualifier("routingExchange") DirectExchange routingExchange) {
        // 队列二绑定direct交换机,并设置 routing_key 为 routing_second_queue_routing_key
        return BindingBuilder.bind(routingQueueB).to(routingExchange).with("218");
    }
}

服务端

@Override
@OperationLogger
public ReturnResult routing(String gameId) {

    String channelId = "2000100000";

    Map<String, Object> msg = new HashMap<>();
    msg.put("gameId", gameId);
    msg.put("channelId", channelId);

    CorrelationData correlationData = new CorrelationData(gameId + "-" + channelId + "-" + System.currentTimeMillis());

    rabbitTemplate.setConfirmCallback(confirmCallbackServiceImpl);
    rabbitTemplate.setReturnCallback(returnCallbackServiceImpl);
    String routingKey = gameId;
    rabbitTemplate.convertAndSend(rabbitQueueConfig.TEST_ROUTING_EXCHANGE, routingKey, JSON.toJSONString(msg), message -> {
        message.getMessageProperties().setDeliveryMode(MessageDeliveryMode.PERSISTENT);
        return message;
    }, correlationData);

    return ReturnResultSuccess.builder().code(ConstantCommon.RETURN_CODE_200).msg("success").data("")
            .count(ConstantCommon.RETURN_COUNT_1).build();
}

消费端

为了观察效果,多配置了几个消费端

/**
 * rouing 路由模式 同一路由下 接收相同的数据。
 */

@RabbitListener(bindings = {
        @QueueBinding(
                value = @Queue(value = "TEST_ROUTING_QUEUE_A"),
                exchange = @Exchange(value = "TEST_ROUTING_EXCHANGE", type = "direct"),
                key = {"217"}
        )
})
public void routing0(String msg, Channel channel, Message message) throws IOException {
    try {
        log.info("收到消息:" + msg);
        JSONObject jsonObject = JSON.parseObject(msg);
        channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
        log.info("进入routing0" + msg);
        //TODO 具体业务
    } catch (Exception e) {
        if (message.getMessageProperties().getRedelivered()) {
            System.out.println("消息已重复处理失败,拒绝再次接收!");
            // 拒绝消息,requeue=false 表示不再重新入队,如果配置了死信队列则进入死信队列
            channel.basicReject(message.getMessageProperties().getDeliveryTag(), false);
        } else {
            System.out.println("消息即将再次返回队列处理!");
            // requeue为是否重新回到队列,true重新入队
            channel.basicNack(message.getMessageProperties().getDeliveryTag(), false, true);
        }
    }
}

/**
 * rouing 路由模式 同一路由下 接收相同的数据。
 */
@RabbitListener(bindings = {
        @QueueBinding(
                value = @Queue(value = "TEST_ROUTING_QUEUE_B"),
                exchange = @Exchange(value = "TEST_ROUTING_EXCHANGE", type = "direct"),
                key = {"217"}
        )
})
public void routing1(String msg, Channel channel, Message message) throws IOException {
    try {
        log.info("收到消息:" + msg);
        JSONObject jsonObject = JSON.parseObject(msg);
        channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
        log.info("进入routing1" + msg);
        //TODO 具体业务
    } catch (Exception e) {
        if (message.getMessageProperties().getRedelivered()) {
            System.out.println("消息已重复处理失败,拒绝再次接收!");
            // 拒绝消息,requeue=false 表示不再重新入队,如果配置了死信队列则进入死信队列
            channel.basicReject(message.getMessageProperties().getDeliveryTag(), false);
        } else {
            System.out.println("消息即将再次返回队列处理!");
            // requeue为是否重新回到队列,true重新入队
            channel.basicNack(message.getMessageProperties().getDeliveryTag(), false, true);
        }
    }
}

@RabbitListener(bindings = {
        @QueueBinding(
                value = @Queue(value = "TEST_ROUTING_QUEUE_B"),
                exchange = @Exchange(value = "TEST_ROUTING_EXCHANGE", type = "direct"),
                key = {"217", "218"}
        )
})
public void routing2(String msg, Channel channel, Message message) throws IOException {
    try {
        log.info("收到消息:" + msg);
        JSONObject jsonObject = JSON.parseObject(msg);
        channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
        log.info("进入routing2" + msg);
        //TODO 具体业务
    } catch (Exception e) {
        if (message.getMessageProperties().getRedelivered()) {
            System.out.println("消息已重复处理失败,拒绝再次接收!");
            // 拒绝消息,requeue=false 表示不再重新入队,如果配置了死信队列则进入死信队列
            channel.basicReject(message.getMessageProperties().getDeliveryTag(), false);
        } else {
            System.out.println("消息即将再次返回队列处理!");
            // requeue为是否重新回到队列,true重新入队
            channel.basicNack(message.getMessageProperties().getDeliveryTag(), false, true);
        }
    }
}

Topic主题模式

topicConfig文件

package top.jingxc.server.config;

import org.springframework.amqp.core.*;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.context.annotation.Bean;
import org.springframework.stereotype.Component;

@Component
public class RabbitQueueConfig {

    public final static String TEST_WORK_QUEUE = "TEST_WORK_QUEUE";
    public final static String TEST_FANOUT_SEND_MESSAGES_QUEUE = "TEST_FANOUT_SEND_MESSAGES_QUEUE";
    public final static String TEST_FANOUT_SEND_MESSAGES_EXCHANGE = "TEST_FANOUT_SEND_MESSAGES_EXCHANGE";

    public final static String TEST_ROUTING_QUEUE_A = "TEST_ROUTING_QUEUE_A";
    public final static String TEST_ROUTING_QUEUE_B = "TEST_ROUTING_QUEUE_B";
    public final static String TEST_ROUTING_EXCHANGE = "TEST_ROUTING_EXCHANGE";

    public final static String TEST_TOPIC_QUEUE_A = "TEST_TOPIC_QUEUE_A";
    public final static String TEST_TOPIC_QUEUE_B = "TEST_TOPIC_QUEUE_B";
    public final static String TEST_TOPIC_EXCHANGE = "TEST_TOPIC_EXCHANGE";

    /**
     * work queue 模型
     */
    @Bean
    Queue workQueue() {
        return new Queue(TEST_WORK_QUEUE);
    }

    /**
     * 声明fanout 队列
     */
    @Bean
    public Queue fanoutQueue() {
        return new Queue(TEST_FANOUT_SEND_MESSAGES_QUEUE);
    }

    @Bean
    public Queue routingQueueA() {
        return new Queue(TEST_ROUTING_QUEUE_A);
    }

    @Bean
    public Queue routingQueueB() {
        return new Queue(TEST_ROUTING_QUEUE_B);
    }

    @Bean
    public Queue topicQueueA() {
        return new Queue(TEST_TOPIC_QUEUE_A);
    }

    @Bean
    public Queue topicQueueB() {
        return new Queue(TEST_TOPIC_QUEUE_B);
    }

    /**
     * 声明fanout交换机
     */
    @Bean("fanoutExchange")
    public FanoutExchange fanoutExchange() {
        return new FanoutExchange(TEST_FANOUT_SEND_MESSAGES_EXCHANGE);
    }

    @Bean("routingExchange")
    public DirectExchange routingExchange() {
        // 创建direct类型交换机,表示与此交换机会将消息发送给 routing_key 完全相同的队列
        return new DirectExchange(TEST_ROUTING_EXCHANGE);
    }

    @Bean("topicExchange")
    public TopicExchange topicExchange() {
        return new TopicExchange(TEST_TOPIC_EXCHANGE);
    }

    @Bean
    Binding bindingFanoutExchangeFanoutQueue() {
        return BindingBuilder.bind(fanoutQueue()).to(fanoutExchange());
    }

    @Bean
    public Binding bindExchangeQueueA(Queue routingQueueA, @Qualifier("routingExchange") DirectExchange routingExchange) {
        // 绑定direct交换机,并设置 routing_key 为 routing_second_queue_routing_key
        return BindingBuilder.bind(routingQueueA).to(routingExchange).with("217");
    }

    @Bean
    public Binding bindExchangeQueueB(Queue routingQueueB, @Qualifier("routingExchange") DirectExchange routingExchange) {
        // 队列二绑定direct交换机,并设置 routing_key 为 routing_second_queue_routing_key
        return BindingBuilder.bind(routingQueueB).to(routingExchange).with("218");
    }

    @Bean
    public Binding topicExchangeBindingA(Queue topicQueueA, @Qualifier("topicExchange") TopicExchange topicExchange) {
        return BindingBuilder.bind(topicQueueA).to(topicExchange).with("218.#");
    }

    @Bean
    public Binding topicExchangeBindingB(Queue topicQueueB, @Qualifier("topicExchange") TopicExchange topicExchange) {
        return BindingBuilder.bind(topicQueueB).to(topicExchange).with("*.2000100006");
    }
}

服务端

@Override
@OperationLogger
public ReturnResult topic(String gameId, String channelId) {

    Map<String, Object> msg = new HashMap<>();
    msg.put("gameId", gameId);
    msg.put("channelId", channelId);

    CorrelationData correlationData = new CorrelationData(gameId + "-" + channelId + "-" + System.currentTimeMillis());

    rabbitTemplate.setConfirmCallback(confirmCallbackServiceImpl);
    rabbitTemplate.setReturnCallback(returnCallbackServiceImpl);
    String routingKey = gameId + "." + channelId;
    rabbitTemplate.convertAndSend(rabbitQueueConfig.TEST_TOPIC_EXCHANGE, routingKey, JSON.toJSONString(msg), message -> {
        message.getMessageProperties().setDeliveryMode(MessageDeliveryMode.PERSISTENT);
        return message;
    }, correlationData);

    return ReturnResultSuccess.builder().code(ConstantCommon.RETURN_CODE_200).msg("success").data("")
            .count(ConstantCommon.RETURN_COUNT_1).build();
}

消费端

/**
 * Topic 模式star
 */
@RabbitListener(bindings = {
        @QueueBinding(
                value = @Queue(value = "TEST_TOPIC_QUEUE_A"),
                exchange = @Exchange(value = "TEST_TOPIC_EXCHANGE", type = "topic"),//绑定交换机 //默认直连 direct
                key = {"218.#"}
        )
})
public void topic1(String msg, Channel channel, Message message) throws IOException {
    try {
        log.info("topic1收到消息:" + msg);
        channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
        //TODO 具体业务
    } catch (Exception e) {
        if (message.getMessageProperties().getRedelivered()) {
            System.out.println("消息已重复处理失败,拒绝再次接收!");
            // 拒绝消息,requeue=false 表示不再重新入队,如果配置了死信队列则进入死信队列
            channel.basicReject(message.getMessageProperties().getDeliveryTag(), false);
        } else {
            System.out.println("消息即将再次返回队列处理!");
            // requeue为是否重新回到队列,true重新入队
            channel.basicNack(message.getMessageProperties().getDeliveryTag(), false, true);
        }
    }
}

@RabbitListener(bindings = {
        @QueueBinding(
                value = @Queue(value = "TEST_TOPIC_QUEUE_B"),
                exchange = @Exchange(value = "TEST_TOPIC_EXCHANGE", type = "topic"),//绑定交换机 //默认直连 direct
                key = {"*.2000100006"}
        )
})
public void topic2(String msg, Channel channel, Message message) throws IOException {
    try {
        log.info("topic2收到消息:" + msg);
        channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
        //TODO 具体业务
    } catch (Exception e) {
        if (message.getMessageProperties().getRedelivered()) {
            System.out.println("消息已重复处理失败,拒绝再次接收!");
            // 拒绝消息,requeue=false 表示不再重新入队,如果配置了死信队列则进入死信队列
            channel.basicReject(message.getMessageProperties().getDeliveryTag(), false);
        } else {
            System.out.println("消息即将再次返回队列处理!");
            // requeue为是否重新回到队列,true重新入队
            channel.basicNack(message.getMessageProperties().getDeliveryTag(), false, true);
        }
    }
}

消费回执

消费消息有三种回执方法,我们来分析一下每种方法的含义。

basicAck


basicAck:表示成功确认,使用此回执方法后,消息会被rabbitmq broker 删除。

void basicAck(long deliveryTag, boolean multiple) 
  • deliveryTag:表示消息投递序号,每次消费消息或者消息重新投递后,deliveryTag都会增加。手动消息确认模式下, 我们可以对指定deliveryTag的消息进行ack、nack、reject等操作。
  • multiple:是否批量确认,值为 true 则会一次性 ack所有小于当前消息 deliveryTag 的消息。

举个栗子: 假设我先发送三条消息deliveryTag分别是5、6、7,可它们都没有被确认, 当我发第四条消息此时deliveryTag为8,multiple设置为 true,会将5、6、7、8的消息全部进行确认。

basicNack


basicNack :表示失败确认,一般在消费消息业务异常时用到此方法,可以将消息重新投递入队列。

void basicNack(long deliveryTag, boolean multiple, boolean requeue)
  • deliveryTag:表示消息投递序号。
  • multiple:是否批量确认。
  • requeue:值为 true 消息将重新入队列。

basicReject


basicReject:拒绝消息,与basicNack区别在于不能进行批量操作,其他用法很相似。

void basicReject(long deliveryTag, boolean requeue)
  • deliveryTag:表示消息投递序号。
  • requeue:值为 true 消息将重新入队列。
上次编辑于:
贡献者: Jingxc