郑州机械网站制作,钓鱼网站后台是怎么做的,如何不让百度收录网站,做商城网站需要多少钱系列文章目录
第一章 Java线程池技术应用 第二章 CountDownLatch和Semaphone的应用 第三章 Spring Cloud 简介 第四章 Spring Cloud Netflix 之 Eureka 第五章 Spring Cloud Netflix 之 Ribbon 第六章 Spring Cloud 之 OpenFeign 第七章 Spring Cloud 之 GateWay 第八章 Sprin…系列文章目录
第一章 Java线程池技术应用 第二章 CountDownLatch和Semaphone的应用 第三章 Spring Cloud 简介 第四章 Spring Cloud Netflix 之 Eureka 第五章 Spring Cloud Netflix 之 Ribbon 第六章 Spring Cloud 之 OpenFeign 第七章 Spring Cloud 之 GateWay 第八章 Spring Cloud Netflix 之 Hystrix 第九章 代码管理gitlab 使用 第十章 SpringCloud Alibaba 之 Nacos discovery 第十一章 SpringCloud Alibaba 之 Nacos Config 第十二章 Spring Cloud Alibaba 之 Sentinel 第十三章 JWT 第十四章 RabbitMQ应用 文章目录 系列文章目录[TOC](文章目录) 前言1、RabbitMQ概念概念1.1、生产者和消费者1.2、队列1.3、交换机、路由键、绑定1.3.1、交换机类型 2、RabbitMQ运转流程2.1、生产者发送消息流程2.2、消费者接收消息的过程2.3、AMQP协议 3、RabbitMQ windows安装3.1、下载3.2、安装 4、Spring Boot 整合RabbitMQ4.1、在user-service添加依赖4.2、配置文件添加4.3、增加RabbitMQ配置类4.4、新增消费监听类4.5、消息生产端 总结
前言 一般MQ用于系统解耦、削峰使用常见于微服务、业务活动等场景。 MQ消息队列在微服务、业务活动等场景中的应用主要表现为系统解耦和削峰。 系统解耦 场景描述在微服务架构中服务与服务之间需要通信。如果采用直接调用方式服务间会存在强依赖关系一个服务的改动可能引发连锁反应。 MQ作用服务间可以通过消息队列进行通信一个服务将消息放入队列另一个服务从队列中取出消息进行处理。这种方式下服务间实现了解耦降低了相互的依赖。 削峰 场景描述在业务活动期间由于用户请求量短时间内剧增可能导致系统压力过大甚至崩溃。 MQ作用通过消息队列实现请求的缓冲。在高并发场景下系统可以将请求放入消息队列然后异步处理这些请求从而平滑系统的处理负载确保系统的稳定性。 综上所述MQ因其独特的队列属性和消息传递模式在分布式、微服务架构中发挥着重要的作用提高了系统的可用性和稳定性。 1、RabbitMQ概念概念
RabbitMQ整体上是一个生产者与消费者模型主要负责接收、存储和转发消息。
1.1、生产者和消费者
Producer生产者就是投递消息的一方。消息一般可以包含2个部分消息体和标签(Label)。消息的标签用来描述这条消息比如一个交换器的名称和一个路由键。Consumer消费者就是接受消息的一方。消费者连接到RabbitMQ服务器并订阅到队列上。当消费者消费一条消息时只是消费消息的消息体(payload)Broker消息中间件的服务节点。一个RabbitMQ Broker看做一台RabbitMQ服务器
1.2、队列
Queue队列是RabbitMQ的内部对象用于存储消息
1.3、交换机、路由键、绑定
Exchange交换器。生产者将消息发送到Exchange(交换器通常也可以用大写的X来表示)有交换器将消息路由到一个或者多个队列中。如果路由不到或许会返回给生产者或许直接丢弃。 RoutingKey路由键。生产者将消息发给交换器的时候一般会指定一个RoutingKey用来指定这个消息的路由规则而这个RoutingKey需要与交换器类型和绑定键(BindingKey)联合使用才能最终生效。 Binding绑定。RabbitMQ中通过绑定将交换器与队列联合起来在绑定的时候一般会指定一个绑定键(BindingKey)这样RabbitMQ就知道如何正确地将消息路由到队列了。
1.3.1、交换机类型
Direct Exchange直连交换机根据Routing Key(路由键)进行投递到不同队列。 Fanout Exchange扇形交换机采用广播模式根据绑定的交换机路由到与之对应的所有队列。 Topic Exchange主题交换机对路由键进行模式匹配后进行投递符号#表示一个或多个词*表示一个词。 Header Exchange头交换机不处理路由键。而是根据发送的消息内容中的headers属性进行匹配。 自学参考https://blog.csdn.net/qq_38550836/article/details/95358353
2、RabbitMQ运转流程
2.1、生产者发送消息流程
生产者连接到RabbitMQ Broker建立一个连接(Connection)开启一个信道(Channel)生产者声明一个交换器并设置相关属性比如交换器类型、是否持久化等生产者声明一个队列并设置相关属性比如是否排他、是否持久化、是否自动删除等生产者通过路由键将交换器和队列绑定起来生产者发送消息至RabbitMQ Broker其中包含路由键、交换器等信息相应的交换器根据接收到的路由键查找相匹配的队列。如果找到则将从生产者发送过来的消息存入相应的队列。如果没有找到则根据生产者配置的属性选择丢弃还是回退给生产者关闭信道关闭连接
2.2、消费者接收消息的过程
消费者连接到RabbitMQ Broker建立一个连接(Connection)开启一个信道(Channel)。消费者向RabbitMQ Broker请求消费相应队列中的消息可能会设置相应的回调函数以及做一些准备工作。等待RabbitMQ Broker回应并投递相应队列中队列的消息消费者接收消息。消费者确认(ack)接收到的消息。RabbitMQ从队列中删除相应已经被确认的消息。关闭信道关闭连接 无论是生产者还是消费者都需要和RabbitMQ Broker建立连接这个连接就是一条TCP连接也就是Connection。一旦TCP连接建立起来客户端紧接着可以创建一个AMQP信道(Channel)每个信道都会被指派一个唯一的ID。信道是建立在Connection之上的虚拟连接RabbitMQ处理的每条AMQP指令都是通过信道完成的。
2.3、AMQP协议
Advanced Message Queuing Protocol一个提供统一消息服务的应用层标准高级消息队列协议是应用层协议的一个开放标准为面向消息的中间件设计。基于此协议的客户端与消息中间件可传递消息并不受客户端/中间件不同产品不同的开发语言等条件的限制。Erlang中的实现有RabbitMQ等
Broker接收和分发消息的应用RabbitMQ 就是 Message BrokerVirtual Host虚拟 Broker将多个单元隔离开Connectionpublisher / consumer 和 broker 之间的 tcp 连接Channelconnection 内部建立的逻辑连接通常每个线程创建单独的 channelRouting key路由键用来指示消息的路由转发相当于快递的地址Exchange交换机相当于快递的分拨中心Queue消息队列消息最终被送到这里等待 consumer 取走Bindingexchange 和 queue 之间的虚拟连接用于 message 的分发依据
3、RabbitMQ windows安装
3.1、下载
https://github.com/erlang/otp/releases/download/OTP-25.2/otp_win64_25.2.exe https://github.com/rabbitmq/rabbitmq-server/releases/download/v3.11.5/rabbitmq-server-3.11.5.exe
3.2、安装
配置环境变量 cd D:\Program Files\RabbitMQ Server\rabbitmq_server-3.11.5\sbin 开启rabbitmq-plugins插件 rabbitmq-plugins enable rabbitmq_management 打开地址 http://127.0.0.1:15672/ 输入用户名/密码guest/guest
4、Spring Boot 整合RabbitMQ
4.1、在user-service添加依赖
dependencygroupIdorg.springframework.boot/groupIdartifactIdspring-boot-starter-amqp/artifactId
/dependency4.2、配置文件添加
spring:rabbitmq:host: 127.0.0.1port: 5672username: guestpassword: guest4.3、增加RabbitMQ配置类
package com.xxxx.user.config;import org.springframework.amqp.core.*;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;Configuration
public class RabbitMQConfig {/******************direct**********************//*** 创建direct队列* return*/Beanpublic Queue directQueue(){return new Queue(directQueue);}/*** 创建direct交换机* return*/Beanpublic DirectExchange directExchange(){return new DirectExchange(directExchange);}/*** 把队列和交换机绑定在一起* param queue* param directExchange* return*/Beanpublic Binding bindingDirect(Qualifier(directQueue) Queue queue, DirectExchange directExchange){return BindingBuilder.bind(queue).to(directExchange).with(routingKey);}/******************topic**********************/Beanpublic Queue topicQuerue1(){return new Queue(topicQuerue1);}Beanpublic Queue topicQuerue2(){return new Queue(topicQuerue2);}Beanpublic TopicExchange topicExchange(){return new TopicExchange(topicExchange);}Beanpublic Binding bindingTopic1(Qualifier(topicQuerue1) Queue queue,Qualifier(topicExchange) TopicExchange topicExchange){return BindingBuilder.bind(queue).to(topicExchange).with(topic.key1);}/*** 通配符* 表示一个词# 表示零个或多个词* param queue* param topicExchange* return*/Beanpublic Binding bindingTopic2(Qualifier(topicQuerue2) Queue queue,Qualifier(topicExchange) TopicExchange topicExchange){return BindingBuilder.bind(queue).to(topicExchange).with(topic.#);}/******************fanout**********************/Beanpublic Queue fanoutQueue1(){return new Queue(fanoutQueue1);}Beanpublic Queue fanoutQueue2(){return new Queue(fanoutQueue2);}Beanpublic Queue fanoutQueue3(){return new Queue(fanoutQueue3);}Beanpublic FanoutExchange fanoutExchange(){return new FanoutExchange(fanoutExchange);}Beanpublic Binding bindingFanout1(Qualifier(fanoutQueue1) Queue queue,Qualifier(fanoutExchange) FanoutExchange fanoutExchange){return BindingBuilder.bind(queue).to(fanoutExchange);}Beanpublic Binding bindingFanout2(Qualifier(fanoutQueue2) Queue queue,Qualifier(fanoutExchange) FanoutExchange fanoutExchange){return BindingBuilder.bind(queue).to(fanoutExchange);}Beanpublic Binding bindingFanout3(Qualifier(fanoutQueue3) Queue queue,Qualifier(fanoutExchange) FanoutExchange fanoutExchange){return BindingBuilder.bind(queue).to(fanoutExchange);}
}4.4、新增消费监听类
package com.xxxx.user.consumer;import com.xxxx.drp.common.entity.UserInfo;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.rabbit.annotation.RabbitHandler;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;Component
Slf4j
RabbitListener(queues directQueue)
public class DataDirectReceiver {RabbitHandlerpublic void process(String data){log.info(收到directQueue队列信息 data);}RabbitHandlerpublic void process(UserInfo data){log.info(收到directQueue队列信息 data);}
}
package com.xxxx.user.consumer;import com.xxxx.common.entity.UserInfo;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.rabbit.annotation.RabbitHandler;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;Component
Slf4j
RabbitListener(queues {topicQuerue1,topicQuerue2})
public class DataFanoutReceiver {RabbitHandlerpublic void process(String data){log.info(收到topicQuerue队列信息 data);}RabbitHandlerpublic void process(UserInfo data){log.info(收到topicQuerue队列信息 data);}
}package com.xxxx.user.consumer;import com.xxxx.common.entity.UserInfo;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.rabbit.annotation.RabbitHandler;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;Component
Slf4j
RabbitListener(queues {fanoutQueue1,fanoutQueue2,fanoutQueue3})
public class DataTopicReceiver {RabbitHandlerpublic void process(String data){log.info(收到topicQuerue队列信息 data);}RabbitHandlerpublic void process(UserInfo data){log.info(收到topicQuerue队列信息 data);}
}
4.5、消息生产端
package com.xxxx.user;import com.xxxx.common.entity.UserInfo;
import org.junit.jupiter.api.Test;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;
SpringBootTest
public class DataSender {Autowiredprivate RabbitTemplate rabbitTemplate;Testpublic void sendDirect(){UserInfo userInfo new UserInfo();userInfo.setUserAccount(tiger);userInfo.setPassword(12345);this.rabbitTemplate.convertAndSend(directExchange,routingKey,userInfo);}Testpublic void sendTopic(){this.rabbitTemplate.convertAndSend(topicExchange,topic.key2,Hello world topic);}Testpublic void sendFanout(){this.rabbitTemplate.convertAndSend(fanoutExchange,,Hello world topic);}
}总结
MQ因其独特的队列属性和消息传递模式在分布式、微服务架构中发挥着重要的作用提高了系统的可用性和稳定性。人工智能AI编程知识库