网站建设实施计划,用自己网站做邮箱域名解析,如何重新安装wordpress,wordpress充值提现❤ 作者主页#xff1a;李奕赫揍小邰的博客 ❀ 个人介绍#xff1a;大家好#xff0c;我是李奕赫#xff01;(#xffe3;▽#xffe3;)~* #x1f34a; 记得点赞、收藏、评论⭐️⭐️⭐️ #x1f4e3; 认真学习!!!#x1f389;#x1f389; 文章目录 RabbitMQ特性… ❤ 作者主页李奕赫揍小邰的博客 ❀ 个人介绍大家好我是李奕赫(▽)~* 记得点赞、收藏、评论⭐️⭐️⭐️ 认真学习!!! 文章目录 RabbitMQ特性案例springbootrabbitmq RabbitMQ特性
AMQP(高级消息队列协议) 是一个异步消息传递所使用的应用层协议规范作为线路层协议而不是API例如JMSAMQP 客户端能够无视消息的来源任意发送和接受信息。AMQP的原始用途只是为金融界提供一个可以彼此协作的消息协议而现在的目标则是为通用消息队列架构提供通用构建工具。因此面向消息的中间件 MOM系统例如发布/订阅队列没有作为基本元素实现。反而通过发送简化的AMQ实体用户被赋予了构建例如这些实体的能力。这些实体也是规范的一 部分形成了在线路层协议顶端的一个层级AMQP模型。这个模型统一了消息模式诸如之前提到的发布/订阅队列事务以及流数据并且添加了额外的特性例如更易于扩展基于内容的路由。
AMQP当中有四个概念非常重要
virtual host虚拟主机exchange交换机queue队列binding绑定
一个虚拟主机持有一组交换机、队列和绑定。
为什么需要多个虚拟主机呢因为RabbitMQ当中用户只能在虚拟主机的粒度进行权限控制。因此如果需要禁止A组访问B组的交换机/队列/绑定必须为A和B分别创建一个虚拟主机。每一个RabbitMQ服务器都有一个默认的虚拟主机/。
何谓虚拟主机virtual host交换机exchange队列queue和绑定binding
队列Queues是你的消息messages的终点可以理解成装消息的容器。消息就一直在里面直到有客户端也就是消费者Consumer连接到这个队列并且将其取走为止。不过也可以将一个队列配置成这样的一旦消息进入这个队列此消息就被删除。
队列是由消费者Consumer通过程序建立的不是通过配置文件或者命令行工具。这没什么问题如果一个消费者试图创建一个已经存在的队列RabbitMQ会直接忽略这个请求。因此我们可以将消息队列的配置写在应用程序的代码里面。
而要把一个消息放进队列前需要有一个交换机Exchange。
交换机Exchange可以理解成具有路由表的路由程序。每个消息都有一个称为路由键routing key的属性就是一个简单的字符串。交换机当中有一系列的绑定binding即路由规则routes。例如指明具有路由键 “X” 的消息要到名为timbuku的队列当中去。
消费者程序Consumer要负责创建你的交换机。交换机可以存在多个每个交换机在自己独立的进程当中执行因此增加多个交换机就是增加多个进程可以充分利用服务器上的CPU核以便达到更高的效率。例如在一个8核的服务器上可以创建5个交换机来用5个核另外3个核留下来做消息处理。类似的在RabbitMQ的集群当中你可以用类似的思路来扩展交换机一边获取更高的吞吐量。
交换机如何判断要把消息送到哪个队列你需要路由规则即绑定binding。一个绑定就是一个类似这样的规则将交换机“desert沙漠”当中具有路由键“阿里巴巴”的消息送到队列“hideout山洞”里面去。换句话说一个绑定就是一个基于路由键将交换机和队列连接起来的路由规则。例如具有路由键“audit”的消息需要被送到两个队列“log-forever”和“alert-the-big-dude”。要做到这个就需要创建两个绑定每个都连接一个交换机和一个队列两者都是由“audit”路由键触发。在这种情况下交换机会复制一份消息并且把它们分别发送到两个队列当中。交换机不过就是一个由绑定构成的路由表。
交换机有多种类型。他们都是做路由的但是它们接受不同类型的绑定。为什么不创建一种交换机来处理所有类型的路由规则呢因为每种规则用来做匹配分子的CPU开销是不同的。例如一个“topic”类型的交换机试图将消息的路由键与类似“dogs.*”的模式进行匹配。匹配这种末端的通配符比直接将路由键与“dogs”比较“direct”类型的交换机要消耗更多的CPU。如果你不需要“topic”类型的交换机带来的灵活性你可以通过使用“direct”类型的交换机获取更高的处理效率。那么有哪些类型他们又是怎么处理的呢 Exchange Direct Exchange Fanout Exchange Topic 持久化
你花了大量的时间来创建队列、交换机和绑定然后服务器程序挂了。你的队列、交换机和绑定怎么样了还有放在队列里面但是尚未处理的消息们呢
如果你是用默认参数构造的这一切的话那么他们都灰飞烟灭了。RabbitMQ重启之后会干净的像个新生儿。你必须重做所有的一切亡羊补牢如何避免将来再度发生此类杯具
队列和交换机有一个创建时候指定的标志durable。durable的唯一含义就是具有这个标志的队列和交换机会在重启之后重新建立它不表示说在队列当中的消息会在重启后恢复。那么如何才能做到不只是队列和交换机还有消息都是持久的呢
但是首先需要考虑的问题是是否真的需要消息的持久化如果需要重启后消息可以回复那么它需要被写入磁盘。但即使是最简单的磁盘操作也是要消耗时间的。所以需要衡量判断。
当你将消息发布到交换机的时候可以指定一个标志“Delivery Mode”投递模式。根据你使用的AMQP的库不同指定这个标志的方法可能不太一样。简单的说就是将Delivery Mode设置成2也就是持久的persistent即可。一般的AMQP库都是将Delivery Mode设置成1也就是非持久的。所以要持久化消息的步骤如下
将交换机设成 durable。将队列设成 durable。将消息的 Delivery Mode 设置成2 。
绑定Bindings怎么办绑定无法在创建的时候设置成durable。没问题如果你绑定了一个durable的队列和一个durable的交换机RabbitMQ会自动保留这个绑定。类似的如果删除了某个队列或交换机无论是不是durable依赖它的绑定都会自动删除。
注意
RabbitMQ 不允许你绑定一个非坚固non-durable的交换机和一个durable的队列。反之亦然。要想成功必须队列和交换机都是durable的。一旦创建了队列和交换机就不能修改其标志了。例如如果创建了一个non-durable的队列然后想把它改变成durable的唯一的办法就是删除这个队列然后重现创建。因此最好仔细检查创建的标志。
案例
springbootrabbitmq
pom.xml
dependenciesdependencygroupIdorg.springframework.boot/groupIdartifactIdspring-boot-starter-amqp/artifactId/dependencydependencygroupIdorg.springframework.boot/groupIdartifactIdspring-boot-starter-web/artifactId/dependencydependencygroupIdorg.springframework.boot/groupIdartifactIdspring-boot-starter-test/artifactIdscopetest/scope/dependencydependencygroupIdorg.springframework.amqp/groupIdartifactIdspring-rabbit-test/artifactIdscopetest/scope/dependency
/dependenciesapplication.properties
server.port8181#rabbitmq配置
spring.rabbitmq.host127.0.0.1
spring.rabbitmq.port5672
spring.rabbitmq.usernamewt
spring.rabbitmq.password123456
spring.rabbitmq.virtual-host/实体类
package cn.edu.entity;import java.io.Serializable;public class User implements Serializable {/*** */private static final long serialVersionUID 1L;private int id;private String userName;private String userPassword;public int getId() {return id;}public void setId(int id) {this.id id;}public String getUserName() {return userName;}public void setUserName(String userName) {this.userName userName;}public String getUserPassword() {return userPassword;}public void setUserPassword(String userPassword) {this.userPassword userPassword;}Overridepublic String toString() {return User [id id , userName userName , userPassword userPassword ];}}rabbitmq config
package cn.edu.config;import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.DirectExchange;
import org.springframework.amqp.core.Queue;
import org.springframework.amqp.rabbit.core.RabbitAdmin;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;/*** 实例化* 交换机对象* 队列对象* 绑定对象* author Administrator**/
Configuration
public class RabbitMqConfig {Autowiredprivate RabbitTemplate rabbitTemplate;Bean(namerabbitAdmin)public RabbitAdmin getRabbitAdmin() {RabbitAdmin rabbitAdminnew RabbitAdmin(this.rabbitTemplate.getConnectionFactory());rabbitAdmin.setAutoStartup(true);return rabbitAdmin;}Bean(namequeue1)public Queue queue1(Qualifier(rabbitAdmin) RabbitAdmin rabbitAdmin) {Queue queuenew Queue(queue1,true);return queue;}Bean(namequeue2)public Queue queue2() {Queue queuenew Queue(queue2,true);return queue;}Beanpublic DirectExchange exchange() {DirectExchange directExchangenew DirectExchange(directExchangeName);return directExchange;}Beanpublic Binding bind01(Qualifier(queue1) Queue messageQueue,Qualifier(exchange) DirectExchange directExchange) {Binding bindingBindingBuilder.bind(messageQueue).to(directExchange).with(item1);return binding;}Beanpublic Binding bind02(Qualifier(queue2) Queue objectQueue,Qualifier(exchange) DirectExchange directExchange) {Binding bindingBindingBuilder.bind(objectQueue).to(directExchange).with(item2);return binding;}
}生产者类
package cn.edu.controller;import org.springframework.amqp.core.AmqpTemplate;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RestController;import cn.tedu.entity.User;RestController
public class UserController {//ReabbitTemplate实现自AmqpTemplateAutowiredprivate RabbitTemplate rabbitTempate;Autowiredprivate AmqpTemplate amqpTempate;/*** 演示给队列发送字符串* return*/GetMapping(/sendMessage)public String sendMessage() {//把消息(aaaaaaaaaaaaa)发送给直接交换机(directExchangeName),使用的是路由key(item1),路由key关联了一个队列this.rabbitTempate.convertAndSend(directExchangeName,item1,aaaaaaaaaaaaa);//没有指定交换机,使用的就是默认交换机//this.rabbitTempate.convertAndSend(item1,aaaaaaaaaaaaa);return success;}/*** 演示给队列发送对象* return*/GetMapping(/sendObject)public String sendObject() {User usernew User();user.setId(1);user.setUserName(张三);user.setUserPassword(123);//把消息(user对象)发送给直接交换机(directExchangeName),使用的是路由key(item2),路由key关联了一个队列this.rabbitTempate.convertAndSend(directExchangeName,item2,user);return success;}
}消费者
package cn.edu.mqconsumer;import java.io.ByteArrayInputStream;
import java.io.IOException;
import java.io.ObjectInputStream;import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.RabbitHandler;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;import cn.tedu.entity.User;/*** 用来消费队列中的数据* author Administrator**/
Component
public class ConsumerObject {/*** 用来监听消费queue1中的数据* param msg*/RabbitListener(queues {queue1})RabbitHandlerpublic void process01(Message msg) {System.out.print(msg.getClass()消费者消费队列1中的字符串消息);System.out.println(new String(msg.getBody()));}/*** 用来监听消费queue2中的数据* param msg*/RabbitListener(queues {queue2})RabbitHandlerpublic void process02(Message msg) {try {System.out.print(msg.getClass()消费者消费队列2中的User对象的消息);byte[] bytesmsg.getBody();//把字节数据转换成具体的User对象ByteArrayInputStream baisnew ByteArrayInputStream(bytes);//把字节输入流转换成对象输入流ObjectInputStream oisnew ObjectInputStream(bais);User user(User)ois.readObject();System.out.println(user.getId() user.getUserName() user.getUserPassword());} catch (Exception e) {// TODO Auto-generated catch blocke.printStackTrace();}}
}主启动
package cn.edu;import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;SpringBootApplication
public class TestspringbootRabbitmqApplication {public static void main(String[] args) {SpringApplication.run(TestspringbootRabbitmqApplication.class, args);}}