栏目分类:
子分类:
返回
文库吧用户登录
快速导航关闭
当前搜索
当前分类
子分类
实用工具
热门搜索
文库吧 > IT > 前沿技术 > 大数据 > 大数据系统

(RabbitMQ 七【完结】)SpringBoot+RabbitMQ实现消息队列延迟功能(使用RabbitMQ延迟插件实现)

(RabbitMQ 七【完结】)SpringBoot+RabbitMQ实现消息队列延迟功能(使用RabbitMQ延迟插件实现)

SpringBoot+RabbitMQ实现消息队列延迟功能

提示:写完文章后,目录可以自动生成,如何生成可参考右边的帮助文档

文章目录
  • SpringBoot+RabbitMQ实现消息队列延迟功能
  • 前言
  • 一、配置文件
  • 二、创建发送者
  • 三、创建接收者
  • 四、创建测试用例
  • 总结


前言

要实现这个功能,第一种方法是使用RabbitMQ的消息队列延迟功能,需要采用官方提供的插件“rabbit_delayed_message_exchange”来实现,要注意的是使用延迟队列插件需要RabbitMQ的版本在3.5.8以上。第二种方法是使用RabbitMQ的“死信“”功能,“死信”在创建Queue队列时,要声明“死信”队列,队列中的消息到一定时间没有被消费就会变成死信转发到死信相应的Exchange或Queue中


提示:安装延迟插件可以去参考这篇博客https://blog.csdn.net/qq_36025814/article/details/106647467

一、配置文件

在配置文件中,我们需要创建一个消息延迟队列、相应的交换机以及绑定。这里使用的交换机类型不是DirectExchange(直连交换机),而是CustomExchange(自定义交换机)。同时CustomExchange的类型(传入的第二个参数)必须是“x-delayed-message”。

具体代码如下:

import java.util.HashMap;
import java.util.Map;

import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.CustomExchange;
import org.springframework.amqp.core.Queue;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

@Configuration
public class RabbitmqConfigYanchi {

	@Bean(name = "delay_queue_1")
    public Queue queue() {
        Queue queue = new Queue("delay_queue_1", true);
        return queue;
    }
	
	@Bean(name = "delay.1")
	
	public CustomExchange delayExchange() {
		Map args = new HashMap();
		args.put("x-delayed-type", "direct");
		
		return new CustomExchange("delayed_exchange", "x-delayed-message",true,false,args);
	}
	
	@Bean
    public Binding binding() {
        return BindingBuilder.bind(queue()).to(delayExchange()).with("delay_queue_1").noargs();
    }
}
二、创建发送者

这里类上使用的注解与前面有些不同,没有使用@Component,而是使用@Service;此外自动装配的也不是Spring自带的amqpTemplate模板了,而是使用RabbitTemplate。

使用消息延迟功能的只要实现在重写new MessagePostProcessor()中的postProcessMessage()方法。

具体代码如下:

import java.text.SimpleDateFormat;
import java.util.Date;

import org.springframework.amqp.AmqpException;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.core.MessagePostProcessor;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;

@Service
public class CustomSender {

	@Autowired
	private RabbitTemplate rabbitTemplate;
	
	public void sendMsg(String queueName, String msg) {
        SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
        System.out.println("消息队列延迟功能的发送者:" + sdf.format(new Date()));
        rabbitTemplate.convertAndSend("delayed_exchange", queueName, msg, new MessagePostProcessor() {
            @Override
            public Message postProcessMessage(Message message) throws AmqpException {
                //消息延迟5000毫秒
                message.getMessageProperties().setHeader("x-delay", 5000);
                return message;
            }
        });
    }
}
三、创建接收者

这里也有与之前不同的地方,首先类上的@RabbitListener注解需要用在方法上,而方法上的@RabbitHandler注解则不再需要。

具体代码如下:

import java.text.SimpleDateFormat;
import java.util.Date;

import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;

@Component
public class CustomReceiver {

	@RabbitListener(queues = "delay_queue_1")
	public void recevie(String message) {
		SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
		System.out.println("--------------------------------------------------");
        System.out.println(sdf.format(new Date())+" - "+message);
        System.out.println("消息队列延迟功能的接收者: 已收到延迟消息!");
	}
}
四、创建测试用例

只需要自动装配发送者类,之后在方法中调用发送方法即可。

具体代码如下:

import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;

@RestController
@RequestMapping("/yanchi")
public class CustomController {

	@Autowired
	private CustomSender customSender;
	
	@GetMapping("/yanchi1")
	public void send() {
		
		customSender.sendMsg("delay_queue_1", "消息延迟功能");
	}
}

总结

运行以上测试用例,我们会发现所有发送的消息都将发送时间的5秒之后被接收者接收。

运用到Web项目中的一些思考:拿“订单超时未支付,取消订单”来举例吧。

首先,用户点击提交订单后(此时用户还未付款),后端做两个工作,1:是把订单信息保存到数据库中,此时订单的状态为未支付;2:是把订单的一些信息(比如订单号)发送到延迟队列中。
然后,等到了我们设置的延迟时间后,接收者的工作如下:去数据库读取订单的状态,如果订单的状态为“未支付”,则向用户发送消息“订单超时未支付,取消订单”;如果订单的状态为“已支付”,则不用处理。

补充,订单状态的改变由用户的支付操作进行更改。

转载请注明:文章转载自 www.wk8.com.cn
本文地址:https://www.wk8.com.cn/it/280063.html
我们一直用心在做
关于我们 文章归档 网站地图 联系我们

版权所有 (c)2021-2022 wk8.com.cn

ICP备案号:晋ICP备2021003244-6号