广州番禺越秀和樾府,昆明seo博客南网站建设,电商cms系统介绍,便宜的营销型网站建设目录 ★ RPC模型#xff08;远程过程调用通信模型#xff09;▲ 完整过程#xff1a;代码演示总体流程解释#xff1a;ConstantUtil 常量工具类ConnectionUtil RabbitMQ连接工具类Server 服务端Client 客户端测试结果服务端客户端 完整代码ConstantUtil 常量工具类Connecti… 目录 ★ RPC模型远程过程调用通信模型▲ 完整过程代码演示总体流程解释ConstantUtil 常量工具类ConnectionUtil RabbitMQ连接工具类Server 服务端Client 客户端测试结果服务端客户端 完整代码ConstantUtil 常量工具类ConnectionUtil RabbitMQ连接工具类Server 服务端Client 客户端pom.xml ★ RPC模型远程过程调用通信模型
PRC 模型相当于一对一跟调用方法一样能拿到方法返回的结果这就是典型的RPC模型不仅要传参数还需要拿到返回值
reply回答、答复 correlation 关联、相互关联
▲ 通过使用两个独享队列可以让RabbitMQ实现RPC远程过程调用通信模型
其通信过程其实也很简单客户端向服务器消费的独享队列发送一条消息服务器收到该消息后对该消息进行处理然后将处理结果发送给客户端消费的独享队列。
▲ 服务器端消费的独享队列负责保存调用参数客户端消费的独享队列负责保存调用的返回值。
▲ 使用独享队列可以避免其他连接来读取队列的消息、只有当前连接才能读取该队列的消息这样才能保证服务器能读到客户端发送的每条消息客户端也能读到服务器返回的每条消息。
▲ 为了让服务器知道客户端所消费的独享队列客户端发送消息时应该将自己监听的队列名以 reply_to属性 发送给服务器
▲为了能准确识别服务器应答消息返回值与客户端请求消息调用参数之间的对应关系 还需要为每条消息都增加一个 correlation_id 属性两条具有相同 correlation_id 属性值的消息可认为是配对的两条消息。
【备注】客户端送出的消息要包含2个属性 reply_to该属性指定了服务器要将返回的消息送回到哪个队列。 correlation_id该属性指定了服务器返回的消息也要添加相同的correlation_id属性。 ▲ 完整过程
1服务器启动时它会创建一个名为“rpc_queue”的独享队列名称可以随意并使用服务器端的消费者监听该独享队列的消息。所有的RPC 调用一定都是先从服务器端的启动开始的。2客户端启动时它会创建一个匿名默认由RabbitMQ命名的 独享队列并使用客户端的消费者监听该独享队列的消息。这个独享队列的名字也是 reply_to 属性的属性值3客户端发送带有两个属性的消息一个是代表应答队列名的 reply_to属性该属性值就是第2步客户端所创建的独享队列名另一个是代表消息标识的 correlation_id 属性。4将消息发送到服务器监听的rpc_queue队列中。5服务器从rpc_queue队列中读取消息服务器调用处理程序对该消息进行计算将计算结果以消息发送给 reply_to属性 指定的队列并为消息添加相同的 correlation_id属性。6客户端从 reply_to 对应的队列中读取消息当消息出现时它会检查消息的 correlation_id属性。如果此属性的值与请求消息的 correlation_id 属性值匹配将它返回给应用。————上面过程其实就是对P2P模型的应用因此无需使用自己的Exchange而是使用系统自动创建的默认Exchange即可。代码演示
需求客户端发送个消息到服务端服务端处理完再返回结果给客户端。 如图需要有两个消息队列一个是服务端
总体流程解释
仅作为自己梳理代码流程的记录大佬请直接忽略
Server 类是服务端 Client 是客户端。
rpc_queue 是自己在服务端声明创建的消息队列服务端监听着这个消息队列
amq.gen-3Nl6GNjR5BzPJ4N-By4p1g 是客户端声明创建的一个默认生成的消息队列。 就是调用 Channel 的 queueDeclare() 方法声明队列时不指定具体的消息队列的参数全凭默认生成客户端监听着这个默认的消息队列。
replyTo 的值是 amq.gen-3xxx 这个默认消息队列作用是指定了服务器要将返回的消息送回到这个默认队列
correlationId 只是一个单纯的消息标识可以给个1、2、3、4…作为消息标识
上面这些就是涉及到的一些点下面就是流程
首先客户端会发送几个消息Exchange时默认的路由key 是 rpc_queue 每个消息都携带者 replyTo 和 correlationId 这两个属性值 解释如果消息发布者指定默认的Exchange那么Exchange就会根据消息发布者发来的消息中携带的路由key假如路由key叫 aaa 去找是否有同样名字叫aaa的消息队列有的话就把消息分发给消息队列没有的话该消息就会被丢弃
这些消息会被默认的Exchange分发给 rpc_queue 这个消息队列。
服务端在声明 rpc_queue 这个消息队列的时候把这个消息队列设置为独享类型exclusivetrue那么 rpc_queue 这个消息队列里面的消息就只能被这个服务端消费不能被其他消费者获取到消息。
因为服务端监听这个 rpc_queue 这个消息队列所以服务端拿到这个消息队列的消息之后就会把消息中的 replyTo 和 correlationId 先拿出来然后同时对消息进行业务逻辑处理。
业务逻辑处理完消息后服务端需要把这些处理后的消息返回给客户端。 重点就是每个消息在客户端发来之后都有一个 correlationId 标识所以服务端在返回回去时需要把处理好的消息的原本的correlationId 标识对应的设置回去。
比如客户端发来消息 A , A 携带的 correlationId 为 1 那么服务端在处理完 A 消息后需要把 correlationId 1 再设置回 这个消息 A 就是拿出来处理完消息再放回去这样客户端在接收服务端返回来的处理过后的A消息时才能根据 correlationId 1 这个标识得到想要的被处理过的A消息数据。 因为客户端发的消息可能有成千上万条需要有这个 correlationId 作为消息的标识才能准确拿到被处理过的想要的那条A消息
服务端返回处理过的消息给客户端也是一个发送消息的过程所以发送消息指定的消息队列就是这个 replyTo就是amq.gen-3xxx 这个默认消息队列这个replyTo 也是从客户端发送来的消息中获取获取一个属性作用是指定了服务端要将返回的消息送回到这个默认队列。
服务端把处理后的消息返回到 amq.gen-3xxx 这个默认消息队列因为 客户端就是在监听amq.gen-3xxx 这个默认消息队列所以客户端就能得到自己一开始发送给服务端然后服务端处理完成后返回来的消息。
然后客户端就能根据 correlationId 这个标识准确找到每个被处理修改过后的消息而不至于找混。再根据需求去对处理过的消息进行业务操作。
以上仅作为自己梳理代码流程的记录大佬请直接忽略 更简单点来说就是
客户端声明并监听着默认队列 amq.gen-3xxx然后发送消息到客户端消息携带有correlationId 和 replyto 两个属性路由key是rpc_queueexchange是默认的。
服务端声明并监听 rpc_queue 消息队列从该队列得到消息messageA后从每个消息中获取该消息对应的 correlationId 和 replyto 两个属性的属性值然后处理消息对于处理完的消息resultMessageA需要把correlationId 和 replyto 两个属性的属性值重新设置回给resultMessageA在通过Exchange分发回给 replyto 属性值中指定的消息队列amq.gen-3xxx。
然后客户端再从 amq.gen-3xxx 默认的消息队列中获取服务端处理并返回回来的消息进行对应的消费。
ConstantUtil 常量工具类 ConnectionUtil RabbitMQ连接工具类 Server 服务端 Client 客户端 测试结果
启动测试的时候一定要先启动服务端再启动客户端
服务端 客户端 QUEUE
完整代码
ConstantUtil 常量工具类
package cn.ljh.rabbitmq.util;//常量
public class ConstantUtil
{//消息队列实现 RPC远程过程调用模型 之 服务器端----------------//消息队列public final static String RPC_QUEUE rpc_queue;// ------------topic类型的Exchange需要的相关常量----------public final static String QUEUET01 qt_01;public final static String QUEUET02 qt_02;// topic 通配符类型的 Exchangepublic static final String EXCHANGE_NAME_TOPIC myex03.topic;// Exchange 绑定 Queue 队列的路由key 通配符类型 *匹配一个单词。#匹配零个或多个单词。public static final String[] ROUTING_TOPIC_PATTERNS {*.crazyit.*, *.org, edu.#};// 生产者发送消息给Excahnge携带的路由keypublic static final String[] ROUTING_TOPIC_KEYS { www.crazyit.org, www.crazyit.cn,edu.crazyit.org, crazyit.org, fkjava.org, edu.fkjava.org, edu.fkjava, edu.org};//-------------------------------------------------------// 消息队列的名称public final static String QUEUE01 queue_01;public final static String QUEUE02 queue_02;// Exchange的名称public static final String EXCHANGE_NAME myex02.direct;// 三个路由key定义成一个数组的名称public static final String[] ROUTING_KEYS {info, error, warning};}
ConnectionUtil RabbitMQ连接工具类
package cn.ljh.rabbitmq.util;import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;import java.io.IOException;
import java.util.concurrent.TimeoutException;//连接工具
public class ConnectionUtil
{//获取连接的方法public static Connection getConnection() throws IOException, TimeoutException{//创建连接工厂----这个ConnectionFactory源码可以看出有构造器所以直接new一个出来ConnectionFactory connectionFactory new ConnectionFactory();//设置连接信息connectionFactory.setHost(localhost);connectionFactory.setPort(5672);connectionFactory.setUsername(ljh);connectionFactory.setPassword(123456);connectionFactory.setVirtualHost(/); //连接虚拟主机//从连接工厂获取连接Connection connection connectionFactory.newConnection();//返回连接return connection;}
}
Server 服务端
package cn.ljh.rabbitmq.producer;import cn.ljh.rabbitmq.util.ConnectionUtil;
import cn.ljh.rabbitmq.util.ConstantUtil;
import com.rabbitmq.client.*;import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.util.Random;
import java.util.concurrent.TimeoutException;//消息队列实现 RPC远程过程调用模型 之 服务器端
public class Server
{public static void main(String[] args) throws IOException, TimeoutException{//1、创建连接Connection conn ConnectionUtil.getConnection();//2、通过Connection获取Channel。Channel channel conn.createChannel();//3、使用系统自动创建的默认Exchange无需声明Exchange//消息队列设置为独占exclusive:true--------------------------------------------------------------------------------------声明消息队列channel.queueDeclare(ConstantUtil.RPC_QUEUE,true, /* 是否持久化 */true, /* 是否只允许只有这个消息队列的消息消费者才可以消费这个消息队列的消息 */false, /* 是否自动删除 */null); /* 指定这个消息队列的额外参数属性 *///不需要关闭资源因为它也要监听自己消费消息的队列//4、调用Channel 的 basicConsume()方法开始消费消息----------------------------------------------------------------------------1、服务端监听并消费消息channel.basicConsume(ConstantUtil.RPC_QUEUE, /* 消费这个名字的消费队列里面的消息 */true,new DefaultConsumer(channel){//处理消息当这个 ConstantUtil.RPC_QUEUE 消息队列收到消息的时候这个方法就会被触发。重写这个方法Overridepublic void handleDelivery(String consumerTag,Envelope envelope /*消息所在的信封,存放消息的exchange、路由key这些*/,AMQP.BasicProperties properties /* 消息的那些属性 */,byte[] body /*body消息的消息体*/) throws IOException{//把消息体中的消息拿出来此处读取到的消息就相当于调用参数-------------------------------------------------------2、服务端获取消息队列的消息String param new String(body, StandardCharsets.UTF_8);//之前只需要用到消息现在需要额外读取消息里面携带的两个属性reply_to 和 correlation_id//消息的属性都存放在 AMQP.BasicProperties 这个属性里面从这个属性获取 reply_to 和 correlation_idString replyTo properties.getReplyTo();System.err.println(replyTo: replyTo);String correlationId properties.getCorrelationId();System.err.println(correlationId: correlationId);//调用服务器的处理消息的方法最终得到处理后的结果。该方法可以是任意的业务处理,该方法的返回值result是要被送回客户端的。------3、服务端处理消费消息String result format(param);//printf格式化输出函数 %s输出字符串 %n换行System.err.printf(服务端 收到来自Exchange为【%s】、路由key为【%s】的消息消息内容为%s%n,envelope.getExchange(), envelope.getRoutingKey(), param);//发送消息的方法需要把返回值result发送回客户端-------------------------------------------------------4、服务端处理消费完的消息返回客户端的操作channel.basicPublish(, /* 使用默认的Exchange */replyTo,/* 此处的routing key 应该填 reply_to 属性; reply_to: 该属性指定了服务器要将返回的消息送回到哪个队列 *///把从客户端的 AMQP.BasicProperties 属性获取到的correlationId再作为参数传回去用于客户端和服务器的匹配。new AMQP.BasicProperties().builder().correlationId(correlationId) /* 也需要返回额外的 correlation_id要与从客户端消息中读取的 correlation_id 完全一样 */.deliveryMode(2) /* 设置这个消息是持久化类型的 */.build(), /*这个.build()的作用就是构建得到这个 BasicProperties 对象这个对象就包含了 correlationId 属性因为服务器端返回的消息一定要有这个correlationId。 */result.getBytes(StandardCharsets.UTF_8));}});}//模拟服务器端消费消息要做的处理业务逻辑操作public static String format(String name){//此处模拟让服务器处理这里的业务有快有慢的情况看correlation_id 能不能还是把数据对应上int rand (new Random().nextInt(40) 20) * 30;try{Thread.sleep(rand);} catch (InterruptedException e){e.printStackTrace();}return 《 name 》;}}
Client 客户端
package cn.ljh.rabbitmq.consumer;import cn.ljh.rabbitmq.util.ConnectionUtil;
import cn.ljh.rabbitmq.util.ConstantUtil;
import com.rabbitmq.client.*;import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.TimeoutException;//消息队列实现 RPC远程过程调用模型 之 客户端
public class Client
{// paramMap 保存了 correlationid 与 参数消息之间的对应关系public static MapString, String paramMap new ConcurrentHashMap();//客户端发送的消息参数public static String[] params new String[]{火影忍者, 七龙珠, 哆啦A梦, 蜡笔小新};public static void main(String[] args) throws IOException, TimeoutException{//1、创建连接工厂设置连接信息然后再通过连接工厂获取连接Connection conn ConnectionUtil.getConnection();//2、通过Connection获取Channel 消息通道Channel channel conn.createChannel();//3、调用 Channel 的 queueDeclare() 方法声明队列声明一个有 RabbitMQ 自动创建的、自动命名的、持久化的、独享的、会自动删除的【默认队列】AMQP.Queue.DeclareOk declareOk channel.queueDeclare();System.out.println(declareOk declareOk);//.getQueue() 用于得到默认队列的返回值也就是默认队列的名字之前声明是我们自己设置队列名这里用默认的队列就用.getQueue() 得到队列名。String queueName declareOk.getQueue();System.out.println(queueName: queueName);//4、调用Channel 的 basicConsume()方法开始处理消费消息-----------------------------------------------------------------2、客户端监听服务端处理完消息后返回来的消息channel.basicConsume(queueName /*消费这个消费队列里面的消息*/,true /*消息的确认模式是否自动确认该消息已经被消费完成并返回确认消息给消息队列*/,new DefaultConsumer(channel){//处理消息当这个消息队列收到消息的时候这个方法就会被触发。重写这个方法Overridepublic void handleDelivery(String consumerTag,Envelope envelope /*消息所在的信封,存放消息的exchange、路由key这些*/,AMQP.BasicProperties properties /*消息的那些属性*/,byte[] body /*body消息的消息体*/) throws IOException{//把消息体中的消息拿出来String resultMessage new String(body, StandardCharsets.UTF_8);//此处需要指定每个返回值对应的是哪个参数靠的就是correlation_idString correlationId properties.getCorrelationId();//根据服务器端返回的消息中的correlation_id 获取对应的参数String param paramMap.get(correlationId);System.err.println(客户端发出去的消息内容param 服务端处理后返回来的消息内容resultMessage);//printf格式化输出函数 %s输出字符串 %n换行System.out.printf(客户端 收到来自Exchange为【%s】、路由key为【%s】的消息消息内容为%s%n,envelope.getExchange(), envelope.getRoutingKey(), resultMessage);//得到服务器的返回值之后整个调用过程就完成了此时就应该从 Map 中删除这组 key-value对了( correlationId 与 参数的对应关系 )。paramMap.remove(correlationId);}});//客户端发送消息---------------------------------------------------------------------------代码运行后先执行这段--------------------1、客户端发送消息for (int i 0 ; i params.length ; i){paramMap.put( i , params[i] );channel.basicPublish(, /* 使用默认的Exchange */ConstantUtil.RPC_QUEUE, /* 客户端发送消息携带的路由key是服务端监听的消息队列的名字且使用了默认的Exchange这就意味着消息会被发送给服务器监听的那个消息队列 */new AMQP.BasicProperties().builder().correlationId(i ) /* 设置 correlation_id 属性; correlation_id该属性指定了服务器返回的消息也要添加相同的correlation_id属性*/.replyTo(queueName) /* reply_to: 该属性指定了服务器要将返回的消息送回到哪个队列 , 设置 reply_to 属性 */.deliveryMode(2) /* 持久化消息 */.build(), /* 构建这个BasicProperties对象这个对象主要存这个correlationId属性 */params[i].getBytes(StandardCharsets.UTF_8));}}
}
pom.xml
?xml version1.0 encodingUTF-8?
project xmlnshttp://maven.apache.org/POM/4.0.0xmlns:xsihttp://www.w3.org/2001/XMLSchema-instancexsi:schemaLocationhttp://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsdmodelVersion4.0.0/modelVersiongroupIdcn.ljh/groupIdartifactIdrabbitmq_rpc/artifactIdversion1.0.0/versionnamerabbitmq_rpc/name!-- 属性 --propertiesmaven.compiler.source11/maven.compiler.sourcemaven.compiler.target11/maven.compiler.targetproject.build.sourceEncodingUTF-8/project.build.sourceEncodingjava.version11/java.version/properties!-- 依赖 --dependencies!-- RabbitMQ 的依赖库 --dependencygroupIdcom.rabbitmq/groupIdartifactIdamqp-client/artifactIdversion5.13.0/version/dependencydependencygroupIdorg.projectlombok/groupIdartifactIdlombok/artifactIdversion1.18.16/versionscopecompile/scope/dependency/dependencies/project