当前位置: 首页 > news >正文

国外的设计网站app吗那里可以做工作室做网站

国外的设计网站app吗,那里可以做工作室做网站,深圳最好的品牌设计公司,星夜智能建站平台在介绍RabbitMQ之前#xff0c;我们先来看下面一个电商项目的场景#xff1a; 商品的原始数据保存在数据库中#xff0c;增删改查都在数据库中完成。 搜索服务数据来源是索引库#xff08;Elasticsearch#xff09;#xff0c;如果数据库商品发生变化#xff0c;索引库…在介绍RabbitMQ之前我们先来看下面一个电商项目的场景 商品的原始数据保存在数据库中增删改查都在数据库中完成。 搜索服务数据来源是索引库Elasticsearch如果数据库商品发生变化索引库数据不能及时更新。 商品详情做了页面静态化处理静态页面数据也不会随着数据库商品更新而变化。 如果我们在后台修改了商品的价格搜索页面和商品详情页显示的依然是旧的价格这样显然不对。该如何解决   我们可能会想到这么做 方案1每当后台对商品做增删改操作同时修改索引库数据及更新静态页面。 方案2搜索服务和商品页面静态化服务对外提供操作接口后台在商品增删改后调用接口。  这两种方案都有个严重的问题就是代码耦合后台服务中需要嵌入搜索和商品页面服务违背了微服务的独立原则。 这时我们就会采用另外一种解决办法那就是消息队列  商品服务对商品增删改以后无需去操作索引库和静态页面只需向MQ发送一条消息比如包含商品id的消息也不关心消息被谁接收。 搜索服务和静态页面服务监听MQ接收消息然后分别去处理索引库和静态页面根据商品id去更新索引库和商品详情静态页面。  什么是消息队列 MQ全称为Message Queue即消息队列。“消息队列”是在消息的传输过程中保存消息的容器。它是典型的生产者、消费者模型。生产者不断向消息队列中生产消息消费者不断的从队列中获取消息。因为消息的生产和消费都是异步的而且只关心消息的发送和接收没有业务逻辑的侵入这样就实现了生产者和消费者的解耦。 开发中消息队列通常有如下应用场景 1、任务异步处理 高并发环境下由于来不及同步处理请求往往会发生堵塞比如说大量的insertupdate之类的请求同时到达MySQL直接导致无数的行锁表锁甚至最后请求会堆积过多从而触发too many connections错误。通过使用消息队列我们可以异步处理请求从而缓解系统的压力。将不需要同步处理的并且耗时长的操作由消息队列通知消息接收方进行异步处理。减少了应用程序的响应时间。 2、应用程序解耦合 MQ相当于一个中介生产方通过MQ与消费方交互它将应用程序进行解耦合。 AMQP和JMS MQ是消息通信的模型并发具体实现。现在实现MQ的有两种主流方式AMQP、JMS。 两者间的区别和联系 JMS是定义了统一的接口来对消息操作进行统一AMQP是通过规定协议来统一数据交互的格式 JMS限定了必须使用Java语言AMQP只是协议不规定实现方式因此是跨语言的。 JMS规定了两种消息模型而AMQP的消息模型更加丰富 常见MQ产品 ActiveMQ基于JMS RabbitMQ基于AMQP协议erlang语言开发稳定性好 RocketMQ基于JMS阿里巴巴产品目前交由Apache基金会 Kafka分布式消息系统高吞吐量 RabbitMQ快速入门 RabbitMQ是由erlang语言开发基于AMQPAdvanced Message Queue 高级消息队列协议协议实现的消息队列它是一种应用程序之间的通信方法消息队列在分布式系统开发中应用非常广泛。RabbitMQ官方地址http://www.rabbitmq.com 下载与安装 RabbitMQ由Erlang语言开发需要安装与RabbitMQ版本对应的Erlang语言环境具体的就不解释了自行搜索教程。RabbitMQ官网下载地址http://www.rabbitmq.com/download.html RabbitMQ的工作原理 下图是RabbitMQ的基本结构 组成部分说明 Broker消息队列服务进程此进程包括两个部分Exchange和QueueExchange消息队列交换机按一定的规则将消息路由转发到某个队列对消息进行过虑。Queue消息队列存储消息的队列消息到达队列并转发给指定的Producer消息生产者即生产方客户端生产方客户端将消息发送Consumer消息消费者即消费方客户端接收MQ转发的消息。 生产者发送消息流程 1、生产者和Broker建立TCP连接。 2、生产者和Broker建立通道。 3、生产者通过通道消息发送给Broker由Exchange将消息进行转发。 4、Exchange将消息转发到指定的Queue队列 消费者接收消息流程 1、消费者和Broker建立TCP连接 2、消费者和Broker建立通道 3、消费者监听指定的Queue队列 4、当有消息到达Queue时Broker默认将消息推送给消费者。 5、消费者接收到消息。 6、ack回复 六种消息模型 ①基本消息模型 在上图的模型中有以下概念 P生产者也就是要发送消息的程序 C消费者消息的接受者会一直等待消息到来。 queue消息队列图中红色部分。可以缓存消息生产者向其中投递消息消费者从其中取出消息。  生产者 新建一个maven工程添加amqp-client依赖 dependencygroupIdcom.rabbitmq/groupIdartifactIdamqp-client/artifactIdversion5.7.1/version /dependency 连接工具类 public class ConnectionUtil {/*** 建立与RabbitMQ的连接* return* throws Exception*/public static Connection getConnection() throws Exception {//定义连接工厂ConnectionFactory factory new ConnectionFactory();//设置服务地址factory.setHost(192.168.1.103);//端口factory.setPort(5672);//设置账号信息用户名、密码、vhostfactory.setVirtualHost(/kavito);//设置虚拟机一个mq服务可以设置多个虚拟机每个虚拟机就相当于一个独立的mqfactory.setUsername(kavito);factory.setPassword(123456);// 通过工厂获取连接Connection connection factory.newConnection();return connection;} } 生产者发送消息 public class Send {private final static String QUEUE_NAME simple_queue;public static void main(String[] argv) throws Exception {// 1、获取到连接Connection connection ConnectionUtil.getConnection();// 2、从连接中创建通道使用通道才能完成消息相关的操作Channel channel connection.createChannel();// 3、声明创建队列//参数String queue, boolean durable, boolean exclusive, boolean autoDelete, MapString, Object arguments/*** 参数明细* 1、queue 队列名称* 2、durable 是否持久化如果持久化mq重启后队列还在* 3、exclusive 是否独占连接队列只允许在该连接中访问如果connection连接关闭队列则自动删除,如果将此参数设置true可用于临时队列的创建* 4、autoDelete 自动删除队列不再使用时是否自动删除此队列如果将此参数和exclusive参数设置为true就可以实现临时队列队列不用了就自动删除* 5、arguments 参数可以设置一个队列的扩展参数比如可设置存活时间*/channel.queueDeclare(QUEUE_NAME, false, false, false, null);// 4、消息内容String message Hello World!;// 向指定的队列中发送消息//参数String exchange, String routingKey, BasicProperties props, byte[] body/*** 参数明细* 1、exchange交换机如果不指定将使用mq的默认交换机设置为* 2、routingKey路由key交换机根据路由key来将消息转发到指定的队列如果使用默认交换机routingKey设置为队列的名称* 3、props消息的属性* 4、body消息内容*/channel.basicPublish(, QUEUE_NAME, null, message.getBytes());System.out.println( [x] Sent message );//关闭通道和连接(资源关闭最好用try-catch-finally语句处理)channel.close();connection.close();} } 控制台 web管理页面服务器地址/端口号 本地127.0.0.1:15672默认用户及密码guest guest 点击队列名称进入详情页可以查看消息 消费者接收消息 public class Recv {private final static String QUEUE_NAME simple_queue;public static void main(String[] argv) throws Exception {// 获取到连接Connection connection ConnectionUtil.getConnection();//创建会话通道,生产者和mq服务所有通信都在channel通道中完成Channel channel connection.createChannel();// 声明队列//参数String queue, boolean durable, boolean exclusive, boolean autoDelete, MapString, Object arguments/*** 参数明细* 1、queue 队列名称* 2、durable 是否持久化如果持久化mq重启后队列还在* 3、exclusive 是否独占连接队列只允许在该连接中访问如果connection连接关闭队列则自动删除,如果将此参数设置true可用于临时队列的创建* 4、autoDelete 自动删除队列不再使用时是否自动删除此队列如果将此参数和exclusive参数设置为true就可以实现临时队列队列不用了就自动删除* 5、arguments 参数可以设置一个队列的扩展参数比如可设置存活时间*/channel.queueDeclare(QUEUE_NAME, false, false, false, null);//实现消费方法DefaultConsumer consumer new DefaultConsumer(channel){// 获取消息并且处理这个方法类似事件监听如果有消息的时候会被自动调用/*** 当接收到消息后此方法将被调用* param consumerTag 消费者标签用来标识消费者的在监听队列时设置channel.basicConsume* param envelope 信封通过envelope* param properties 消息属性* param body 消息内容* throws IOException*/Overridepublic void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {//交换机String exchange envelope.getExchange();//消息idmq在channel中用来标识消息的id可用于确认消息已接收long deliveryTag envelope.getDeliveryTag();// body 即消息体String msg new String(body,utf-8);System.out.println( [x] received : msg !);}};// 监听队列第二个参数是否自动进行消息确认。//参数String queue, boolean autoAck, Consumer callback/*** 参数明细* 1、queue 队列名称* 2、autoAck 自动回复当消费者接收到消息后要告诉mq消息已接收如果将此参数设置为tru表示会自动回复mq如果设置为false要通过编程实现回复* 3、callback消费方法当消费者接收到消息要执行的方法*/channel.basicConsume(QUEUE_NAME, true, consumer);} } 控制台打印 再看看队列的消息,已经被消费了 我们发现消费者已经获取了消息但是程序没有停止一直在监听队列中是否有新的消息。一旦有新的消息进入队列就会立即打印. 消息确认机制ACK 通过刚才的案例可以看出消息一旦被消费者接收队列中的消息就会被删除。 那么问题来了RabbitMQ怎么知道消息被接收了呢 如果消费者领取消息后还没执行操作就挂掉了呢或者抛出了异常消息消费失败但是RabbitMQ无从得知这样消息就丢失了 因此RabbitMQ有一个ACK机制。当消费者获取消息后会向RabbitMQ发送回执ACK告知消息已经被接收。不过这种回执ACK分两种情况 自动ACK消息一旦被接收消费者自动发送ACK 手动ACK消息接收后不会发送ACK需要手动调用 大家觉得哪种更好呢 这需要看消息的重要性 如果消息不太重要丢失也没有影响那么自动ACK会比较方便 如果消息非常重要不容丢失。那么最好在消费完成后手动ACK否则接收消息后就自动ACKRabbitMQ就会把消息从队列中删除。如果此时消费者宕机那么消息就丢失了。 我们之前的测试都是自动ACK的如果要手动ACK需要改动我们的代码 public class Recv2 {private final static String QUEUE_NAME simple_queue;public static void main(String[] argv) throws Exception {// 获取到连接Connection connection ConnectionUtil.getConnection();// 创建通道final Channel channel connection.createChannel();// 声明队列channel.queueDeclare(QUEUE_NAME, false, false, false, null);// 定义队列的消费者DefaultConsumer consumer new DefaultConsumer(channel) {// 获取消息并且处理这个方法类似事件监听如果有消息的时候会被自动调用Overridepublic void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {// body 即消息体String msg new String(body);System.out.println( [x] received : msg !);// 手动进行ACK/** void basicAck(long deliveryTag, boolean multiple) throws IOException;* deliveryTag:用来标识消息的id* multiple是否批量.true:将一次性ack所有小于deliveryTag的消息。*/channel.basicAck(envelope.getDeliveryTag(), false);}};// 监听队列第二个参数false手动进行ACKchannel.basicConsume(QUEUE_NAME, false, consumer);} }最后一行代码设置第二个参数为false channel.basicConsume(QUEUE_NAME, false, consumer); 自动ACK存在的问题 修改消费者添加异常如下 生产者不做任何修改直接运行消息发送成功   运行消费者程序抛出异常 管理界面 消费者抛出异常但是消息依然被消费实际上我们还没获取到消息。 演示手动ACK 重新运行生产者发送消息 同样在手动进行ack前抛出异常运行Recv2 再看看管理界面 消息没有被消费掉 还有另外一种情况修改消费者Recv2把监听队列第二个参数自动改成手动,去掉之前制造的异常 并且消费方法中没手动进行ACK 生产者代码不变再次运行 运行消费者 但是查看管理界面发现 停掉消费者的程序发现   这是因为虽然我们设置了手动ACK但是代码中并没有进行消息确认所以消息并未被真正消费掉。当我们关掉这个消费者消息的状态再次变为Ready。 正确的做法是 我们要在监听队列时设置第二个参数为false,代码中手动进行ACK 再次运行消费者查看web管理页面 消费者消费成功   生产者避免数据丢失https://www.cnblogs.com/vipstone/p/9350075.html ②work消息模型 工作队列或者竞争消费者模式 work queues与入门程序相比多了一个消费端两个消费端共同消费同一个队列中的消息但是一个消息只能被一个消费者获取。 这个消息模型在Web应用程序中特别有用可以处理短的HTTP请求窗口中无法处理复杂的任务。 接下来我们来模拟这个流程 P生产者任务的发布者 C1消费者1领取任务并且完成任务假设完成速度较慢模拟耗时 C2消费者2领取任务并且完成任务假设完成速度较快 生产者 生产者循环发送50条消息 public class Send {private final static String QUEUE_NAME test_work_queue;public static void main(String[] argv) throws Exception {// 获取到连接Connection connection ConnectionUtil.getConnection();// 获取通道Channel channel connection.createChannel();// 声明队列channel.queueDeclare(QUEUE_NAME, false, false, false, null);// 循环发布任务for (int i 0; i 50; i) {// 消息内容String message task .. i;channel.basicPublish(, QUEUE_NAME, null, message.getBytes());System.out.println( [x] Sent message );Thread.sleep(i * 2);}// 关闭通道和连接channel.close();connection.close();} } 消费者1 public class Recv {private final static String QUEUE_NAME test_work_queue;public static void main(String[] argv) throws Exception {// 获取到连接Connection connection ConnectionUtil.getConnection();//创建会话通道,生产者和mq服务所有通信都在channel通道中完成Channel channel connection.createChannel();// 声明队列channel.queueDeclare(QUEUE_NAME, false, false, false, null);//实现消费方法DefaultConsumer consumer new DefaultConsumer(channel){// 获取消息并且处理这个方法类似事件监听如果有消息的时候会被自动调用Overridepublic void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {// body 即消息体String msg new String(body,utf-8);System.out.println( [消费者1] received : msg !);//模拟任务耗时1stry { TimeUnit.SECONDS.sleep(1); } catch (Exception e) { e.printStackTrace(); }}};// 监听队列第二个参数是否自动进行消息确认。channel.basicConsume(QUEUE_NAME, true, consumer);} } 消费者2 代码不贴了与消费者1基本类似只是消费者2没有设置消费耗时时间。 接下来两个消费者一同启动然后发送50条消息 可以发现两个消费者各自消费了不同25条消息这就实现了任务的分发。    能者多劳 刚才的实现有问题吗 消费者1比消费者2的效率要低一次任务的耗时较长 然而两人最终消费的消息数量是一样的 消费者2大量时间处于空闲状态消费者1一直忙碌 现在的状态属于是把任务平均分配正确的做法应该是消费越快的人消费的越多。 怎么实现呢 通过 BasicQos 方法设置prefetchCount 1。这样RabbitMQ就会使得每个Consumer在同一个时间点最多处理1个Message。换句话说在接收到该Consumer的ack前他它不会将新的Message分发给它。相反它会将其分派给不是仍然忙碌的下一个Consumer。 值得注意的是prefetchCount在手动ack的情况下才生效自动ack不生效。 再次测试  订阅模型分类 说明下 1、一个生产者多个消费者 2、每个消费者都有一个自己的队列 3、生产者没有将消息直接发送给队列而是发送给exchange(交换机、转发器) 4、每个队列都需要绑定到交换机上 5、生产者发送的消息经过交换机到达队列实现一个消息被多个消费者消费 例子注册-发邮件、发短信 XExchanges交换机一方面接收生产者发送的消息。另一方面知道如何处理消息例如递交给某个特别队列、递交给所有队列、或是将消息丢弃。到底如何操作取决于Exchange的类型。 Exchange类型有以下几种 Fanout广播将消息交给所有绑定到交换机的队列 Direct定向把消息交给符合指定routing key 的队列 Topic通配符把消息交给符合routing pattern路由模式 的队列 Headerheader模式与routing不同的地方在于header模式取消routingkey使用header中的 key/value键值对匹配队列。 Header模式不展开了感兴趣可以参考这篇文章https://blog.csdn.net/zhu_tianwei/article/details/40923131 Exchange交换机只负责转发消息不具备存储消息的能力因此如果没有任何队列与Exchange绑定或者没有符合路由规则的队列那么消息会丢失 ③Publish/subscribe交换机类型Fanout也称为广播  Publish/subscribe模型示意图 生产者 和前面两种模式不同 1 声明Exchange不再声明Queue 2 发送消息到Exchange不再发送到Queue public class Send {private final static String EXCHANGE_NAME test_fanout_exchange;public static void main(String[] argv) throws Exception {// 获取到连接Connection connection ConnectionUtil.getConnection();// 获取通道Channel channel connection.createChannel();// 声明exchange指定类型为fanoutchannel.exchangeDeclare(EXCHANGE_NAME, fanout);// 消息内容String message 注册成功;// 发布消息到Exchangechannel.basicPublish(EXCHANGE_NAME, , null, message.getBytes());System.out.println( [生产者] Sent message );channel.close();connection.close();} } 消费者1 注册成功发给短信服务 public class Recv {private final static String QUEUE_NAME fanout_exchange_queue_sms;//短信队列private final static String EXCHANGE_NAME test_fanout_exchange;public static void main(String[] argv) throws Exception {// 获取到连接Connection connection ConnectionUtil.getConnection();// 获取通道Channel channel connection.createChannel();// 声明队列channel.queueDeclare(QUEUE_NAME, false, false, false, null);// 绑定队列到交换机channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, );// 定义队列的消费者DefaultConsumer consumer new DefaultConsumer(channel) {// 获取消息并且处理这个方法类似事件监听如果有消息的时候会被自动调用Overridepublic void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties,byte[] body) throws IOException {// body 即消息体String msg new String(body);System.out.println( [短信服务] received : msg !);}};// 监听队列自动返回完成channel.basicConsume(QUEUE_NAME, true, consumer);} } 消费者2注册成功发给邮件服务 public class Recv2 {private final static String QUEUE_NAME fanout_exchange_queue_email;//邮件队列private final static String EXCHANGE_NAME test_fanout_exchange;public static void main(String[] argv) throws Exception {// 获取到连接Connection connection ConnectionUtil.getConnection();// 获取通道Channel channel connection.createChannel();// 声明队列channel.queueDeclare(QUEUE_NAME, false, false, false, null);// 绑定队列到交换机channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, );// 定义队列的消费者DefaultConsumer consumer new DefaultConsumer(channel) {// 获取消息并且处理这个方法类似事件监听如果有消息的时候会被自动调用Overridepublic void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties,byte[] body) throws IOException {// body 即消息体String msg new String(body);System.out.println( [邮件服务] received : msg !);}};// 监听队列自动返回完成channel.basicConsume(QUEUE_NAME, true, consumer);} } 我们运行两个消费者然后发送1条消息 思考 1、publish/subscribe与work queues有什么区别。 区别 1work queues不用定义交换机而publish/subscribe需要定义交换机。 2publish/subscribe的生产方是面向交换机发送消息work queues的生产方是面向队列发送消息(底层使用默认交换机)。 3publish/subscribe需要设置队列和交换机的绑定work queues不需要设置实际上work queues会将队列绑定到默认的交换机 。 相同点 所以两者实现的发布/订阅的效果是一样的多个消费端监听同一个队列不会重复消费消息。 2、实际工作用 publish/subscribe还是work queues。 建议使用 publish/subscribe发布订阅模式比工作队列模式更强大也可以做到同一队列竞争并且发布订阅模式可以指定自己专用的交换机。 ④Routing 路由模型交换机类型direct Routing模型示意图 P生产者向Exchange发送消息发送消息时会指定一个routing key。 XExchange交换机接收生产者的消息然后把消息递交给 与routing key完全匹配的队列 C1消费者其所在队列指定了需要routing key 为 error 的消息 C2消费者其所在队列指定了需要routing key 为 info、error、warning 的消息 接下来看代码 生产者 public class Send {private final static String EXCHANGE_NAME test_direct_exchange;public static void main(String[] argv) throws Exception {// 获取到连接Connection connection ConnectionUtil.getConnection();// 获取通道Channel channel connection.createChannel();// 声明exchange指定类型为directchannel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.DIRECT);// 消息内容String message 注册成功请短信回复[T]退订;// 发送消息并且指定routing key 为sms只有短信服务能接收到消息channel.basicPublish(EXCHANGE_NAME, sms, null, message.getBytes());System.out.println( [x] Sent message );channel.close();connection.close();} } 消费者1 public class Recv {private final static String QUEUE_NAME direct_exchange_queue_sms;//短信队列private final static String EXCHANGE_NAME test_direct_exchange;public static void main(String[] argv) throws Exception {// 获取到连接Connection connection ConnectionUtil.getConnection();// 获取通道Channel channel connection.createChannel();// 声明队列channel.queueDeclare(QUEUE_NAME, false, false, false, null);// 绑定队列到交换机同时指定需要订阅的routing key。可以指定多个channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, sms);//指定接收发送方指定routing key为sms的消息//channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, email);// 定义队列的消费者DefaultConsumer consumer new DefaultConsumer(channel) {// 获取消息并且处理这个方法类似事件监听如果有消息的时候会被自动调用Overridepublic void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties,byte[] body) throws IOException {// body 即消息体String msg new String(body);System.out.println( [短信服务] received : msg !);}};// 监听队列自动ACKchannel.basicConsume(QUEUE_NAME, true, consumer);} } 消费者2 public class Recv2 {private final static String QUEUE_NAME direct_exchange_queue_email;//邮件队列private final static String EXCHANGE_NAME test_direct_exchange;public static void main(String[] argv) throws Exception {// 获取到连接Connection connection ConnectionUtil.getConnection();// 获取通道Channel channel connection.createChannel();// 声明队列channel.queueDeclare(QUEUE_NAME, false, false, false, null);// 绑定队列到交换机同时指定需要订阅的routing key。可以指定多个channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, email);//指定接收发送方指定routing key为email的消息// 定义队列的消费者DefaultConsumer consumer new DefaultConsumer(channel) {// 获取消息并且处理这个方法类似事件监听如果有消息的时候会被自动调用Overridepublic void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties,byte[] body) throws IOException {// body 即消息体String msg new String(body);System.out.println( [邮件服务] received : msg !);}};// 监听队列自动ACKchannel.basicConsume(QUEUE_NAME, true, consumer);} } 我们发送sms的RoutingKey发现结果只有指定短信的消费者1收到消息了 ⑤Topics 通配符模式交换机类型topics Topics模型示意图 每个消费者监听自己的队列并且设置带统配符的routingkey,生产者将消息发给broker由交换机根据routingkey来转发消息到指定的队列。 Routingkey一般都是有一个或者多个单词组成多个单词之间以“.”分割例如inform.sms 通配符规则 #匹配一个或多个词 *匹配不多不少恰好1个词 举例 audit.#能够匹配audit.irs.corporate 或者 audit.irs audit.*只能匹配audit.irs 从示意图可知我们将发送所有描述动物的消息。消息将使用由三个字两个点组成的Routing key发送。路由关键字中的第一个单词将描述速度第二个颜色和第三个种类“speed.color.species”。 我们创建了三个绑定Q1绑定了“*.orange.*”Q2绑定了“.*.*.rabbit”和“lazy.”。 Q1匹配所有的橙色动物。 Q2匹配关于兔子以及懒惰动物的消息。 下面做个小练习假如生产者发送如下消息会进入哪个队列 quick.orange.rabbit       Q1 Q2   routingKeyquick.orange.rabbit的消息会同时路由到Q1与Q2 lazy.orange.elephant    Q1 Q2 quick.orange.fox           Q1 lazy.pink.rabbit              Q2  (值得注意的是虽然这个routingKey与Q2的两个bindingKey都匹配但是只会投递Q2一次) quick.brown.fox            不匹配任意队列被丢弃 quick.orange.male.rabbit   不匹配任意队列被丢弃 orange         不匹配任意队列被丢弃 下面我们以指定Routing keyquick.orange.rabbit为例验证上面的答案 生产者 public class Send {private final static String EXCHANGE_NAME test_topic_exchange;public static void main(String[] argv) throws Exception {// 获取到连接Connection connection ConnectionUtil.getConnection();// 获取通道Channel channel connection.createChannel();// 声明exchange指定类型为topicchannel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.TOPIC);// 消息内容String message 这是一只行动迅速的橙色的兔子;// 发送消息并且指定routing key为quick.orange.rabbitchannel.basicPublish(EXCHANGE_NAME, quick.orange.rabbit, null, message.getBytes());System.out.println( [动物描述] Sent message );channel.close();connection.close();} } 消费者1 public class Recv {private final static String QUEUE_NAME topic_exchange_queue_Q1;private final static String EXCHANGE_NAME test_topic_exchange;public static void main(String[] argv) throws Exception {// 获取到连接Connection connection ConnectionUtil.getConnection();// 获取通道Channel channel connection.createChannel();// 声明队列channel.queueDeclare(QUEUE_NAME, false, false, false, null);// 绑定队列到交换机同时指定需要订阅的routing key。订阅所有的橙色动物channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, *.orange.*);// 定义队列的消费者DefaultConsumer consumer new DefaultConsumer(channel) {// 获取消息并且处理这个方法类似事件监听如果有消息的时候会被自动调用Overridepublic void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties,byte[] body) throws IOException {// body 即消息体String msg new String(body);System.out.println( [消费者1] received : msg !);}};// 监听队列自动ACKchannel.basicConsume(QUEUE_NAME, true, consumer);} } 消费者2 public class Recv2 {private final static String QUEUE_NAME topic_exchange_queue_Q2;private final static String EXCHANGE_NAME test_topic_exchange;public static void main(String[] argv) throws Exception {// 获取到连接Connection connection ConnectionUtil.getConnection();// 获取通道Channel channel connection.createChannel();// 声明队列channel.queueDeclare(QUEUE_NAME, false, false, false, null);// 绑定队列到交换机同时指定需要订阅的routing key。订阅关于兔子以及懒惰动物的消息channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, *.*.rabbit);channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, lazy.);// 定义队列的消费者DefaultConsumer consumer new DefaultConsumer(channel) {// 获取消息并且处理这个方法类似事件监听如果有消息的时候会被自动调用Overridepublic void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties,byte[] body) throws IOException {// body 即消息体String msg new String(body);System.out.println( [消费者2] received : msg !);}};// 监听队列自动ACKchannel.basicConsume(QUEUE_NAME, true, consumer);} } 结果C1、C2是都接收到消息了 ⑥RPC RPC模型示意图 基本概念 Callback queue 回调队列客户端向服务器发送请求服务器端处理请求后将其处理结果保存在一个存储体中。而客户端为了获得处理结果那么客户在向服务器发送请求时同时发送一个回调队列地址reply_to。 Correlation id 关联标识客户端可能会发送多个请求给服务器当服务器处理完后客户端无法辨别在回调队列中的响应具体和那个请求时对应的。为了处理这种情况客户端在发送每个请求时同时会附带一个独有correlation_id属性这样客户端在回调队列中根据correlation_id字段的值就可以分辨此响应属于哪个请求。 流程说明 当客户端启动的时候它创建一个匿名独享的回调队列。在 RPC 请求中客户端发送带有两个属性的消息一个是设置回调队列的 reply_to 属性另一个是设置唯一值的 correlation_id 属性。将请求发送到一个 rpc_queue 队列中。服务器等待请求发送到这个队列中来。当请求出现的时候它执行他的工作并且将带有执行结果的消息发送给 reply_to 字段指定的队列。客户端等待回调队列里的数据。当有消息出现的时候它会检查 correlation_id 属性。如果此属性的值与请求匹配将它返回给应用 分享两道面试题 面试题 避免消息堆积 1 采用workqueue多个消费者监听同一队列。 2接收到消息以后而是通过线程池异步消费。 如何避免消息丢失 1 消费者的ACK机制。可以防止消费者丢失消息。 但是如果在消费者消费之前MQ就宕机了消息就没了 2可以将消息进行持久化。要将消息持久化前提是队列、Exchange都持久化 交换机持久化 队列持久化 消息持久化 Spring整合RibbitMQ 下面还是模拟注册服务当用户注册成功后向短信和邮件服务推送消息的场景 搭建SpringBoot环境 创建两个工程 mq-rabbitmq-producer和mq-rabbitmq-consumer分别配置1、2、3第三步本例消费者用注解形式可以不用配 1、添加AMQP的启动器 dependencygroupIdorg.springframework.boot/groupIdartifactIdspring-boot-starter-amqp/artifactId /dependency dependencygroupIdorg.springframework.boot/groupIdartifactIdspring‐boot‐starter‐test/artifactId /dependency 2、在application.yml中添加RabbitMQ的配置  server:port: 10086 spring:application:name: mq-rabbitmq-producerrabbitmq:host: 192.168.1.103port: 5672username: kavitopassword: 123456virtualHost: /kavitotemplate:retry:enabled: trueinitial-interval: 10000msmax-interval: 300000msmultiplier: 2exchange: topic.exchangepublisher-confirms: true 属性说明  template有关AmqpTemplate的配置 retry失败重试 enabled开启失败重试 initial-interval第一次重试的间隔时长 max-interval最长重试间隔超过这个间隔将不再重试 multiplier下次重试间隔的倍数此处是2即下次重试间隔是上次的2倍 exchange缺省的交换机名称此处配置后发送消息如果不指定交换机就会使用这个 publisher-confirms生产者确认机制确保消息会正确发送如果发送失败会有错误回执从而触发重试 当然如果consumer只是接收消息而不发送就不用配置template相关内容。   3、定义RabbitConfig配置类配置Exchange、Queue、及绑定交换机。 Configuration public class RabbitmqConfig {public static final String QUEUE_EMAIL queue_email;//email队列public static final String QUEUE_SMS queue_sms;//sms队列public static final String EXCHANGE_NAMEtopic.exchange;//topics类型交换机public static final String ROUTINGKEY_EMAILtopic.#.email.#;public static final String ROUTINGKEY_SMStopic.#.sms.#;//声明交换机Bean(EXCHANGE_NAME)public Exchange exchange(){//durable(true) 持久化mq重启之后交换机还在return ExchangeBuilder.topicExchange(EXCHANGE_NAME).durable(true).build();}//声明email队列/** new Queue(QUEUE_EMAIL,true,false,false)* durabletrue 持久化 rabbitmq重启的时候不需要创建新的队列* auto-delete 表示消息队列没有在使用时将被自动删除 默认是false* exclusive 表示该消息队列是否只在当前connection生效,默认是false*/Bean(QUEUE_EMAIL)public Queue emailQueue(){return new Queue(QUEUE_EMAIL);}//声明sms队列Bean(QUEUE_SMS)public Queue smsQueue(){return new Queue(QUEUE_SMS);}//ROUTINGKEY_EMAIL队列绑定交换机指定routingKeyBeanpublic Binding bindingEmail(Qualifier(QUEUE_EMAIL) Queue queue,Qualifier(EXCHANGE_NAME) Exchange exchange){return BindingBuilder.bind(queue).to(exchange).with(ROUTINGKEY_EMAIL).noargs();}//ROUTINGKEY_SMS队列绑定交换机指定routingKeyBeanpublic Binding bindingSMS(Qualifier(QUEUE_SMS) Queue queue,Qualifier(EXCHANGE_NAME) Exchange exchange){return BindingBuilder.bind(queue).to(exchange).with(ROUTINGKEY_SMS).noargs();}} 生产者(mq-rabbitmq-producer) 为了方便测试我直接把生产者代码放工程测试类发送routing key是topic.sms.email的消息那么mq-rabbitmq-consumer下那些监听的与交换机(topic.exchange)绑定并且订阅的routingkey中匹配了topic.sms.email规则的 队列就会收到消息。 SpringBootTest RunWith(SpringRunner.class) public class Send {AutowiredRabbitTemplate rabbitTemplate;Testpublic void sendMsgByTopics(){/*** 参数* 1、交换机名称* 2、routingKey* 3、消息内容*/for (int i0;i5;i){String message 恭喜您注册成功useridi;rabbitTemplate.convertAndSend(RabbitmqConfig.EXCHANGE_NAME,topic.sms.email,message);System.out.println( [x] Sent message );}} } 运行测试类发送5条消息  web管理界面 可以看到已经创建了交换机以及queue_email、queue_sms 2个队列并且向这两个队列分别发送了5条消息 消费者(mq-rabbitmq-consumer) 编写一个监听器组件通过注解配置消费者队列以及队列与交换机之间绑定关系。也可以像生产者那样通过配置类配置 在SpringAmqp中对消息的消费者进行了封装和抽象。一个JavaBean的方法只要添加RabbitListener注解就可以成为了一个消费者。 Component public class ReceiveHandler {//监听邮件队列RabbitListener(bindings QueueBinding(value Queue(value queue_email, durable true),exchange Exchange(value topic.exchange,ignoreDeclarationExceptions true,type ExchangeTypes.TOPIC),key {topic.#.email.#,email.*}))public void rece_email(String msg){System.out.println( [邮件服务] received : msg !);}//监听短信队列RabbitListener(bindings QueueBinding(value Queue(value queue_sms, durable true),exchange Exchange(value topic.exchange,ignoreDeclarationExceptions true,type ExchangeTypes.TOPIC),key {topic.#.sms.#}))public void rece_sms(String msg){System.out.println( [短信服务] received : msg !);} } 属性说明  Componet类上的注解注册到Spring容器 RabbitListener方法上的注解声明这个方法是一个消费者方法需要指定下面的属性 bindings指定绑定关系可以有多个。值是QueueBinding的数组。QueueBinding包含下面属性 value这个消费者关联的队列。值是Queue代表一个队列 exchange队列所绑定的交换机值是Exchange类型 key队列和交换机绑定的RoutingKey可指定多个 启动mq-rabbitmq-comsumer项目 ok,邮件服务和短息服务接收到消息后就可以各自开展自己的业务了。
http://www.sadfv.cn/news/361717/

相关文章:

  • 网站开发文档需求分析互联网个人信用信息服务平台
  • 怀化网站建设哪家便宜企业网站和信息化建设金蝶
  • 网站进不去怎么解决建设网站费用主要包括哪些内容
  • 吉林沈阳网站建设seo实战密码电子书
  • 十堰网站网站建设武进网站建设价位
  • wordpress子文件夹建站做公司网站要收费吗
  • 样式网站福永网站制作
  • 网站开发技术方法与路线wordpress使用主题
  • 如何让网站被百度收录西安php网站建设专家
  • 旅游网站开发报价单眉山网站设计
  • 商城类网站能做响应式设计吗网址我的上网主页
  • 如何做链接淘宝客的网站在线获取颜色代码网站
  • 营口旅游网站开发万网空间存放两个网站
  • 网站像素大小教育培训机构管理系统
  • 网站空间如何搬家dedecms网站制作教程
  • 襄阳营销网站建设网站二级域名是什么
  • 网站域名备案变更郑州专门做网站的公司有哪些
  • 网站开发多久完成北京市工程建设交易中心
  • 建设部网站221号文件wordpress更改路径
  • 河南网站排名目前做的比较好的法律网站有哪些
  • 网站 默认页宁夏固原建设网站
  • 常州辉煌网络网站建设中铁二局深圳公司官网
  • 河北辛集住房和城乡建设厅网站南通e站网站建设
  • 广州网站设计企业企业网站建设维护
  • 如何查看网站的访问量重庆官网seo分析
  • saas系统是什么模式天津百度seo代理
  • 金华做网站的公司平台类网站有哪些
  • 网站的用户注册怎么做末年人免费观看网站
  • 商品网站建设实验报告网站建设的费用计入
  • 建设官方网站首页河北邯郸做网站