- 1.引入 spring-boot-starter-amqp
- 2. application.properties配置
- 3.确认抵达
- 4.实现
2. application.properties配置org.springframework.boot spring-boot-starter-amqp
# RabbitMQ配置 spring.rabbitmq.host=192.168.56.10 spring.rabbitmq.port=5672 # 虚拟主机配置**********这些是消息抵达配置**** spring.rabbitmq.virtual-host=/ # 开启发送端消息抵达Broker确认 spring.rabbitmq.publisher-confirms=true # 开启发送端消息抵达Queue确认 spring.rabbitmq.publisher-returns=true # 只要消息抵达Queue,就会异步发送优先回调returnfirm spring.rabbitmq.template.mandatory=true # 手动ack消息,不使用默认的消费端确认 spring.rabbitmq.listener.simple.acknowledge-mode=manual3.确认抵达
保证消息不丢失,可靠抵达,可以使用事务消息,性能下降250倍,为此引入确认 机制
-
publisher confirmCallback 确认模式
-
publisher returnCallback 未投递到 queue 退回模式
-
consumer ack机制
-
spring.rabbitmq.publisher-confirms=true
在创建 connectionFactory 的时候设置 PublisherConfirms(true) 选项,开启
confirmcallback 。
CorrelationData:用来表示当前消息唯一性。
消息只要被 broker 接收到就会执行 /confirm/iCallback,如果是 cluster 模式,需要所有
broker 接收到才会调用 /confirm/iCallback。
被 broker 接收到只能表示 message 已经到达服务器,并不能保证消息一定会被投递 到目标 queue 里。所以需要用到接下来的 returnCallback 。 -
spring.rabbitmq.publisher-returns=true
-
spring.rabbitmq.template.mandatory=true
- confrim 模式只能保证消息到达 broker,不能保证消息准确投递到目标 queue 里。在有 些业务场景下,我们需要保证消息一定要投递到目标 queue 里,此时就需要用到
- return 退回模式。
这样如果未能投递到目标 queue 里将调用 returnCallback ,可以记录下详细到投递数 据,定期的巡检或者自动纠错都需要这些数据。 - 消费者获取到消息,成功处理,可以回复Ack给Broker
basic.ack用于肯定确认;broker将移除此消息
basic.nack用于否定确认;可以指定broker是否丢弃此消息,可以批量
basic.reject用于否定确认;同上,但不能批量
默认自动ack,消息被消费者收到,就会从broker的queue中移除
queue无消费者,消息依然会被存储,直到消费者消费
消费者收到消息,默认会自动ack。但是如果无法确定此消息是否被处理完成, 或者成功处理。我们可以开启手动ack模式
消息处理成功,ack(),接受下一个消息,此消息broker就会移除
消息处理失败,nack()/reject(),重新发送给其他人进行处理,或者容错处理后ack
消息一直没有调用ack/nack方法,broker认为此消息正在被处理,不会投递给别人,此时客户 端断开,消息不会被broker移除,会投递给别人、
package com.xunqi.gulimall.order.config; import org.springframework.amqp.rabbit.connection.ConnectionFactory; import org.springframework.amqp.rabbit.core.RabbitTemplate; import org.springframework.amqp.support.converter.Jackson2JsonMessageConverter; import org.springframework.amqp.support.converter.MessageConverter; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; import org.springframework.context.annotation.Primary; @Configuration public class MyRabbitConfig { private RabbitTemplate rabbitTemplate; @Primary @Bean public RabbitTemplate rabbitTemplate(ConnectionFactory connectionFactory) { RabbitTemplate rabbitTemplate = new RabbitTemplate(connectionFactory); this.rabbitTemplate = rabbitTemplate; rabbitTemplate.setMessageConverter(messageConverter()); initRabbitTemplate(); return rabbitTemplate; } @Bean public MessageConverter messageConverter() { return new Jackson2JsonMessageConverter(); } // @PostConstruct //MyRabbitConfig对象创建完成以后,执行这个方法 public void initRabbitTemplate() { //设置确认回调 rabbitTemplate.setConfirmCallback((correlationData,ack,cause) -> { System.out.println("/confirm/i...correlationData["+correlationData+"]==>ack:["+ack+"]==>cause:["+cause+"]"); }); rabbitTemplate.setReturnCallback((message,replyCode,replyText,exchange,routingKey) -> { System.out.println("Fail Message["+message+"]==>replyCode["+replyCode+"]" + "==>replyText["+replyText+"]==>exchange["+exchange+"]==>routingKey["+routingKey+"]"); }); } }