跟学视频:https://www.bilibili.com/video/av18997807/?p=1 如果视频失效可以在B站搜索 “rabbitMq 老王” 相关demo代码地址:https://github.com/zhongzhongBaby/MQDemo
1.课程概述 1.1入门问题 1问 :JMS、消息中间件(MOM) 、MQ 三者的关系? 消息中间件(MOM) 是软件发展过程中的一种技术解决方案。 JMS是一个用于提供消息服务的技术规范,它制定了在整个消息服务提供过程中的所有数据结构和交互流程 简单点说就是消息中间件的模板 MQ是消息中间件的一种实现,MQ的实现可以遵循JMS规范也可以基于其他规范 AMQP 是一种协议 解决跨语言和跨平台的一种协议。 而JMS 定义了JAVA API层面的标准,适用于java 体系,ActiveMq 是JMS的一种实现,RaabbitMQ 也是AMQP协议的一种实现。 JMS与AMQP简述以及比较 参考https://blog.csdn.net/belonghuang157405/article/details/83184388 JMS 支持两种消息传递模型,那意味着所有基于JMS规范的MQ 也得有这两种消息模型啊,总之基于什么规范实现,就要实现对应的消息模型。2问 :消息队列解决问题? 流量削峰: 抢红包,秒杀,抢火车票等等 日志收集:(大数据时代)(图片看不懂就跳过) 异步、解耦:(图片看不懂就跳过)
2.安装 2.1安装RaabbitMQ 遇到的坑 Windows安装: 1.RaabbitMQ和Erlang版本不对应问题: 版本要求严格,完全不兼容。版本对应要求参考网址https://www.RaabbitMQ.com/which-erlang.html 2.webmanage 插件安装 rabbitmq-plugins enable rabbitmq_management 3.安装插件之后不能打开web管理页面 删除路径C:\Users\PXX05\AppData\Roaming\RabbitMQ\db下的所有文件并重新点击安装 4.由于版本不对应,删除了RaabbitMQ服务之后无法重新安装RaabbitMQ,或者启动失败? 利用regedit命令进入注册表编辑器。 在此路径HKEY_LOCAL_MACHINE\SOFTWARE\Ericsson\下,将Erlang全部清除。 重新安装
Linux安装: 暂无安装。
3.简单队列 模型 :
特点 :一个生产者一个消费者,一条消息只能被消费一次。 简单队列不足: 1.只有一个消费者。 2.队列名要变生产者和消费者的代码都得变。
4.工作队列 模型 :
特点 :一个生产者,两个消费者,一条消息也是只能被消费一次。1问 :如何分发消息给不同的消费者呢? 方式一(轮询分发): 现象不管两个消费者忙与闲,消息都被平均分开去处理,这个方式叫做轮询分发。 方式二(公平分发): 也称作是能者多劳模式。意思就是谁处理能力高谁就处理的消息多一点。需要注意的是,在能者多劳模式下,需要消费者手动回复确认信号给消息队列服务器 生产者和消费者都要设置qos(1),保证一次分发一个,一次预处理一个消息。
5.消息应答和持久化 5.1消息应答: 1.自动确认模式 :消息队列将消息分发给消费者就会删除在队列中的消息,如果消费者在处理这条消息的时候挂掉了 那么这条消息也就丢失了2.手动确认模式 :消费者处理完之后手动发送确认信息,告诉RaabbitMQ消息可以删除了,如果一直收不到确认消息,那就分发给别的消费者。
5.2持久化 防止RaabbitMQ 服务挂掉之后,消息丢失
5.2.1 队列持久化 1 channel.queueDeclare(QUEUE_NAM ,true(是否持久化),false,false,null);
5.2.2 消息持久化 1 channel.basicPublish("",QUEUE_NAMW, MessageProperties.MINIMAL_PERSISTENT_BASIC(消息持久化),msg.getBytes());
1问 :消息持久化存在何处?
6.发布订阅模式 也可以称为是广播模式模型 :特点 : 一个消息可以被两个消费者消费,相比于工作队列区别在于一条消息的被处理次数。 过程:消息生产者将消息发给交换机,两个队列都绑定到同一个交换机上,就可以实现两个消费者消费同一条消息1问 :因此是否可以把发布订阅模式看成是工作队列和简单队列的升级?补充 : 交换机没有存储的能力,RaabbitMQ中只有队列有存储消息的能力,因此如果交换机没有队列绑定,那么发送到交换机的消息就会被丢失。 场景:对于某个用户的业务操作需要短信和邮件两个业务通知,使用消息队列的话就可以实现短信和邮件业务以及前期业务操作的解耦。发布订阅模式可以更好的理解消息中间件的解耦优点。2问 :是否可以在发布订阅模式下,实现工作队列? 可以!并且可以根据自己的需求设定(轮询分发/公平分发) 交换机类型:fanout Exchange (不处理路由键)
7. 路由模式 模型 :交换机对比 : Fanout exchange(不处理路由键) Direct (处理路由键)
Key和队列的关系 :多对多的关系,同一个key可以被多个队列绑定,一个队列也可以绑定多个key。 过程: 绑定队列到交换器,绑定路由键到队列,交换机根据路由键去转发消息。
8.主题模式 topic 模式(也可以称作是通配符模式)模型 :
路由键和某模式匹配
#号匹配一个或者多个词语 *匹配一个词语 主题模式其实就是在路由键上做了点文章,路由键可以使用通配符来匹配,路由键通过. .***来将消息层次分类
9.rabbiMq 的消息确认机制 我们可以通过之前讲的消息持久化机制,防止由于消息队列服务器崩溃造成的消息丢失,但是还有一种情况,比如生产者将消息发出去了,但实际上并没有发到服务器,这样也会造成消息丢失。
9.1 事务的模式 通过事务的方式确认生产者的消息有么有到达Rabbit服务器
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 public static void main(String[] args) throws Exception { Connection connection = ConnectionUtils.getConection(); Channel channel = connection.createChannel(); channel.queueDeclare(QUEUE_NAMW, false, false, false, null); String msg = "hello tx"; try { channel.txSelect(); //int i =1/0; channel.basicPublish("", QUEUE_NAMW, null, msg.getBytes()); channel.txCommit(); } catch (Exception e) { channel.txRollback(); System.out.println("消息没有发出去"); } System.out.println("finished"); channel.close(); connection.close(); }
事务模式缺点:提交又回滚,降低吞吐量。浪费资源。
9.2 Confirm模式 单条确认 : channal.basixpublish() 之后channel.waitForConfirms()
1 2 3 4 5 6 channel.confirmSelect(); channel.basicPublish("", QUEUE_NAMW, null, msg.getBytes()); if (!channel.waitForConfirms()){ //监听 System.out.println("msg send failed"); //回调方法 }else{ System.out.println("msg send success");
批量确认 :批量确认模式就是多次channal.basixpublish()之后再channel.waitForConfirms() 原理:使用SortedSet存放未确认的消息序列号,收到确认之后再去删除集合中对用的序列号如果收到的是nack回复,则对消息进行重新发送,如果失败次数达到上线就不发了。 主要步骤:通过channal.getnexpublishSeqNo 得到消息的序列号 Chanal.addconfirmListener
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 channel.confirmSelect(); //未确认消息集合 final SortedSet<Long> unConfirmSet = Collections.synchronizedSortedSet(new TreeSet<Long>()); //添加监听器 channel.addConfirmListener(new ConfirmListener() { @Override public void handleAck(long l, boolean b) throws IOException { if (b) { System.out.println("handleAck" + "批量"); } else { System.out.println("handleAck" + "单个"); } } @Override public void handleNack(long l, boolean b) throws IOException { if (b) { System.out.println("handleNack" + "批量"); } else { System.out.println("handleNack" + "单个"); } } }); int i = 100; while (i > 0) { Long seqNo = channel.getNextPublishSeqNo(); channel.basicPublish("", QUEUE_NAMW, null, (msg + i).getBytes()); unConfirmSet.add(seqNo); i--; }
10. springboot 集成整合RaabbitMQ 10.1依赖: 1 2 3 4 <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-amqp</artifactId> </dependency>
10.2 配置 1 2 3 4 5 6 7 spring: rabbitmq: host: 127.0.0.1 port: 5672 virtual-host: /vhost_gyz username: gengyuzhong password: 123456
10.3简单队列: 发送 :
1 2 3 4 5 6 7 8 @Autowired private AmqpTemplate amqpTemplate; public String send1(){ String context = "简单队列的消息"; amqpTemplate.convertAndSend("message", context); return "发送成功"; }
监听 :
1 2 3 4 5 6 7 8 9 @Component public class HelloService { @RabbitListener(queuesToDeclare = @Queue("message")) @RabbitHandler public void process(String str) { System.out.println("处理信息中......"+str); System.out.println("处理成功,发送短信通知用户"); }
10.4 工作队列 (轮询分发模式) 只需要配置多个监听到同一个队列就行了
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 @RabbitListener(queuesToDeclare = @Queue("message2")) @RabbitHandler public void process2_1(String str) { System.out.println("处理信息中......"+str); System.out.println("处理成功,通过消费者1发送短信通知用户"); try { Thread.sleep(5000); } catch (InterruptedException e) { e.printStackTrace(); } } @RabbitListener(queuesToDeclare = @Queue("message2")) @RabbitHandler public void process2_2(String str) { System.out.println("处理信息中......"+str); System.out.println("处理成功,通过消费者2发送短信通知用户"); }
公平分发(能者多劳模式):只需要添加配置如下 增加配置 listener.simple,.prefetch
1 2 3 4 5 6 7 8 9 10 spring: rabbitmq: host: 127.0.0.1 port: 5672 virtual-host: /vhost_gyz username: gengyuzhong password: 123456 listener: simple: prefetch: 1
10.5 发布订阅模式 配置fanout 交换器,定义队列,定义交换器,绑定队列到交换器
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 import org.springframework.amqp.core.*; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; @Configuration public class RabbitFanoutExchangeConfiguration { /** * 声明队列 */ @Bean(name = "message_3_1") public Queue message_3_1() { return new Queue("message_3_1"); } @Bean("message_3_2") public Queue message_3_2() { return new Queue("message_3_2"); } /** * 声明fanout交换器 */ @Bean public FanoutExchange exchange_message3() { return new FanoutExchange("exchange_message3"); } /** * 绑定队列到交换器 */ @Bean public Binding binding3_1() { return BindingBuilder.bind(message_3_1()).to(exchange_message3()); } @Bean public Binding binding3_2() { return BindingBuilder.bind(message_3_2()).to(exchange_message3()); } }
监听的代码都和《简单队列》的代码一致,发送的代码区别如下:
1 amqpTemplate.convertAndSend("exchange_message3","", context);
10.6 路由模式 配置如下:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 @Configuration public class RabbitDirectExchangeConfiguration { /** * 声明队列 */ @Bean(name = "message_4_1") public Queue message_4_1() { return new Queue("message_4_1"); } @Bean("message_4_2") public Queue message_4_2() { return new Queue("message_4_2"); } /** * 声明Direct交换器 */ @Bean public DirectExchange exchange_message4() { return new DirectExchange("exchange_message4"); } /** * 绑定队列到交换器 */ @Bean public Binding binding4_1() { return BindingBuilder.bind(message_4_1()).to(exchange_message4()).with("error"); } @Bean public Binding binding4_3() { return BindingBuilder.bind(message_4_1()).to(exchange_message4()).with("info"); } @Bean public Binding binding4_2() { return BindingBuilder.bind(message_4_2()).to(exchange_message4()).with("error"); } }
10.6 topic 模式 配置如下:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 @Configuration public class RabbitTopicExchangeConfiguration { /** * 声明队列 */ @Bean(name = "message_5_1") public Queue message_5_1() { return new Queue("message_5_1"); } @Bean("message_5_2") public Queue message_5_2() { return new Queue("message_5_2"); } /** * 声明Direct交换器 */ @Bean public DirectExchange exchange_message5() { return new DirectExchange("exchange_message5"); } /** * 绑定队列到交换器 */ @Bean public Binding binding5_1() { return BindingBuilder.bind(message_5_1()).to(exchange_message5()).with("goods.add"); } @Bean public Binding binding5_3() { return BindingBuilder.bind(message_5_1()).to(exchange_message5()).with("goods.delete"); } @Bean public Binding binding5_2() { return BindingBuilder.bind(message_5_2()).to(exchange_message5()).with("goods.#"); } }
11遗留问题 1问:是否可以通过“延迟队列”处理30分钟未支付订单 ?原因 2问:是否可以通过MQ解决重复请求的问题?原因
Original author: Geng
Original link: https://zhongzhongbaby.github.io/2019/10/23/RabiitMQ学习笔记/
Copyright Notice: Please indicate the source of the reprint (must retain the author's signature and link)