提示:写完文章后,目录可以自动生成,如何生成可参考右边的帮助文档
文章目录- SpringBoot+RabbitMQ实现消息队列延迟功能
- 前言
- 一、配置文件
- 二、创建发送者
- 三、创建接收者
- 四、创建测试用例
- 总结
前言
要实现这个功能,第一种方法是使用RabbitMQ的消息队列延迟功能,需要采用官方提供的插件“rabbit_delayed_message_exchange”来实现,要注意的是使用延迟队列插件需要RabbitMQ的版本在3.5.8以上。第二种方法是使用RabbitMQ的“死信“”功能,“死信”在创建Queue队列时,要声明“死信”队列,队列中的消息到一定时间没有被消费就会变成死信转发到死信相应的Exchange或Queue中
提示:安装延迟插件可以去参考这篇博客https://blog.csdn.net/qq_36025814/article/details/106647467
一、配置文件在配置文件中,我们需要创建一个消息延迟队列、相应的交换机以及绑定。这里使用的交换机类型不是DirectExchange(直连交换机),而是CustomExchange(自定义交换机)。同时CustomExchange的类型(传入的第二个参数)必须是“x-delayed-message”。
具体代码如下:
import java.util.HashMap; import java.util.Map; import org.springframework.amqp.core.Binding; import org.springframework.amqp.core.BindingBuilder; import org.springframework.amqp.core.CustomExchange; import org.springframework.amqp.core.Queue; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; @Configuration public class RabbitmqConfigYanchi { @Bean(name = "delay_queue_1") public Queue queue() { Queue queue = new Queue("delay_queue_1", true); return queue; } @Bean(name = "delay.1") public CustomExchange delayExchange() { Map二、创建发送者args = new HashMap (); args.put("x-delayed-type", "direct"); return new CustomExchange("delayed_exchange", "x-delayed-message",true,false,args); } @Bean public Binding binding() { return BindingBuilder.bind(queue()).to(delayExchange()).with("delay_queue_1").noargs(); } }
这里类上使用的注解与前面有些不同,没有使用@Component,而是使用@Service;此外自动装配的也不是Spring自带的amqpTemplate模板了,而是使用RabbitTemplate。
使用消息延迟功能的只要实现在重写new MessagePostProcessor()中的postProcessMessage()方法。
具体代码如下:
import java.text.SimpleDateFormat; import java.util.Date; import org.springframework.amqp.AmqpException; import org.springframework.amqp.core.Message; import org.springframework.amqp.core.MessagePostProcessor; import org.springframework.amqp.rabbit.core.RabbitTemplate; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.stereotype.Service; @Service public class CustomSender { @Autowired private RabbitTemplate rabbitTemplate; public void sendMsg(String queueName, String msg) { SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss"); System.out.println("消息队列延迟功能的发送者:" + sdf.format(new Date())); rabbitTemplate.convertAndSend("delayed_exchange", queueName, msg, new MessagePostProcessor() { @Override public Message postProcessMessage(Message message) throws AmqpException { //消息延迟5000毫秒 message.getMessageProperties().setHeader("x-delay", 5000); return message; } }); } }三、创建接收者
这里也有与之前不同的地方,首先类上的@RabbitListener注解需要用在方法上,而方法上的@RabbitHandler注解则不再需要。
具体代码如下:
import java.text.SimpleDateFormat; import java.util.Date; import org.springframework.amqp.rabbit.annotation.RabbitListener; import org.springframework.stereotype.Component; @Component public class CustomReceiver { @RabbitListener(queues = "delay_queue_1") public void recevie(String message) { SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss"); System.out.println("--------------------------------------------------"); System.out.println(sdf.format(new Date())+" - "+message); System.out.println("消息队列延迟功能的接收者: 已收到延迟消息!"); } }四、创建测试用例
只需要自动装配发送者类,之后在方法中调用发送方法即可。
具体代码如下:
import org.springframework.beans.factory.annotation.Autowired; import org.springframework.web.bind.annotation.GetMapping; import org.springframework.web.bind.annotation.RequestMapping; import org.springframework.web.bind.annotation.RestController; @RestController @RequestMapping("/yanchi") public class CustomController { @Autowired private CustomSender customSender; @GetMapping("/yanchi1") public void send() { customSender.sendMsg("delay_queue_1", "消息延迟功能"); } }
总结
运行以上测试用例,我们会发现所有发送的消息都将发送时间的5秒之后被接收者接收。
运用到Web项目中的一些思考:拿“订单超时未支付,取消订单”来举例吧。
首先,用户点击提交订单后(此时用户还未付款),后端做两个工作,1:是把订单信息保存到数据库中,此时订单的状态为未支付;2:是把订单的一些信息(比如订单号)发送到延迟队列中。
然后,等到了我们设置的延迟时间后,接收者的工作如下:去数据库读取订单的状态,如果订单的状态为“未支付”,则向用户发送消息“订单超时未支付,取消订单”;如果订单的状态为“已支付”,则不用处理。
补充,订单状态的改变由用户的支付操作进行更改。