RabbitMQ 之 延迟队列

目录

​编辑一、延迟队列概念

二、延迟队列使用场景

三、整合 SpringBoot

1、创建项目

2、添加依赖

3、修改配置文件

4、添加 Swagger 配置类

四、队列 TTL

1、代码架构图

2、配置文件代码类

3、生产者

4、消费者

5、结果展示

五、延时队列优化

1、代码架构图

2、配置文件

3、生产者​编辑

4、结果展示

六、RabbitMQ 插件实现延迟队列

1、安装延时队列插件

2、代码架构图

 3、配置文件类代码

4、生产者

5、消费者

6、结果展示


一、延迟队列概念

延时队列 , 队列内部是有序的,最重要的特性就体现在它的延时属性上,延时队列中的元素是希望
在指定时间到了以后或之前取出和处理,简单来说, 延时队列就是用来存放需要在指定时间被处理的元素的队列。

二、延迟队列使用场景

1.订单在十分钟之内未支付则自动取消
2.新创建的店铺,如果在十天内都没有上传过商品,则自动发送消息提醒。
3.用户注册成功后,如果三天内没有登陆则进行短信提醒。
4.用户发起退款,如果三天内没有得到处理则通知相关运营人员。
5.预定会议后,需要在预定的时间点前十分钟通知各个与会人员参加会议

这些场景都有一个特点,需要在某个事件发生之后或者之前的指定时间点完成某一项任务,如:
发生订单生成事件,在十分钟之后检查该订单支付状态,然后将未支付的订单进行关闭;看起来似乎使用定时任务,一直轮询数据,每秒查一次,取出需要被处理的数据,然后处理不就完事了吗?

如果数据量比较少,确实可以这样做,比如:对于“如果账单一周内未支付则进行自动结算”这样的需求,如果对于时间不是严格限制,而是宽松意义上的一周,那么每天晚上跑个定时任务检查一下所有未支付的账单,确实也是一个可行的方案。

但对于数据量比较大,并且时效性较强的场景,如:“订单十分钟内未支付则关闭“,短期内未支付的订单数据可能会有很多,活动期间甚至会达到百万甚至千万级别,对这么庞大的数据量仍旧使用轮询的方式显然是不可取的,很可能在一秒内无法完成所有订单的检查,同时会给数据库带来很大压力,无法满足业务要求而且性能低下。


三、整合 SpringBoot

1、创建项目

2、添加依赖

<dependencies>
 <!--RabbitMQ 依赖-->
 <dependency>
 <groupId>org.springframework.boot</groupId>
 <artifactId>spring-boot-starter-amqp</artifactId>
 </dependency>
 <dependency>
 <groupId>org.springframework.boot</groupId>
 <artifactId>spring-boot-starter-web</artifactId>
 </dependency>
 <dependency>
 <groupId>org.springframework.boot</groupId>
 <artifactId>spring-boot-starter-test</artifactId>
 <scope>test</scope>
 </dependency>
 <dependency>
 <groupId>com.alibaba</groupId>
 <artifactId>fastjson</artifactId>
 <version>1.2.47</version>
 </dependency>
 <dependency>
 <groupId>org.projectlombok</groupId>
 <artifactId>lombok</artifactId>
 </dependency>
 <!--swagger-->
 <dependency>
 <groupId>io.springfox</groupId>
 <artifactId>springfox-swagger2</artifactId>
 <version>2.9.2</version>
 </dependency>
 <dependency>
 <groupId>io.springfox</groupId>
 <artifactId>springfox-swagger-ui</artifactId>
 <version>2.9.2</version>
 </dependency>
 <!--RabbitMQ 测试依赖-->
 <dependency>
 <groupId>org.springframework.amqp</groupId>
 <artifactId>spring-rabbit-test</artifactId>
 <scope>test</scope>
 </dependency>
</dependencies>

3、修改配置文件

spring.rabbitmq.host=111.229.153.16
spring.rabbitmq.port=5672
spring.rabbitmq.username=admin
spring.rabbitmq.password=123
spring.mvc.pathmatch.matching-strategy=ant_path_matcher

4、添加 Swagger 配置类

@Configuration
@EnableSwagger2
public class SwaggerConfig {
    @Bean
    public Docket webApiConfig(){
        return new Docket(DocumentationType.SWAGGER_2)
                .groupName("webApi")
                .apiInfo(webApiInfo())
                .select()
                .build();
    }
    private ApiInfo webApiInfo(){
        return new ApiInfoBuilder()
                .title("rabbitmq 接口文档")
                .description("本文档描述了 rabbitmq 微服务接口定义")
                .version("1.0")
                .contact(new Contact("馒头警告", "www.baidu.com","2714858327@qq.com"))
                .build();
    }
}

四、队列 TTL

1、代码架构图

创建两个队列 QA 和 QB,两者队列 TTL 分别设置为 10S 和 40S,然后在创建一个交换机 X 和死信交换机 Y,它们的类型都是 direct,创建一个死信队列 QD,它们的绑定关系如下:


2、配置文件代码类

// TTL 队列,配置文件类代码
@Configuration
public class TTLQueueConfig {

    // 普通交换机的名称
    public static final String X_EXCHANGE = "X";
    // 死信交换机名称
    public static final String Y_DEAD_LETTER_EXCHANGE = "Y";
    // 普通队列名称
    public static final String QUEUE_A = "QA";
    public static final String QUEUE_B = "QB";
    // 死信队列名称
    public static final String DEAD_LETTER_QUEUE_D = "QD";

    // 声明 X 交换机
    @Bean(value = "xExchange") // 相当于别名
    public DirectExchange xExchange(){
        return new DirectExchange(X_EXCHANGE);
    }
    // 声明 Y 交换机
    @Bean(value = "yExchange") // 相当于别名
    public DirectExchange yExchange(){
        return new DirectExchange(Y_DEAD_LETTER_EXCHANGE);
    }

    // 声明普通队列 TTL 为 10s
    @Bean(value = "queueA") // 相当于别名
    public Queue queueA(){
        Map<String, Object> arguments = new HashMap<>(3);
        // 设置死信交换机
        arguments.put("x-dead-letter-exchange",Y_DEAD_LETTER_EXCHANGE);
        // 设置死信 RoutingKey
        arguments.put("x-dead-letter-routing-key","YD");
        // 设置 TTL
        arguments.put("x-message-ttl",10000);
        return QueueBuilder.durable(QUEUE_A).withArguments(arguments).build();
    }

    // 声明普通队列 TTL 为 10s
    @Bean(value = "queueB") // 相当于别名
    public Queue queueB(){
        Map<String, Object> arguments = new HashMap<>(3);
        // 设置死信交换机
        arguments.put("x-dead-letter-exchange",Y_DEAD_LETTER_EXCHANGE);
        // 设置死信 RoutingKey
        arguments.put("x-dead-letter-routing-key","YD");
        // 设置 TTL
        arguments.put("x-message-ttl",40000);
        return QueueBuilder.durable(QUEUE_B).withArguments(arguments).build();
    }

    // 声明死信队列
    @Bean(value = "queueD")
    public Queue queueD(){
        return QueueBuilder.durable(DEAD_LETTER_QUEUE_D).build();
    }

    // 绑定
    @Bean
    public Binding queueABindingX(@Qualifier("queueA") Queue queueA ,
                                  @Qualifier("xExchange") DirectExchange xExchange){
        return BindingBuilder.bind(queueA).to(xExchange).with("XA");
    }
    // 绑定
    @Bean
    public Binding queueBBindingX(@Qualifier("queueB") Queue queueB ,
                                  @Qualifier("xExchange") DirectExchange xExchange) {
        return BindingBuilder.bind(queueB).to(xExchange).with("XB");
    }
    // 绑定
    @Bean
    public Binding queueDBindingY(@Qualifier("queueD") Queue queueD ,
                                  @Qualifier("yExchange") DirectExchange yExchange){
        return BindingBuilder.bind(queueD).to(yExchange).with("YD");
    }

}

3、生产者

// 发送延迟消息
@Slf4j
@Configuration
@RequestMapping("/ttl")
public class SendMessageController {

    @Autowired
    private RabbitTemplate rabbitTemplate;

    // 开始发消息
    @GetMapping("/sendmsg/{message}")
    public void sendMsg(@PathVariable String message){
        log.info("当前时间:{},发送一条信息给两个 TTL 队列:{}", new Date().toString(),message);
        rabbitTemplate.convertAndSend("X","XA","消息来自 TTL 为 10s 的队列" + message);
        rabbitTemplate.convertAndSend("X","XB","消息来自 TTL 为 40s 的队列" + message);
    }
}

4、消费者

// 队列 TTL
@Slf4j
@Component
public class DeadLetterQueueConsumer {

    // 接收消息
    @RabbitListener(queues = "QD")
    public void receiveD(Message message, Channel channel){
        String msg = new String(message.getBody());
        log.info("当前时间:{},收到死信队列的消息:{}",new Date().toString(),msg);
    }
}

5、结果展示


五、延时队列优化

第一条消息在 10S 后变成了死信消息,然后被消费者消费掉,第二条消息在 40S 之后变成了死信消息,然后被消费掉,这样一个延时队列就打造完成了。

不过,如果这样使用的话,岂不是每增加一个新的时间需求,就要新增一个队列,这里只有 10S 和 40S两个时间选项,如果需要一个小时后处理,那么就需要增加 TTL 为一个小时的队列,如果是预定会议室然后提前通知这样的场景,岂不是要增加无数个队列才能满足需求?


1、代码架构图

在这里新增了一个队列 QC,绑定关系如下,该队列不设置 TTL 时间


2、配置文件

 


3、生产者


4、结果展示

看起来似乎没什么问题,但是在最开始的时候,就介绍过如果使用在消息属性上设置 TTL 的方式,消息可能并不会按时“死亡“

因为 RabbitMQ 只会检查第一个消息是否过期,如果过期则丢到死信队列,如果第一个消息的延时时长很长,而第二个消息的延时时长很短,第二个消息并不会优先得到执行。


六、RabbitMQ 插件实现延迟队列

1、安装延时队列插件

大家可以参考网上的教程自行去下载安装,这里就不再进行描述了

如果安装好了,就会有一个叫做  x-delayed-message 的类型

一旦装完插件之后,消息延迟的位置就换到交换机了,相当于生产者发消息,这个消息停留在交换机这个位置,再将消息发给队列


2、代码架构图

在这里新增了一个队列 delayed.queue,一个自定义交换机 delayed.exchange,绑定关系如下:

一个生产者,要走延迟的交换机,延迟的交换机根据 RoutingKey ,路由到延迟的队列,而延迟的位置是在交换机的位置,队列被消费者所消费


 3、配置文件类代码

在我们自定义的交换机中,这是一种新的交换类型,该类型消息支持延迟投递机制 消息传递后并
不会立即投递到目标队列中,而是存储在 mnesia(一个分布式数据系统)表中,当达到投递时间时,才投递到目标队列中。

@Configuration
public class DelayedQueueConfig {

    // 交换机
    public static final String DELAYED_EXCHANGE_NAME = "delayed.exchange";
    // 队列
    public static final String DELAYED_QUEUE_NAME = "delayed.queue";
    // RoutingKey
    public static final String DELAYED_ROUTING_KEY = "delayed.routingkey";

    // 声明交换机 基于插件的
    @Bean
    public CustomExchange delayedExchange(){
        Map<String, Object> arguments = new HashMap<>();
        arguments.put("x-delayed-type","direct");
        return new CustomExchange(DELAYED_EXCHANGE_NAME,"x-delayed-message",
                true,false,arguments);
    }

    // 声明队列
    @Bean
    public Queue delayedQueue(){
        return new Queue(DELAYED_QUEUE_NAME);
    }

    // 绑定
    @Bean
    public Binding delayedQueueBindingDelayedExchange(
            @Qualifier("delayedQueue") Queue delayedQueue,
            @Qualifier("delayedExchange")CustomExchange delayedExchange){
        return BindingBuilder.bind(delayedQueue).to(delayedExchange).with(DELAYED_ROUTING_KEY).noargs();
    }
}

4、生产者

    // 开始发消息  基于插件的  消息及延迟的时间
    @GetMapping("/senddelaymsg/{message}/{delaytime}")
    public void sendMsg(@PathVariable String message,@PathVariable Integer delaytime){
        log.info("当前时间:{},发送一条时长{}毫秒信息给延迟队列delayed.queue:{}",
                new Date().toString(),delaytime,message);
        rabbitTemplate.convertAndSend("delayed.exchange","delayed.routingkey",message, msg ->{
            // 设置发送消息时候的延迟时长
            msg.getMessageProperties().setDelay(delaytime);
            return msg;
        });
    }

5、消费者

// 消费者 基于插件的延迟消息
@Slf4j
@Component
public class DelayQueueConsumer {
    // 监听消息
    @RabbitListener(queues = DelayedQueueConfig.DELAYED_QUEUE_NAME)
    public void receiverDelayQueue(Message message){
        String msg = new String(message.getBody());
        log.info("当前时间:{},收到延迟队列的消息:{}",new Date().toString(),msg);
    }
}

6、结果展示

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mfbz.cn/a/764684.html

如若内容造成侵权/违法违规/事实不符,请联系我们进行投诉反馈qq邮箱809451989@qq.com,一经查实,立即删除!

相关文章

Android 11.0 SettingsProvider 源码分析

文章目录 一、SettingsProvider 的概述二、SettingsProvider 的启动流程三、对 SettingsProvider 进行操作方法四、客制化示例 一、SettingsProvider 的概述 SettingsProvider 是一个为 Android 系统设置提供数据共享的 Provider&#xff0c;它包含全局、安全和系统级别的用户…

AI大模型对话(上下文)缓存能力

互联网应用中&#xff0c;为了提高数据获取的即时性&#xff0c;产生了各种分布式缓存组件&#xff0c;比如Redis、Memcached等等。 大模型时代&#xff0c;除非是免费模型&#xff0c;否则每次对话都会花费金钱来进行对话&#xff0c;对话是不是也可以参照缓存的做法来提高命…

207.贪心算法:最大子数组和(力扣)

代码展示 class Solution { public:int maxSubArray(vector<int>& nums) {int result INT_MIN; // 初始化结果为最小可能的整数int sum 0; // 初始化当前子数组和为0// 遍历数组中的每一个元素for (int i 0; i < nums.size(); i){sum nums[i]; //…

PHP电商系统开发指南最佳实践

电子商务系统开发的最佳实践包括&#xff1a;数据库设计&#xff1a;选择适合关系型数据库&#xff0c;优化数据结构&#xff0c;考虑表分区&#xff1b;安全&#xff1a;加密数据&#xff0c;防止 sql 注入&#xff0c;处理会话管理&#xff1b;用户界面&#xff1a;遵循 ux 原…

【仪器仪表】 矢量网络分析仪 Vector Network Analyzer

主要功能&#xff1a; 测量S参数&#xff1a; S11&#xff08;输入反射系数&#xff09;&#xff1a;测量输入端口的反射。S21&#xff08;正向传输系数&#xff09;&#xff1a;测量从输入端口到输出端口的传输。S12&#xff08;反向传输系数&#xff09;&#xff1a;测量从输…

MobPush 第三方插件:Uni-app

插件集成 访问MobPush插件、MobCommon插件点击购买并添加到项目当中。在uniapp的“manifest.json”中选择“app原生插件配置”&#xff0c;点击勾选上方添加的两个插件完成上述两步后请务必先打自定义基座哦&#xff0c;否则SDK代码无法生效&#xff01; iOS平台相关配置 添…

软考《信息系统运行管理员》-2.2 信息系统运维的组织

2.2 信息系统运维的组织 信息系统运维的任务 数据资源管理 数据收集、数据校验、数据录入、数据处理 软件资源管理 采购、保存、相关文档保管、分发、安装、支持、评价、培训 硬件资源管理 检查、维护、故障处理、更新、修复、扩充 系统安全管理 可用性、完整性、保密性、可控…

【C语言】typedef 关键字

在C语言中&#xff0c;typedef关键字用于给现有的数据类型起一个新的名字。它在提高代码可读性、简化复杂类型声明、增强可维护性方面非常有用。typedef通常用于定义结构体、指针、函数指针以及其他复杂类型。 基本用法 typedef int MyInt; MyInt x 10;在这个例子中&#xf…

ROS2 RQT

1. RQT是什么 RQT是一个GUI框架&#xff0c;通过插件的方式实现了各种各样的界面工具。 强行解读下&#xff1a;RQT就像插座&#xff0c;任何电器只要符合插座的型号就可以插上去工作。 2.选择插件 这里我们可以选择现有的几个RQT插件来试一试&#xff0c;可以看到和话题、参…

从深度学习到音乐创作:AI如何重新定义音乐行业

&#x1f4d1;引言 近一个月来&#xff0c;随着几款音乐大模型的轮番上线&#xff0c;AI在音乐产业的角色迅速扩大。这些模型不仅将音乐创作的门槛降至前所未有的低点&#xff0c;还引发了一场关于AI是否会彻底颠覆音乐行业的激烈讨论。从初期的兴奋到现在的理性审视&#xff0…

【FPGA 学习与实践】<初阶> 项目周计划

第1-2周&#xff1a;基础项目 - 4位加法器和计数器 目标&#xff1a;掌握Verilog基本语法和模块设计。 第1周&#xff1a; 学习Verilog的基本语法和结构&#xff08;模块、端口、数据类型&#xff09;。设计并实现一个4位加法器。编写测试平台&#xff08;Testbench&#xff0…

吴晓波:企业出海的最佳时间窗口只有5-10年,中国企业如何把握出海机遇?

鼓励企业参与绿色“一带一路”建设&#xff0c;带动先进的环保技术、装备、产能走出去。 出海计划&#xff01;马来西亚水环境项目国际考察暨2024中马水务合作论坛

控制台厂商配额查询

概述 厂商推送限制 每个厂商通道都有对应的厂商配额和 QPS 限制&#xff0c;当请求超过限制且已配置厂商回执时&#xff0c;MobPush会采取以下措施&#xff1a; 当开发者推送请求超过厂商配额时&#xff0c;MobPush将通过自有通道进行消息下发。当开发者推送请求超过厂商 QPS…

LLM大模型工程师面试经验宝典--进阶版2(2024.7月最新)

目录 1 大模型怎么评测&#xff1f; 2 大模型的honest原则是如何实现的&#xff1f;模型如何判断回答 的知识是训练过的已知的知识&#xff0c;怎么训练这种能力&#xff1f; 3 如何衡量大模型水平&#xff1f; 4 大模型评估方法 有哪些&#xff1f; 5 大模型评估工具 有哪…

Linux——查找文件-find(详细)

查找文件-find 作用 - 按照文件名、大小、时间、权限、类型、所属者、所属组来搜索文件 格式 find 查找路径 查找条件 具体条件 操作 注意 - find命令默认的操作是print输出 - find是检索文件的&#xff0c;grep是过滤文件中字符串 参数 参数 …

S7-1500PLC通过工艺对象实现V90总线伺服定位控制(105报文)

S7-1500PLC通过工艺对象实现V90总线伺服定位控制,伺服驱动器工作在速度模式,S7-1500PLC工作在位置模式,具体控制原理框图,可以参考下面文章链接: 1、S7-1200PLC和V90总线伺服位置控制 S7-1200PLC和V90总线伺服通过工艺对象实现定位控制(标准报文3应用)_v90伺服 报文3 设…

Python 获取字典中的值(八种方法)

Python 字典(dictionary)是一种可变容器模型&#xff0c;可以存储任意数量的任意类型的数据。字典通常用于存储键值对&#xff0c;每个元素由一个键&#xff08;key&#xff09;和一个值(value&#xff09;组成&#xff0c;键和值之间用冒号分隔。 以下是 Python 字典取值的几…

创新校园服务模式 跑腿小程序平台源码构建与实践 前后端分离 带完整的安装代码包以及部署教程

系统概述 本项目是一个集任务发布、接单、支付、评价于一体的跑腿服务小程序平台&#xff0c;专为高校校园设计。系统采用前后端分离架构&#xff0c;前端负责用户界面展示和交互逻辑&#xff0c;后端处理业务逻辑、数据存取等&#xff0c;两者通过API接口进行通信&#xff0c…

MySQL数据核心技术:理解主键与外键的关系与作用

在进行数据库设计时&#xff0c;合理的添加主键和外键能有效保障数据的完整性和一致性&#xff0c;使得数据管理更加科学高效。本文将详细介绍MySQL中主键和外键的基本概念、它们之间的关系、作用及一些高级知识点。 一、主键&#xff08;Primary Key&#xff09;的概念 主键…

Bootstrap 缩略图

Bootstrap 缩略图 引言 Bootstrap 是一个流行的前端框架,它提供了一套丰富的组件和工具,帮助开发者快速构建响应式和移动优先的网页。缩略图(Thumbnails)是 Bootstrap 中的一种组件,用于展示图片或其他媒体内容,通常与标题和文本描述一起使用,形成一个整洁的布局。本文…