当前位置: 首页 > news >正文

请上传网站应用水印图片汽车宣传软文

请上传网站应用水印图片,汽车宣传软文,简单的网页制作源代码大全,没有备案做盈利性的网站违法吗MQ#xff08;message queue#xff09;#xff1a;本质上是个队列#xff0c;遵循FIFO原则#xff0c;队列中存放的是message#xff0c;是一种跨进程的通信机制#xff0c;用于上下游传递消息。MQ提供“逻辑解耦物理解耦”的消息通信服务。使用了MQ之后消息发送上游只… MQmessage queue本质上是个队列遵循FIFO原则队列中存放的是message是一种跨进程的通信机制用于上下游传递消息。MQ提供“逻辑解耦物理解耦”的消息通信服务。使用了MQ之后消息发送上游只需要依赖MQ不需要依赖其它服务。 功能1流量消峰 功能2应用解耦 功能3异步处理 MQ的分类 1.Kafka 2.RabbitMQ RabbitMQ概念 四大核心概念 交换机 队列  六大核心模式 1.简单模式。2.工作模式。3.发布订阅模式。4.路由模式。5.主题模式。6.发布确认模式。 RabbitMQ工作原理 Channer信道发消息的通道。 下载 1. 官网地址https://www.rabbitmq.com/download.html。参考的下载地址如下Linux下安装RabbitMQ_rabbitmq下载_零碎de記憶的博客-CSDN博客 2.安装Erlang环境 yum install build-essential openssl openssl-devel unixODBC unixODBC-devel make gcc gcc-c kernel-devel m4 ncurses-devel tk tc xz tcp_wrappers3.下载Erlang方式1找到下面网址在网址中下载rpm文件 el/7/erlang-22.3.4.12-1.el7.x86_64.rpm - rabbitmq/erlang · packagecloud 或者直接输入下面指令下载rpm文件  wget --content-disposition https://packagecloud.io/rabbitmq/erlang/packages/el/7/erlang-22.3.4.12-1.el7.x86_64.rpm/download.rpm然后输入下面的命令安装已下载的安装包 yum localinstall erlang-22.3.4.12-1.el7.x86_64.rpm4.下载RabbitMQ输入下面的下载 wget --content-disposition https://packagecloud.io/rabbitmq/rabbitmq-server/packages/el/7/rabbitmq-server-3.8.13-1.el7.noarch.rpm/download.rpm输入下面的命令进行本地安装 yum localinstall rabbitmq-server-3.8.13-1.el7.noarch.rpm 5. 下载socat检查是否已下载 yum install socat -y 注意以下的操作都要在 /usr/local/software目录下查看  6.添加开机启动RabbitMQ服务chkconfig rabbitmq-server on。启动rabbitmq /sbin/service rabbitmq-server start。 7.查看服务状态 /sbin/service rabbitmq-server status 8.停止服务 /sbin/service rabbitmq-server stop。重新查看服务状态。 10.开启web管理界面 sudo rabbitmq-plugins enable rabbitmq_management 11.查看防火墙状态systemctl status firewalld。关闭防火墙systemctl stop firewalld。关闭rabbit服务器输入sudo rabbitmqctl stop。开启rabbit服务器输入sudo rabbitmq-server -detached。 12.在浏览器中输入地址Linux服务器ip地址:15672可访问web管理界面。 13.用户名guest密码默认但无法登陆无权限。 14.rabbitmqctl list_users查看用户。创建账号 rabbitmqctl add_user admin 123。设置用户角色为管理员 rabbitmqctl set_user_tags admin administrator。设置用户权限 rabbitmqctl set_permissions -p / admin .* .* .*。 15.再经尝试可以重新登录 创建Java开发环境 1.创建1个新项目命名atguigu-rabbitmq然后创建模块Module。GroupId可以填写com.atguigu.rabbitmqArtifactId可以填rabbitmq-hello选择quickstart 导入依赖如下 dependencies!--rabbitmq依赖客户端--dependencygroupIdcom.rabbitmq/groupIdartifactIdamqp-client/artifactIdversion5.8.0/version/dependency!--操作文件流的依赖--dependencygroupIdcommons-io/groupIdartifactIdcommons-io/artifactIdversion2.6/version/dependencydependencygroupIdorg.apache.maven.plugins/groupIdartifactIdmaven-compiler-plugin/artifactIdversion3.8.1/version !-- 根据你的需求指定版本号 --/dependency/dependenciesbuildpluginsplugingroupIdorg.apache.maven.plugins/groupIdartifactIdmaven-compiler-plugin/artifactIdconfigurationsource8/sourcetarget8/target/configuration/plugin/plugins/build 在下图中P是生产者C是消费者。中间的框是一个队列-RabbitMQ代表使用者保留的消息缓存区。 生产者代码 public class producer {//队列名称public static final String QUEUE_NAME hello;//发消息public static void main( String[] args ) throws IOException, TimeoutException {//第1步创建一个连接工程ConnectionFactory factory new ConnectionFactory();//第2步输入工厂IP用户名和密码——连接RabbitMQd队列factory.setHost(192.168.182.136);factory.setUsername(admin);factory.setPassword(123);//第3步创建连接Connection connection factory.newConnection();//第4步获取信道Channel channel connection.createChannel();//第5步生成一个队列队列名称是否持久化是否排他自动删除队列参数//持久化:是否存储入磁盘默认是将消息存储在内存中//排他队列是否只供一个消费者消费是否进行消息共享true可以供多个消费者消费//自动删除最后一个消费者断开连接后该队列是否自动删除channel.queueDeclare(QUEUE_NAME,false,false,false,null);//第6步发消息,(交换机路由key本次是队列名,参数发送的消息)String message hello world;channel.basicPublish(,QUEUE_NAME,null,message.getBytes());System.out.println(消息发送成功);} } 消费者代码 public class consumer {public static final String QUEUE_NAME hello;public static void main(String [] args) throws IOException, TimeoutException {//第1步创建一个连接工程ConnectionFactory factory new ConnectionFactory();//第2步输入工厂IP用户名和密码——连接RabbitMQd队列factory.setHost(192.168.182.136);factory.setUsername(admin);factory.setPassword(123);//第3步创建连接Connection connection factory.newConnection();//第4步获取信道Channel channel connection.createChannel();//第5步声明接收消息DeliverCallback deliverCallback (consumerTag,message)-{System.out.println(new String(message.getBody()));};//第6步取消消息时的回调CancelCallback cancelCallback consumerTag-{System.out.println(消息消费被中断);};//第7步接收,(队列名自动or手动接收消息回调)//1.消费哪个队列2.消费成功后是否要自动应答true代表自动应答false表示手动应答//3.消费者未成功消费的回调channel.basicConsume(QUEUE_NAME,true,deliverCallback,cancelCallback);} } 注意几点1.确保rabbitmq处于开启的状态开启方式见前面2.最好让防火墙处于关闭的状态 3.最好通过方法左侧的开关按钮进行启动确保启动是选择Current File。 工作队列 工作队列又称任务队列主要思想是避免立即执行资源密集型任务而不得不等待它完成。相反我们安排任务在之后执行。我们把任务封装为消息并将其发送到队列。在后台运行的工作进程将弹出任务并最终执行作业。当有多个工作线程时这些工作线程将一起处理这些任务。 情况生产者大量分发消息给队列工作线程接收队列的消息工作线程不止一个三者关系时竞争关系你一条我一条他一条但要注意一个消息只能被处理一次不能被处理多次。 重复性的代码可以被抽取成为工具类。 在java — com — atguigu — rabbitmq下创建utils包工具类起名RabbitMqUtils放入如下代码 public class RabbitMqUtils {public static Channel getChannel() throws Exception{//第1步创建一个连接工程ConnectionFactory factory new ConnectionFactory();//第2步输入工厂IP用户名和密码——连接RabbitMQd队列factory.setHost(192.168.182.137);factory.setUsername(admin);factory.setPassword(123);//第3步创建连接Connection connection factory.newConnection();//第4步获取信道Channel channel connection.createChannel();return channel;} } 工作线程的更新后worker01的代码如下 public static final String QUEUE_NAME hello;public static void main(String [] args) throws Exception {Channel channel RabbitMqUtils.getChannel();channel.queueDeclare(QUEUE_NAME, false, false, false, null);//声明队列没有会报错//消息接收DeliverCallback deliverCallback (consumerTag,message)-{System.out.println(接收到的消息 new String(message.getBody()));};CancelCallback cancelCallback (consumerTag)-{System.out.println(consumerTag 消息被取消消费接口回调逻辑);};System.out.println(c1等待接收消息......);channel.basicConsume(QUEUE_NAME,true,deliverCallback,cancelCallback);} 重复利用one包下的consumer类将其更改为c2工作线程 Task01作为生产者用于生产数据与前面不同的是Task01支持从IDEA控制台输入数据 public class Task01 {public static final String QUEUE_NAMEhello;public static void main(String[] args) throws Exception {Channel channel RabbitMqUtils.getChannel();channel.queueDeclare(QUEUE_NAME,false,false,false,null);//从控制台当中接收信息Scanner scanner new Scanner(System.in); //扫描控制台输入内容while(scanner.hasNext()){String message scanner.next();channel.basicPublish(,QUEUE_NAME,null,message.getBytes());System.out.println(发送消息完成..);}} } 消息应答 概念 自动应答 手动应答 手动应答好处建议不批量应答选择false 消息自动重新入队 原本正常传输C1突然失去连接检测到C1断开连接于是会让消息重新入队原本的消息交由C2进行处理。 实验思路写1个生产者2个消费者当关闭掉其中1个工作线程消息不丢失还被另一个工作线程接收。消费在手动应答时不丢失、放回队列中重新消费。 消息手动应答生产者 public class Task2 { public static final String task_queue_name ack_queue; public static void main(String[] args) throws Exception {Channel channel RabbitMqUtils.getChannel();channel.queueDeclare(task_queue_name,false,false,false,null);Scanner scanner new Scanner(System.in);while(scanner.hasNext()){String message scanner.next();channel.basicPublish(,task_queue_name,null,message.getBytes(UTF-8));System.out.println(生产者发出消息message);} } } 消息手动应答消费者 public class Work03 {public static final String task_queue_name ack_queue;public static void main(String[] args) throws Exception {Channel channel RabbitMqUtils.getChannel();System.out.println(C1等待接收消息处理时间较短);DeliverCallback deliverCallback (consumerTag,message)-{SleepUtils.sleep(1);System.out.println(接收到的消息new String(message.getBody(),UTF-8));//1.消息的标记tag 2.是否批量应答channel.basicAck(message.getEnvelope().getDeliveryTag(),false);};//采用手动应答boolean autoAck false;channel.basicConsume(task_queue_name,autoAck,deliverCallback,(consumerTag-{System.out.println(consumerTag 消费者取消消费接口回调逻辑);}));} } public class Work04 {public static final String task_queue_name ack_queue;public static void main(String[] args) throws Exception {Channel channel RabbitMqUtils.getChannel();System.out.println(C2等待接收消息处理时间较短);DeliverCallback deliverCallback (consumerTag,message)-{SleepUtils.sleep(30);System.out.println(接收到的消息new String(message.getBody(),UTF-8));//1.消息的标记tag 2.是否批量应答channel.basicAck(message.getEnvelope().getDeliveryTag(),false);};//采用手动应答boolean autoAck false;channel.basicConsume(task_queue_name,autoAck,deliverCallback,(consumerTag-{System.out.println(consumerTag 消费者取消消费接口回调逻辑);}));} } 实现效果在生产者输入AA BB CC DD EE等消息消费者1接收速度快会立即打印AA CC EE等消息消费者2接收速度慢会在一段时间后接收到BB此时如果关闭消费者2则消费者1输出DD表明消费在手动应答时不丢失、放回队列中重新消费。 持久化 如果报错说明原本的队列就是不持久化此时无法设定持久化只能先将队列删除然后再重新设定。 控制队列持久化需要修改生产者声明函数的第2个参数 消息持久化 队列持久化和消息持久化不同队列是MQ里的一个组件消息是生产者发送的消息。 如果要让消息持久化在发消息的时候就要通知队列。 更改的是生产者的信道的basicPublish的第3个参数添加MessageProperties.PERSISTENT_TEXT_PLAIN 不公平分发  消费者处理任务的速度不一致为了不让速度快的消费者长时间处于空闲状态因此采用不公平分发。 int prefetchCount 1; channel.basicQos(prefetchCount); 预取值 前面N条数据分别交给谁处理如下图就是前7条数据中2条给C15条给C2 发布确认原理 1.设置要求队列必须持久化就算服务器宕机队列也不至于消失。 2.设置要求队列中的消息也必须持久化。 3. 发布确认消息保存到磁盘上之后队列要告知生产者。 Channel channel connection.createChannel(); channel.confirmSelect(); public static void main(String[] args){ } 单个发布确认 是一种同步确认发布的方式发布消息-确认消息-发布消息...必须要确认后才能继续发布类似于一手交钱一手交货缺点是发布速度很慢。 1. 创建com/atguigu/rabbitmq/four文件夹下的ConfirmMessage public static void publishMessageIndividually() throws Exception{Channel channel RabbitMqUtils.getChannel(); //获取信道String queueName UUID.randomUUID().toString();channel.queueDeclare(queueName,false,false,false,null);channel.confirmSelect();//开启发布确认long begin System.currentTimeMillis();for(int i0;iMESSAGE_COUNT;i){String message i ;channel.basicPublish(,queueName,null,message.getBytes());boolean flag channel.waitForConfirms();if(flag){System.out.println(消息发送成功);}}long end System.currentTimeMillis();System.out.println(发布MESSAGE_COUNT个单独确认消息耗时(end-begin)ms);} 批量发布确认 public static void publishMessageBatch() throws Exception{Channel channel RabbitMqUtils.getChannel(); //获取信道String queueName UUID.randomUUID().toString();channel.queueDeclare(queueName,false,false,false,null);channel.confirmSelect();//开启发布确认long begin System.currentTimeMillis();int batchSize 100; //批量确认消息的大小//批量发送消息批量发布确认for(int i0;iMESSAGE_COUNT;i){String message i;channel.basicPublish(,queueName,null,message.getBytes());//判断达到100条消息的时候批量确认一次if(i%batchSize0) channel.waitForConfirms();}long end System.currentTimeMillis();System.out.println(发布MESSAGE_COUNT个批量确认消息耗时(end-begin)ms); } 异步发布确认 map序列key是消息序号deliveryTag是消息的标识multiple是是否为批量value是消息内容将消息每一条都编号broker会对消息进行应答分为两种一种是确认应答另一种是未确认应答。消息生产者不需要等待接收方的消息只需要负责发送消息即可消息是否应答最终会以异步的形式回传也就是说确认的时间可以是稍后的。 addConfirmListener单参数的是只能监听成功的多参数的是可以监听成功也可以监听失败的都是接口需要自己来写。 public static void publishMessageAsync() throws Exception{Channel channel RabbitMqUtils.getChannel(); //获取信道String queueName UUID.randomUUID().toString();channel.queueDeclare(queueName,false,false,false,null);channel.confirmSelect();//开启发布确认long begin System.currentTimeMillis();//消息确认成功回调函数ConfirmCallback ackCallback (deliveryTag, multiple)-{System.out.println(确认的消息deliveryTag);};//消息确认失败回调函数ConfirmCallback nackCallback (deliveryTag, multiple)-{System.out.println(未确认的消息deliveryTag);};//准备消息的监听器监听哪些消息成功了哪些消息失败了channel.addConfirmListener(ackCallback,nackCallback);//批量发送消息for(int i0;iMESSAGE_COUNT;i){String message消息i;channel.basicPublish(,queueName,null,message.getBytes());//发布确认}long end System.currentTimeMillis();System.out.println(发布MESSAGE_COUNT个异步确认消息耗时(end-begin)ms); } 处理异步未确认消息 最好的解决方案就是把未确认的消息放到一个基于内存的能被发布线程访问的队列比如说用ConcurrentLinkedQueue这个队列在confirm callbacks与发布线程之间进行消息的传递。 public static void publishMessageAsync() throws Exception{Channel channel RabbitMqUtils.getChannel(); //获取信道String queueName UUID.randomUUID().toString();channel.queueDeclare(queueName,false,false,false,null);channel.confirmSelect();//开启发布确认/*线程安全有序的一个哈希表适用于高并发的情况下1.轻松地将序号与消息进行关联2.轻松地批量删除条目只要给到序号3.支持高并发多线程*/ConcurrentSkipListMapLong,String outstandingConfirms new ConcurrentSkipListMap();//消息确认成功回调函数ConfirmCallback ackCallback (deliveryTag, multiple)-{if(multiple){//2.删除掉已经确认的消息剩下的就是未确认的消息ConcurrentNavigableMapLong, String confirmd outstandingConfirms.headMap(deliveryTag);}else{outstandingConfirms.remove(deliveryTag);}System.out.println(确认的消息deliveryTag);};//消息确认失败回调函数ConfirmCallback nackCallback (deliveryTag, multiple)-{//3.打印一下未确认的消息都有哪些String message outstandingConfirms.get(deliveryTag);System.out.println(未确认的消息是message未确认的消息deliveryTag);};//准备消息的监听器监听哪些消息成功了哪些消息失败了channel.addConfirmListener(ackCallback,nackCallback);long begin System.currentTimeMillis();//批量发送消息for(int i0;iMESSAGE_COUNT;i){String message消息i;channel.basicPublish(,queueName,null,message.getBytes());//1.此处记录下所有发送的消息消息的总和outstandingConfirms.put(channel.getNextPublishSeqNo(),message);}long end System.currentTimeMillis();System.out.println(发布MESSAGE_COUNT个异步确认消息耗时(end-begin)ms);} } 三种方式对比 交换机 一个消息可以被消费多次需要通过交换机仍旧遵循队列中的消息只能被消费一次。 生产者生产的消息从不会直接发送到队列。生产者将消息发送到交换机。交换机负责接收来自生产者的消息将消息推入队列。 Exchanges的类型直接direct主题topic标题headers扇出fanout 消息能路由发送到队列中其实是由routingKey(bindingkey)绑定key指定的。 创建临时队列 String queueName channel.queueDedare().getQueue(); 绑定 根据Routing key来确定消息要发给哪个队列如果Routing Key相同消息就可以发送给多个队列。 先添加一个队列queue1再添加一个交换机exchange1最后点击exchange1交换机进入绑定菜单然后输入绑定的队列是queue1然后Routing key随便设置为123。 广播Fanout Fanout扇出是将接收到的所有消息广播到它知道的所有队列中。如果Routing Key相同则发送给队列以相同消息。 生产者 public class EmitLog {public static final String EXCHANGE_NAMElogs;public static void main(String[] args) throws Exception {Channel channel RabbitMqUtils.getChannel();channel.exchangeDeclare(EXCHANGE_NAME,fanout);Scanner scanner new Scanner(System.in);while(scanner.hasNext()){String message scanner.next();channel.basicPublish(EXCHANGE_NAME,,null,message.getBytes(UTF-8));System.out.println(生产者发出消息message);}} } 消费者 public class ReceiveLogs01 {public static final String EXCHANGE_NAMElogs;public static void main(String[] args) throws Exception {Channel channel RabbitMqUtils.getChannel();channel.exchangeDeclare(EXCHANGE_NAME,fanout);//声明一个交换机//声明一个队列临时队列队列的名称是随机的当消费者断开与队列的连接时候队列就删除了String queueName channel.queueDeclare().getQueue();//绑定交换机与队列channel.queueBind(queueName,EXCHANGE_NAME,);System.out.println(等待接收消息把接收到消息打印在屏幕上......);DeliverCallback deliverCallback (consumerTag,message)-{System.out.println(ReceiveLogs01控制台打印接收到的消息new String(message.getBody(),UTF-8));};channel.basicConsume(queueName,true,deliverCallback,consumerTag-{});} } 效果实现广播的功能 Direct路由交换机 消费者1 public class ReceiveLogsDirect01 {public static final String EXCHANGE_NAMEdirect_logs;public static void main(String[] args) throws Exception {Channel channel RabbitMqUtils.getChannel();channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.DIRECT);channel.queueDeclare(console,false,false,false,null);channel.queueBind(console,EXCHANGE_NAME,info); //队列名称交换机名称RoutingkeyDeliverCallback deliverCallback (consumerTag,message)-{System.out.println(ReceiveLogsDirect01控制台打印接收到的消息new String(message.getBody(),UTF-8));};channel.basicConsume(console,true,deliverCallback,consumerTag-{});} } 消费者2 public class ReceiveLogsDirect02 {public static final String EXCHANGE_NAMEdirect_logs;public static void main(String[] args) throws Exception {Channel channel RabbitMqUtils.getChannel();channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.DIRECT);channel.queueDeclare(disk,false,false,false,null);channel.queueBind(disk,EXCHANGE_NAME,error); //队列名称交换机名称RoutingkeyDeliverCallback deliverCallback (consumerTag,message)-{System.out.println(ReceiveLogsDirect02控制台打印接收到的消息new String(message.getBody(),UTF-8));};channel.basicConsume(disk,true,deliverCallback,consumerTag-{});} } 生产者  public class DirectLogs {public static final String EXCHANGE_NAMEdirect_logs;public static void main(String[] args) throws Exception {Channel channel RabbitMqUtils.getChannel();Scanner scanner new Scanner(System.in);while(scanner.hasNext()){String message scanner.next();channel.basicPublish(EXCHANGE_NAME,info,null,message.getBytes(UTF-8));System.out.println(生产者发出消息message);}} } 效果 如果【channel.basicPublish(EXCHANGE_NAME,info,null,message.getBytes(UTF-8));】的第2个参数填info就只会发送消息给消费者1填写error就只会发送消息给消费者2。 Topics主题交换机 发布生产者订阅消费者模式 消费者1 public class ReceiveLogsTopic01 {public static final String EXCHANGE_NAMEtopic_logs;//交换机名称public static void main(String[] args) throws Exception {Channel channel RabbitMqUtils.getChannel();channel.exchangeDeclare(EXCHANGE_NAME,topic);String queueNameQ1;channel.queueDeclare(queueName,false,false,false,null);channel.queueBind(queueName,EXCHANGE_NAME,*.orange.*);System.out.println(等待接收消息.....);DeliverCallback deliverCallback (consumerTag,message)-{System.out.println(new String(message.getBody(),UTF-8));System.out.println(接收队列queueName 绑定键message.getEnvelope().getRoutingKey());};channel.basicConsume(queueName,true,deliverCallback,consumerTag-{});} } 消费者2 public class ReceiveLogsTopic02 {public static final String EXCHANGE_NAMEtopic_logs;//交换机名称public static void main(String[] args) throws Exception {Channel channel RabbitMqUtils.getChannel();channel.exchangeDeclare(EXCHANGE_NAME,topic);String queueNameQ2;channel.queueDeclare(queueName,false,false,false,null);channel.queueBind(queueName,EXCHANGE_NAME,*.*.rabbit);channel.queueBind(queueName,EXCHANGE_NAME,lazy.#);System.out.println(等待接收消息.....);DeliverCallback deliverCallback (consumerTag,message)-{System.out.println(new String(message.getBody(),UTF-8));System.out.println(接收队列queueName 绑定键message.getEnvelope().getRoutingKey());};channel.basicConsume(queueName,true,deliverCallback,consumerTag-{});} } 生产者1 public class EmitLogTopic {public static final String EXCHANGE_NAMEtopic_logs;public static void main(String[] args) throws Exception {Channel channel RabbitMqUtils.getChannel();MapString,String bindingKeyMap new HashMap();bindingKeyMap.put(quick.orange.rabbit,被队列Q1Q2接收到);bindingKeyMap.put(lazy.orange.elephant,被队列Q1Q2接收到);bindingKeyMap.put(quick.orange.fox,被队列Q1接收到);bindingKeyMap.put(lazy.brown.fox,被队列Q2接收到);bindingKeyMap.put(lazy.pink.rabbit,虽然满足两个绑定但只被队列Q2接收一次);bindingKeyMap.put(quick.brown.fox,不匹配任何绑定不会被任何队列接收到会被丢弃);bindingKeyMap.put(quick.orange.male.rabbit,是四个单词不匹配任何绑定会被丢弃);bindingKeyMap.put(lazy.orange.male.rabbit,是四个单词但匹配Q2);for (Map.EntryString, String bindingKeyEntry : bindingKeyMap.entrySet()) {String routingKey bindingKeyEntry.getKey();String message bindingKeyEntry.getValue();channel.basicPublish(EXCHANGE_NAME,routingKey,null,message.getBytes(UTF-8));System.out.println(生产者发出消息message);}} } 结果 死信 无法被消费消息被称为死信 死信的来源 死信实战架构图 1个生产者2个消费者。生产者原本走正常交换机消息走正常队列被C1消费。当满足消息被拒绝消息TTL过期队列达到最大长度三者其一时消息成为死信会进入dead_exchange交换机进入dead_queue死信队列死信队列的信息由C2消费。 消费者1 public class Consumer01 {//普通交换机的名称public static final String NORMAL_EXCHANGE normal_exchange;//死信交换机的名称public static final String DEAD_EXCHANGE dead_exchange;//普通队列的名称public static final String NORMAL_QUEUE normal_queue;//死信队列的名称public static final String DEAD_QUEUE dead_queue;public static void main(String[] args) throws Exception {Channel channel1 RabbitMqUtils.getChannel();//声明死信和普通交换机类型为directchannel1.exchangeDeclare(NORMAL_EXCHANGE, BuiltinExchangeType.DIRECT);channel1.exchangeDeclare(DEAD_EXCHANGE,BuiltinExchangeType.DIRECT);MapString,Object arguments new HashMap(); //设置参数//正常队列设置死信交换机arguments.put(x—dead—letter—exchange,DEAD_EXCHANGE); //****相当于正常的C1不能消费掉就通过这个交换机进行转发//设置死信RoutingKeyarguments.put(x—dead—letter—routing—key,lisi); //***//声明普通队列channel1.queueDeclare(NORMAL_QUEUE,false,false,false,arguments); //正常交换机不正常需要将死信转发给死信队列//声明死信队列channel1.queueDeclare(DEAD_QUEUE,false,false,false,null);//绑定普通的交换机与队列channel1.queueBind(NORMAL_QUEUE,NORMAL_EXCHANGE,zhangsan);//绑定死信的交换机与死信的队列channel1.queueBind(DEAD_QUEUE,DEAD_EXCHANGE,lisi);System.out.println(等待接收消息.....);DeliverCallback deliverCallback (consumerTag,message)-{System.out.println(Consumer01接收的消息是 new String(message.getBody(),UTF-8));};channel1.basicConsume(NORMAL_QUEUE,true,deliverCallback,consumerTag-{});} } 消费者2 public class Consumer02 {//死信队列的名称public static final String DEAD_QUEUE dead_queue;public static void main(String[] args) throws Exception {Channel channel1 RabbitMqUtils.getChannel();System.out.println(等待接收消息.....);DeliverCallback deliverCallback (consumerTag,message)-{System.out.println(Consumer02接收的消息是 new String(message.getBody(),UTF-8));};channel1.basicConsume(DEAD_QUEUE,true,deliverCallback,consumerTag-{});} } 生产者 public class Producer {public static final String NORMAL_EXCHANGE normal_exchange;public static void main(String[] args) throws Exception {Channel channel RabbitMqUtils.getChannel();//死信消息设置TTL时间time to liveAMQP.BasicProperties properties new AMQP.BasicProperties().builder().expiration(10000).build();for (int i 1; i 11; i) {String message infoi;channel.basicPublish(NORMAL_EXCHANGE,zhangsan,properties,message.getBytes());}} }
http://www.sadfv.cn/news/156308/

相关文章:

  • 营销型网站.google搜索排名优化
  • 宁夏自治区住房与城乡建设厅网站求职网站开发开题报告
  • 网站建设制作 企业站开发哪家好建设通是什么
  • 网页设计网站的设计与规划久久建筑网会员登陆
  • 功能分类模块类型网站南充市住房和城乡建设厅网站
  • 浙江网站建设设计wordpress+留言本
  • 河南省住房和城乡建设厅查询网站首页2022千锋教育培训收费一览表
  • 黄骅的网站全景网站是怎么做的
  • 如何自己做网站 开直播做培训的网站建设
  • 网站制作 广州wordpress php7 报错
  • 北京网站建设 招聘信息武进做网站
  • 网站制作 商务专业做app下载网站
  • 便宜模板网站建设福清可以做宣传的网站
  • 外贸网站 测速注册网站费用
  • 济南做网站的中企网站如何做实名认证
  • 给企业做网站前景wordpress纯代码下载
  • 昆山哪家做网站好融媒体建设网站怎么搞
  • 动画网站制作网络营销的案例有哪些
  • 怎样经营好一个网站网站建设费用摊销年限
  • 企业手机网站设计案例高端礼品定制网站
  • 网站开发与系统开发网站头部固定
  • 做校招的网站有哪些构建网站需要会什么意思
  • 英文网站建设公司 杭州百度号码认证申诉平台
  • 西安互联网网站建设国内老牌的网站制作
  • 可在哪些网站做链接云梦网络 网站模板
  • 做电商要有网站吗设计师培训学校有哪些
  • 建筑类招聘网站有哪些qq推广网
  • 分享设计的网站如何配置iis网站
  • 蓝色经典通用网站模板html源码下载计算机哪个专业工资最高
  • 整套网站建设企业做网站的