电子商务网站开发总结,南昌 网站制作,做搜狗手机网站优化点,网站开发怎么做前言
本章我们来一次快速入门RabbitMQ——生产者与消费者。需要构建一个生产端与消费端的模型。什么意思呢#xff1f;我们的生产者发送一条消息#xff0c;投递到RabbitMQ集群也就是Broker。 我们的消费端进行监听RabbitMQ#xff0c;当发现队列中有消息后#xff0c;就进…
前言
本章我们来一次快速入门RabbitMQ——生产者与消费者。需要构建一个生产端与消费端的模型。什么意思呢我们的生产者发送一条消息投递到RabbitMQ集群也就是Broker。 我们的消费端进行监听RabbitMQ当发现队列中有消息后就进行消费。
1. 环境准备
本次整合主要采用SpringBoot框架需要对SpringBoot的使用有一定了解。
2.大概步骤
我们来看下大概步骤
ConnectionFacorty获取连接工厂Connection一个连接Channel数据通信信道可发送和接收消息Queue具体的消息存储队列Producer Consumer 生产者和消费者
这个连接工厂需要配置一些相应的信息例如: RabbitMQ节点的地址端口号VirtualHost等等。 Channel是我们RabbitMQ所有消息进行交互的关键。
3. 项目实战
3.1 连接工厂 public class ConnectionUtils {public static Connection getConnection() throws IOException, TimeoutException {//定义连接工厂ConnectionFactory factory new ConnectionFactory();//设置服务地址factory.setHost(127.0.0.1);//端口factory.setPort(5672);//amqp协议 端口 类似与mysql的3306//设置账号信息用户名、密码、vhostfactory.setVirtualHost(/vhost_cp);factory.setUsername(user_cp);factory.setPassword(123456);// 通过工程获取连接Connection connection factory.newConnection();return connection;}
}3.2 生产端 public class Producer {public static void main(String[] args) throws Exception {System.out.println(Producer start...);//1 创建ConnectionFactoryConnection connection ConnectionUtils.getConnection();//2 通过connection创建一个ChannelChannel channel connection.createChannel();//3 通过Channel发送数据
for(int i0; i 5; i){String msg Hello RabbitMQ!;//1 exchange 2 routingKey channel.basicPublish(, test001, null, msg.getBytes());}//4 记得要关闭相关的连接channel.close();connection.close();}
}
3.3 消费端 public class Consumer {public static void main(String[] args) throws Exception {System.out.println(Consumer start...);//1 创建ConnectionFactoryConnection connection ConnectionUtils.getConnection();//2通过connection创建一个ChannelChannel channel connection.createChannel();//3声明创建一个队列String queueName test001;channel.queueDeclare(queueName, true, false, false, null);//4创建消费者QueueingConsumer queueingConsumer new QueueingConsumer(channel);//5设置Channelchannel.basicConsume(queueName, true, queueingConsumer);while(true){//6 获取消息Delivery delivery queueingConsumer.nextDelivery();String msg new String(delivery.getBody());System.err.println(消费端: msg);//Envelope envelope delivery.getEnvelope();}}
}
3.4 源码解析 channel.queueDeclare(queueName, true, false, false, null);第一个参数queue:队列的名称 第二个参数durable 是否持久化。true消息会持久化到本地保证重启服务后消息不会丢失 第三个参数exclusive 表示独占方式设置为true 在某些情景下有必要例如顺序消费。表示只有一个channel可以去监听其他channel都不能够监听。目的就是为了保证顺序消费。 第四个参数autoDelete队列如果与Exchange未绑定则自动删除 第五个参数arguments扩展参数 channel.basicConsume(QUEUE_NAME, true, consumer);第二个参数 autoAck:自动签收消息
3.5 运行程序
1启动消费端 2查看管控台 可以看到已经有一个连接一个信道一个消费者等信息了。 可以看到信道目前的状态是空闲状态。 队列中多了test001队列。 关于管控台的介绍可以看这篇文章消息中间件——RabbitMQ四命令行与管控台的基本操作 3运行生产端 可以看到生产端发送完消息之后停下了消费端迅速接收到了消息。也可以继续通过管控台观察消费的情况。
4 问题
注意
这里面可能有一个问题为什么要先启动消费端呢
因为在消费端创建的队列我们必须要有队列才能够发送消息。
另一个问题在生产端代码中 channel.basicPublish(, test001, null, msg.getBytes());并没有设置exchange,只设置了队列名称消费端却依然能够消费到消息这是为什么呢
答发消息的一定要指定Exchange如果不指定Exchange或者Exchange为空的话它会默认走第一个 它的路由规则将相同命名的队列Queue的消息路由过去如果路由不过去将会把消息删除。