网站空间租用有哪些服务,网络推广培训方案,dz门户 WordPress,优秀企业简介100字文章目录1_RabbitMQ初入门1.RabbitMQ的介绍1.工作原理发送/接收消息的流程2. 为什么要使用消息队列#xff1f;3.使用了消息队列有什么缺点#xff1f;2.安装RabbitMQ3.入门程序Hello_消费者生产者_1.导入依赖2.生产者1.设置连接信息2.获取connection#xff08;连…
文章目录1_RabbitMQ初入门1.RabbitMQ的介绍1.工作原理发送/接收消息的流程2. 为什么要使用消息队列3.使用了消息队列有什么缺点2.安装RabbitMQ3.入门程序Hello_消费者生产者_1.导入依赖2.生产者1.设置连接信息2.获取connection连接channel信道3.channel信道绑定队列实现消费方法4.接收消息完整代码3.消费者1.设置连接信息2.获取connection连接channel信道3.channel信道绑定队列4.发送消息完整代码1_RabbitMQ初入门
1.RabbitMQ的介绍
1.工作原理发送/接收消息的流程 Broker消息队列服务进程此进程包括两个部分Exchange和Queue。 Exchange消息队列交换机按一定的规则将消息路由转发到某个队列对消息进行过虑。 Queue消息队列存储消息的队列消息到达队列并转发给指定的消费方。 Producer消息生产者即生产方客户端生产方客户端将消息发送到MQ。 Consumer消息消费者即消费方客户端接收MQ转发的消息。 消息发布接收流程
-----发送消息----- 1、生产者和Broker建立TCP连接。
2、生产者和Broker建立通道。
3、生产者通过通道消息发送给Broker由Exchange将消息进行转发。
4、Exchange将消息转发到指定的Queue队列----接收消息----- 1、消费者和Broker建立TCP连接
2、消费者和Broker建立通道
3、消费者监听指定的Queue队列
4、当有消息到达Queue时Broker默认将消息推送给消费者。
5、消费者接收到消息。 2. 为什么要使用消息队列
解耦异步削峰 3.使用了消息队列有什么缺点
分析一个使用了MQ的项目如果连这个问题都没有考虑过就把MQ引进去了那就给自己的项目带来了风险。我们引入一个技术要对这个技术的弊端有充分的认识才能做好防御。
系统的可用性降低如果消息队列挂了那么系统也会受到影响
系统的复杂性增加要多考虑很多方面的问题比如一致性问题、如何保证消息不被重复消费如何保证消息可靠传输。因此需要考虑的东更多系统的复杂性增大。
2.安装RabbitMQ
详情请参考此链接
3.入门程序Hello_消费者生产者_
1.导入依赖
dependencygroupIdorg.springframework.boot/groupIdartifactIdspring-boot-starter-amqp/artifactId
/dependency
dependencygroupIdorg.springframework.boot/groupIdartifactIdspring-boot-starter-test/artifactId
/dependency!-- 这个是日志包的依赖 --
dependencygroupIdorg.springframework.boot/groupIdartifactIdspring-boot-starter-logging/artifactId
/dependency2.生产者
1.设置连接信息
通过ConnectionFactory获取连接信息
//通过连接工厂创建新的连接和mq建立连接
ConnectionFactory connectionFactory new ConnectionFactory();
connectionFactory.setHost(127.0.0.1);
connectionFactory.setPort(5672);//端口
connectionFactory.setUsername(guest);
connectionFactory.setPassword(guest);
//设置虚拟机一个mq服务可以设置多个虚拟机每个虚拟机就相当于一个独立的mq
connectionFactory.setVirtualHost(/);2.获取connection连接channel信道
Connection connection null;
Channel channel null;
//建立新连接
connection connectionFactory.newConnection();
//创建会话通道,生产者和mq服务所有通信都在channel通道中完成
channel connection.createChannel();3.channel信道绑定队列实现消费方法
//队列
private static final String QUEUE helloworld;//监听队列
//声明队列如果队列在mq 中没有则要创建
//参数String queue, boolean durable, boolean exclusive, boolean autoDelete, M
/*** 参数明细* 1、queue 队列名称* 2、durable 是否持久化如果持久化mq重启后队列还在* 3、exclusive 是否独占连接队列只允许在该连接中访问如果connection连接关闭队列则自动删除,如果将此参数设置true可用于临时* 4、autoDelete 自动删除队列不再使用时是否自动删除此队列如果将此参数和exclusive参数设置为true就可以实现临时队列队列不* 5、arguments 参数可以设置一个队列的扩展参数比如可设置存活时间*/
channel.queueDeclare(QUEUE,true,false,false,null);//实现消费方法
DefaultConsumer defaultConsumer new DefaultConsumer(channel){/*** 当接收到消息后此方法将被调用* param consumerTag 消费者标签用来标识消费者的在监听队列时设置channel.basicConsume* param envelope 信封通过envelope* param properties 消息属性* param body 消息内容* throws IOException*/Overridepublic void handleDelivery(String consumerTag, Envelope envelope, AMQP.B//交换机String exchange envelope.getExchange();//消息idmq在channel中用来标识消息的id可用于确认消息已接收long deliveryTag envelope.getDeliveryTag();//消息内容String message new String(body,utf-8);System.out.println(receive message:message);}
};4.接收消息 //监听队列//参数String queue, boolean autoAck, Consumer callback/*** 参数明细* 1、queue 队列名称* 2、autoAck 自动回复当消费者接收到消息后要告诉mq消息已接收如果将此参数设置为tru表示会自动回复mq如果设置为false要通过编程实现回复* 3、callback消费方法当消费者接收到消息要执行的方法*/channel.basicConsume(QUEUE,true,defaultConsumer);结果截图 完整代码
package com.xuecheng.test.rabbitmq;import com.rabbitmq.client.*;import java.io.IOException;
import java.util.concurrent.TimeoutException;public class Consumer01 {//队列private static final String QUEUE helloworld;public static void main(String[] args) throws IOException, TimeoutException {//通过连接工厂创建新的连接和mq建立连接ConnectionFactory connectionFactory new ConnectionFactory();connectionFactory.setHost(127.0.0.1);connectionFactory.setPort(5672);//端口connectionFactory.setUsername(guest);connectionFactory.setPassword(guest);//设置虚拟机一个mq服务可以设置多个虚拟机每个虚拟机就相当于一个独立的mqconnectionFactory.setVirtualHost(/);//建立新连接Connection connection connectionFactory.newConnection();//创建会话通道,生产者和mq服务所有通信都在channel通道中完成Channel channel connection.createChannel();//监听队列//声明队列如果队列在mq 中没有则要创建//参数String queue, boolean durable, boolean exclusive, boolean autoDelete, MapString, Object arguments/*** 参数明细* 1、queue 队列名称* 2、durable 是否持久化如果持久化mq重启后队列还在* 3、exclusive 是否独占连接队列只允许在该连接中访问如果connection连接关闭队列则自动删除,如果将此参数设置true可用于临时队列的创建* 4、autoDelete 自动删除队列不再使用时是否自动删除此队列如果将此参数和exclusive参数设置为true就可以实现临时队列队列不用了就自动删除* 5、arguments 参数可以设置一个队列的扩展参数比如可设置存活时间*/channel.queueDeclare(QUEUE,true,false,false,null);//实现消费方法DefaultConsumer defaultConsumer new DefaultConsumer(channel){/*** 当接收到消息后此方法将被调用* param consumerTag 消费者标签用来标识消费者的在监听队列时设置channel.basicConsume* param envelope 信封通过envelope* param properties 消息属性* param body 消息内容* throws IOException*/Overridepublic void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {//交换机String exchange envelope.getExchange();//消息idmq在channel中用来标识消息的id可用于确认消息已接收long deliveryTag envelope.getDeliveryTag();//消息内容String message new String(body,utf-8);System.out.println(receive message:message);}};//监听队列//参数String queue, boolean autoAck, Consumer callback/*** 参数明细* 1、queue 队列名称* 2、autoAck 自动回复当消费者接收到消息后要告诉mq消息已接收如果将此参数设置为tru表示会自动回复mq如果设置为false要通过编程实现回复* 3、callback消费方法当消费者接收到消息要执行的方法*/channel.basicConsume(QUEUE,true,defaultConsumer);}
}
3.消费者
1.设置连接信息
通过ConnectionFactory获取连接信息
//通过连接工厂创建新的连接和mq建立连接
ConnectionFactory connectionFactory new ConnectionFactory();
connectionFactory.setHost(127.0.0.1);
connectionFactory.setPort(5672);//端口
connectionFactory.setUsername(guest);
connectionFactory.setPassword(guest);
//设置虚拟机一个mq服务可以设置多个虚拟机每个虚拟机就相当于一个独立的mq
connectionFactory.setVirtualHost(/);2.获取connection连接channel信道
Connection connection null;
Channel channel null;
//建立新连接
connection connectionFactory.newConnection();
//创建会话通道,生产者和mq服务所有通信都在channel通道中完成
channel connection.createChannel();3.channel信道绑定队列
//队列
private static final String QUEUE helloworld;//声明队列如果队列在mq 中没有则要创建
//参数String queue, boolean durable, boolean exclusive, boolean autoDelete, MapString, Object arguments
/*** 参数明细* 1、queue 队列名称* 2、durable 是否持久化如果持久化mq重启后队列还在* 3、exclusive 是否独占连接队列只允许在该连接中访问如果connection连接关闭队列则自动删除,如果将此参数设置true可用于临时队列的创建* 4、autoDelete 自动删除队列不再使用时是否自动删除此队列如果将此参数和exclusive参数设置为true就可以实现临时队列队列不用了就自动删除* 5、arguments 参数可以设置一个队列的扩展参数比如可设置存活时间*/
channel.queueDeclare(QUEUE,true,false,false,null);4.发送消息
//参数String exchange, String routingKey, BasicProperties props, byte[] bod
/*** 参数明细* 1、exchange交换机如果不指定将使用mq的默认交换机设置为* 2、routingKey路由key交换机根据路由key来将消息转发到指定的队列如果使用默认交换机routingKey设置为队列的名称* 3、props消息的属性* 4、body消息内容*/
//消息内容
String message hello world 黑马程序员;
channel.basicPublish(,QUEUE,null,message.getBytes());
System.out.println(send to mq message);结果截图 完整代码
package com.xuecheng.test.rabbitmq;import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;import java.io.IOException;
import java.util.concurrent.TimeoutException;public class Producer01 {//队列private static final String QUEUE helloworld;public static void main(String[] args) {//通过连接工厂创建新的连接和mq建立连接ConnectionFactory connectionFactory new ConnectionFactory();connectionFactory.setHost(127.0.0.1);connectionFactory.setPort(5672);//端口connectionFactory.setUsername(guest);connectionFactory.setPassword(guest);//设置虚拟机一个mq服务可以设置多个虚拟机每个虚拟机就相当于一个独立的mqconnectionFactory.setVirtualHost(/);Connection connection null;Channel channel null;try {//建立新连接connection connectionFactory.newConnection();//创建会话通道,生产者和mq服务所有通信都在channel通道中完成channel connection.createChannel();//声明队列如果队列在mq 中没有则要创建//参数String queue, boolean durable, boolean exclusive, boolean autoDelete, MapString, Object arguments/*** 参数明细* 1、queue 队列名称* 2、durable 是否持久化如果持久化mq重启后队列还在* 3、exclusive 是否独占连接队列只允许在该连接中访问如果connection连接关闭队列则自动删除,如果将此参数设置true可用于临时队列的创建* 4、autoDelete 自动删除队列不再使用时是否自动删除此队列如果将此参数和exclusive参数设置为true就可以实现临时队列队列不用了就自动删除* 5、arguments 参数可以设置一个队列的扩展参数比如可设置存活时间*/channel.queueDeclare(QUEUE,true,false,false,null);//发送消息//参数String exchange, String routingKey, BasicProperties props, byte[] body/*** 参数明细* 1、exchange交换机如果不指定将使用mq的默认交换机设置为* 2、routingKey路由key交换机根据路由key来将消息转发到指定的队列如果使用默认交换机routingKey设置为队列的名称* 3、props消息的属性* 4、body消息内容*///消息内容String message hello world 黑马程序员;channel.basicPublish(,QUEUE,null,message.getBytes());System.out.println(send to mq message);} catch (Exception e) {e.printStackTrace();} finally {//关闭连接//先关闭通道try {channel.close();} catch (IOException e) {e.printStackTrace();} catch (TimeoutException e) {e.printStackTrace();}try {connection.close();} catch (IOException e) {e.printStackTrace();}}}
}
相关文章: