当前位置: 首页 > news >正文

电商网站 app图片转链接生成器在线制作

电商网站 app,图片转链接生成器在线制作,千万不要报培训班学室内设计,徐州网站的优化RocketMQ的底层通信模块remoting remoting是RocketMQ的底层通信模块#xff0c;RocketMQ底层通讯是使用Netty来实现的。本文通过对remoting源码进行分析#xff0c;来说明remoting如何实现高性能通信的。 二、Remoting 通信模块结构 remoting 的网络通信是基于 Netty 实现RocketMQ底层通讯是使用Netty来实现的。本文通过对remoting源码进行分析来说明remoting如何实现高性能通信的。 二、Remoting 通信模块结构 remoting 的网络通信是基于 Netty 实现模块中类的继承关系如下 可见通信的类继承自类RemotingService,RemotingService的定义如下 RemotingService是RPC 远程服务基础类。主要定义所有的远程服务类的基础方法 public interface RemotingService {// 服务启动void start();// 服务停止void shutdown();//注册RPC钩子函数void registerRPCHook(RPCHook rpcHook); }抽象出服务端与客户端都需要的三个方法其中注册hook是为了能够在远程通讯之后或调用之前执行用户自定义的逻辑。 RemotingServer/RemotingClient 远程服务器/客户端基础接口两者中的方法基本类似故这里重点介绍一下 RemotingServer定位 RPC 远程操作的相关“业务方法”。 public interface RemotingServer extends RemotingService {/*** 注册处理器* param requestCode 请求码* param processor 处理器* param executor 线程池* 这三者是绑定关系* 根据请求的code 找到处理对应请求的处理器与线程池 并完成业务处理。*/void registerProcessor(final int requestCode, final NettyRequestProcessor processor,final ExecutorService executor);/*** 注册缺省处理器* param processor 缺省处理器* param executor 线程池*/void registerDefaultProcessor(final NettyRequestProcessor processor, final ExecutorService executor);// 获取服务端口int localListenPort();/*** 根据 请求码 获取 处理器和线程池* param requestCode 请求码* return*/PairNettyRequestProcessor, ExecutorService getProcessorPair(final int requestCode);/*** 同步调用* param channel 通信通道* param request 业务请求对象* param timeoutMillis 超时时间* return 响应结果封装*/RemotingCommand invokeSync(final Channel channel, final RemotingCommand request,final long timeoutMillis) throws InterruptedException, RemotingSendRequestException,RemotingTimeoutException;/*** 异步调用* param channel 通信通道* param request 业务请求对象* param timeoutMillis 超时时间* param invokeCallback 响应结果回调对象*/void invokeAsync(final Channel channel, final RemotingCommand request, final long timeoutMillis,final InvokeCallback invokeCallback) throws InterruptedException,RemotingTooMuchRequestException, RemotingTimeoutException, RemotingSendRequestException;/*** 单向调用 (不关注返回结果)* param channel 通信通道* param request 业务请求对象* param timeoutMillis 超时时间*/void invokeOneway(final Channel channel, final RemotingCommand request, final long timeoutMillis)throws InterruptedException, RemotingTooMuchRequestException, RemotingTimeoutException,RemotingSendRequestException; } 从上面的代码可以看出RemotingServer的主要功能是注册请求协议处理器、请求调用方法。 NettyRemotingServer服务端的实现类实现了 RemotingServer 接口继承 NettyRemotingAbstract 抽象类 public class NettyRemotingServer extends NettyRemotingAbstract implements RemotingServer {private static final InternalLogger log InternalLoggerFactory.getLogger(RemotingHelper.ROCKETMQ_REMOTING);// Netty服务端启动器private final ServerBootstrap serverBootstrap;// worker组private final EventLoopGroup eventLoopGroupSelector;// boss组 private final EventLoopGroup eventLoopGroupBoss;// Netty服务端配置信息类private final NettyServerConfig nettyServerConfig;// 公共线程池 (在注册协议处理器的时候若未给处理器指定线程池那么就是用该公共线程池)private final ExecutorService publicExecutor;// Netty Channel 特殊状态监听器private final ChannelEventListener channelEventListener;// 定时器 (功能 扫描 responseTable表将过期的responseFuture移除)private final Timer timer new Timer(ServerHouseKeepingService, true);// 用于在pipeline指定handler中 执行任务的线程池private DefaultEventExecutorGroup defaultEventExecutorGroup;// 服务端绑定的端口private int port 0;private static final String HANDSHAKE_HANDLER_NAME handshakeHandler;private static final String TLS_HANDLER_NAME sslHandler;private static final String FILE_REGION_ENCODER_NAME fileRegionEncoder;// 用于处理 SSL 握手连接的处理器private HandshakeHandler handshakeHandler;// 协议编码 处理器private NettyEncoder encoder;// 连接管理 处理器private NettyConnectManageHandler connectionManageHandler;// 核心业务 处理器private NettyServerHandler serverHandler;// 参数1 nettyServerConfig Netty服务端配置信息// 参数2 channelEventListener channel特殊状态监听器public NettyRemotingServer(final NettyServerConfig nettyServerConfig,final ChannelEventListener channelEventListener) {// 调用父类 就是通过 Semaphore 设置请求并发限制// 1. 设置 单行请求的并发限制// 2. 设置 异步请求的并发限制super(nettyServerConfig.getServerOnewaySemaphoreValue(), nettyServerConfig.getServerAsyncSemaphoreValue());this.serverBootstrap new ServerBootstrap();this.nettyServerConfig nettyServerConfig;this.channelEventListener channelEventListener;// 创建公共线程池 publicExecutor 线程数量为4int publicThreadNums nettyServerConfig.getServerCallbackExecutorThreads();if (publicThreadNums 0) {publicThreadNums 4;}this.publicExecutor Executors.newFixedThreadPool(publicThreadNums, new ThreadFactory() {private AtomicInteger threadIndex new AtomicInteger(0);Overridepublic Thread newThread(Runnable r) {return new Thread(r, NettyServerPublicExecutor_ this.threadIndex.incrementAndGet());}});// 下面就是根据操作系统平台来选择创建 bossGroup 和 workGroup的逻辑if (useEpoll()) {this.eventLoopGroupBoss new EpollEventLoopGroup(1, new ThreadFactory() {private AtomicInteger threadIndex new AtomicInteger(0);Overridepublic Thread newThread(Runnable r) {return new Thread(r, String.format(NettyEPOLLBoss_%d, this.threadIndex.incrementAndGet()));}});this.eventLoopGroupSelector new EpollEventLoopGroup(nettyServerConfig.getServerSelectorThreads(), new ThreadFactory() {private AtomicInteger threadIndex new AtomicInteger(0);private int threadTotal nettyServerConfig.getServerSelectorThreads();Overridepublic Thread newThread(Runnable r) {return new Thread(r, String.format(NettyServerEPOLLSelector_%d_%d, threadTotal, this.threadIndex.incrementAndGet()));}});} else {this.eventLoopGroupBoss new NioEventLoopGroup(1, new ThreadFactory() {private AtomicInteger threadIndex new AtomicInteger(0);Overridepublic Thread newThread(Runnable r) {return new Thread(r, String.format(NettyNIOBoss_%d, this.threadIndex.incrementAndGet()));}});this.eventLoopGroupSelector new NioEventLoopGroup(nettyServerConfig.getServerSelectorThreads(), new ThreadFactory() {private AtomicInteger threadIndex new AtomicInteger(0);private int threadTotal nettyServerConfig.getServerSelectorThreads();Overridepublic Thread newThread(Runnable r) {return new Thread(r, String.format(NettyServerNIOSelector_%d_%d, threadTotal, this.threadIndex.incrementAndGet()));}});}// 加载SSL连接的相关方法 (不在本篇的分析范围内)loadSslContext();} } NettyRemotingServer当中重要的参数 父类的属性 semaphoreOneway **semaphoreAsync ** 用来控制请求并发量的 serverBootstrap Netty服务器启动器 nettyServerConfig Netty服务器配置信息 channelEventListener Netty Channel状态监听器 eventLoopGroupSelector worker组 eventLoopGroupBoss boss组 NettyRemotingServer的启动 // 启动Netty 服务器Overridepublic void start() {// Netty pipeline中的指定 handler 采用该线程池执行this.defaultEventExecutorGroup new DefaultEventExecutorGroup(nettyServerConfig.getServerWorkerThreads(),new ThreadFactory() {private AtomicInteger threadIndex new AtomicInteger(0);Overridepublic Thread newThread(Runnable r) {return new Thread(r, NettyServerCodecThread_ this.threadIndex.incrementAndGet());}});// 初始化 处理器 handler// 1. handshakeHandler SSL连接// 2. encoder 编码器// 3. connectionManageHandler 连接管理器处理器// 4. serverHandler 核心业务处理器prepareSharableHandlers();// 下面就是 Netty 创建服务端启动器的固定流程 ServerBootstrap childHandler // 配置服务端 启动对象// 配置工作组 boss 和 worker 组this.serverBootstrap.group(this.eventLoopGroupBoss, this.eventLoopGroupSelector)// 设置服务端ServerSocketChannel 类型.channel(useEpoll() ? EpollServerSocketChannel.class : NioServerSocketChannel.class)// 设置服务端ch选项.option(ChannelOption.SO_BACKLOG, 1024).option(ChannelOption.SO_REUSEADDR, true).option(ChannelOption.SO_KEEPALIVE, false)// 设置客户端ch选项.childOption(ChannelOption.TCP_NODELAY, true)// 设置 接收缓冲区 和 发送缓冲区的 大小.childOption(ChannelOption.SO_SNDBUF, nettyServerConfig.getServerSocketSndBufSize()).childOption(ChannelOption.SO_RCVBUF, nettyServerConfig.getServerSocketRcvBufSize())// 设置服务器端口.localAddress(new InetSocketAddress(this.nettyServerConfig.getListenPort())).childHandler(new ChannelInitializerSocketChannel() {Overridepublic void initChannel(SocketChannel ch) throws Exception {// 初始化 客户端ch pipeline 的逻辑, 同时指定了线程池为 defaultEventExecutorGroupch.pipeline().addLast(defaultEventExecutorGroup, HANDSHAKE_HANDLER_NAME, handshakeHandler).addLast(defaultEventExecutorGroup,encoder,new NettyDecoder(),new IdleStateHandler(0, 0, nettyServerConfig.getServerChannelMaxIdleTimeSeconds()),connectionManageHandler,serverHandler);}});if (nettyServerConfig.isServerPooledByteBufAllocatorEnable()) {// 客户端开启 内存池使用的内存池 是 PooledByteBufAllocator.DEFAULTchildHandler.childOption(ChannelOption.ALLOCATOR, PooledByteBufAllocator.DEFAULT);}try {// 服务器 绑定端口ChannelFuture sync this.serverBootstrap.bind().sync();InetSocketAddress addr (InetSocketAddress) sync.channel().localAddress();this.port addr.getPort();} catch (InterruptedException e1) {throw new RuntimeException(this.serverBootstrap.bind().sync() InterruptedException, e1);}// 条件成立 channel状态监听器不为空 则创建 网络异常事件执行器if (this.channelEventListener ! null) {this.nettyEventExecutor.start();}// 提交定时任务每一秒 执行一次// 扫描 responseTable 表 将过期的 responseFuture 移除this.timer.scheduleAtFixedRate(new TimerTask() {Overridepublic void run() {try {NettyRemotingServer.this.scanResponseTable();} catch (Throwable e) {log.error(scanResponseTable exception, e);}}}, 1000 * 3, 1000);} 上述代码 基本上就是 模板Netty创建服务端的代码主要做了如下几件事 启动Netty服务器 开启 channel状态监听线程 开启 扫描 responseFuture 的定时任务 通过这个结构图可以看出RocketMQ 在 Netty 原生的多线程 Reactor 模型上做了一系列的扩展和优化使用多个线程池来处理数据 1、一个 Reactor 主线程eventLoopGroupBoss即为上面的1负责监听 TCP 网络连接请求建立好连接创建 SocketChannel并注册到 selector 上。RocketMQ 的源码中会自动根据 OS 的类型选择 NIO 和 Epoll也可以通过参数配置然后监听真正的网络数据。 2、拿到网络数据后再丢给 Worker 线程池eventLoopGroupSelector即为上面的“N”源码中默认设置为3 3、在真正执行业务逻辑之前需要进行 SSL 验证、编解码、空闲检查、网络连接管理这些工作交给 defaultEventExecutorGroup即为上面的“M1”源码中默认设置为 8 去做。 4、而处理业务操作放在业务线程池中执行根据 RomotingCommand 的业务请求码 code 去 processorTable 这个本地缓存变量中找到对应的 processor然后封装成 task 任务后提交给对应的业务 processor 处理线程池来执行sendMessageExecutor以发送消息为例即为上面的 “M2”。从入口到业务逻辑的几个步骤中线程池一直再增加这跟每一步逻辑复杂性相关越复杂需要的并发通道越宽。NettyRemotingAbstract抽象类NettyRemotingAbstract是NettyRemotingServer的父类主要定义了请求并发量、控制响应对象和各种请求处理函数。 public abstract class NettyRemotingAbstract {// 控制 单向请求的 并发量protected final Semaphore semaphoreOneway;// 控制 异步请求的 并发量protected final Semaphore semaphoreAsync;// 响应对象映射表 (key: opaque value:responseFuture)protected final ConcurrentMapInteger /* opaque */, ResponseFuture responseTable new ConcurrentHashMapInteger, ResponseFuture(256);// 请求处理器映射表 (key: requestCode value:(processor,executor) )protected final HashMapInteger/* request code */, PairNettyRequestProcessor, ExecutorService processorTable new HashMapInteger, PairNettyRequestProcessor, ExecutorService(64);// Netty事件监听线程池protected final NettyEventExecutor nettyEventExecutor new NettyEventExecutor();// 默认的请求处理器对 包含(processor,executor) protected PairNettyRequestProcessor, ExecutorService defaultRequestProcessor;// SSL相关protected volatile SslContext sslContext;// 扩展钩子protected ListRPCHook rpcHooks new ArrayListRPCHook(); }结合RocketMQ源码分析 01初始化流程 在Broker创建的时候会去初始化NettyRemotingServer类调用其构造方法。 public NettyRemotingServer(final NettyServerConfig nettyServerConfig,final ChannelEventListener channelEventListener) {super(nettyServerConfig.getServerOnewaySemaphoreValue(), nettyServerConfig.getServerAsyncSemaphoreValue());this.serverBootstrap new ServerBootstrap();this.nettyServerConfig nettyServerConfig;this.channelEventListener channelEventListener;int publicThreadNums nettyServerConfig.getServerCallbackExecutorThreads();if (publicThreadNums 0) {publicThreadNums 4;}// 初始化用于接收客户端的TCP连接this.publicExecutor Executors.newFixedThreadPool(publicThreadNums, new ThreadFactory() {private AtomicInteger threadIndex new AtomicInteger(0);Overridepublic Thread newThread(Runnable r) {return new Thread(r, NettyServerPublicExecutor_ this.threadIndex.incrementAndGet());}});// 根据配置设置NIO还是Epoll来作为Selector线程池// 默认在Linux环境为true,windows环境下都是NIO相对来说Linux更快。if (useEpoll()) {this.eventLoopGroupBoss new EpollEventLoopGroup(1, new ThreadFactory() {private AtomicInteger threadIndex new AtomicInteger(0);Overridepublic Thread newThread(Runnable r) {return new Thread(r, String.format(NettyEPOLLBoss_%d, this.threadIndex.incrementAndGet()));}});this.eventLoopGroupSelector new EpollEventLoopGroup(nettyServerConfig.getServerSelectorThreads(), new ThreadFactory() {private AtomicInteger threadIndex new AtomicInteger(0);private int threadTotal nettyServerConfig.getServerSelectorThreads();Overridepublic Thread newThread(Runnable r) {return new Thread(r, String.format(NettyServerEPOLLSelector_%d_%d, threadTotal, this.threadIndex.incrementAndGet()));}});} else {this.eventLoopGroupBoss new NioEventLoopGroup(1, new ThreadFactory() {private AtomicInteger threadIndex new AtomicInteger(0);Overridepublic Thread newThread(Runnable r) {return new Thread(r, String.format(NettyNIOBoss_%d, this.threadIndex.incrementAndGet()));}});this.eventLoopGroupSelector new NioEventLoopGroup(nettyServerConfig.getServerSelectorThreads(), new ThreadFactory() {private AtomicInteger threadIndex new AtomicInteger(0);private int threadTotal nettyServerConfig.getServerSelectorThreads();Overridepublic Thread newThread(Runnable r) {return new Thread(r, String.format(NettyServerNIOSelector_%d_%d, threadTotal, this.threadIndex.incrementAndGet()));}});}// 加载SSLloadSslContext(); } 首先会初始化ServerBootstrapNettyServerConfig等相关参数同时会判断是在哪个操作系统如果是在Linux平台则会使用EpollEventLoopGroup作为线程池如果不是则会使用NioEventLoopGroup作为线程池。 02启动流程 在Broker作为服务端启动与NameServer作为服务端启动的时候都会来初始化启动一个Netty的服务端进行相关的通讯。 Override public void start() {// 默认的处理线程池组用于处理后面多个NettyHandle的操作this.defaultEventExecutorGroup new DefaultEventExecutorGroup(nettyServerConfig.getServerWorkerThreads(),new ThreadFactory() {private AtomicInteger threadIndex new AtomicInteger(0);Overridepublic Thread newThread(Runnable r) {return new Thread(r, NettyServerCodecThread_ this.threadIndex.incrementAndGet());}});prepareSharableHandlers();// 正常的一种Netty的服务端的启动逻辑// 其中包括解码器编码器心跳管理器连接管理器消息类型处理分发ServerBootstrap childHandler this.serverBootstrap.group(this.eventLoopGroupBoss, this.eventLoopGroupSelector).channel(useEpoll() ? EpollServerSocketChannel.class : NioServerSocketChannel.class).option(ChannelOption.SO_BACKLOG, nettyServerConfig.getServerSocketBacklog()).option(ChannelOption.SO_REUSEADDR, true).option(ChannelOption.SO_KEEPALIVE, false).childOption(ChannelOption.TCP_NODELAY, true).localAddress(new InetSocketAddress(this.nettyServerConfig.getListenPort())).childHandler(new ChannelInitializerSocketChannel() {Overridepublic void initChannel(SocketChannel ch) throws Exception {ch.pipeline().addLast(defaultEventExecutorGroup, HANDSHAKE_HANDLER_NAME, handshakeHandler).addLast(defaultEventExecutorGroup,encoder,new NettyDecoder(),new IdleStateHandler(0, 0, nettyServerConfig.getServerChannelMaxIdleTimeSeconds()),connectionManageHandler,serverHandler);}});if (nettyServerConfig.getServerSocketSndBufSize() 0) {log.info(server set SO_SNDBUF to {}, nettyServerConfig.getServerSocketSndBufSize());childHandler.childOption(ChannelOption.SO_SNDBUF, nettyServerConfig.getServerSocketSndBufSize());}if (nettyServerConfig.getServerSocketRcvBufSize() 0) {log.info(server set SO_RCVBUF to {}, nettyServerConfig.getServerSocketRcvBufSize());childHandler.childOption(ChannelOption.SO_RCVBUF, nettyServerConfig.getServerSocketRcvBufSize());}if (nettyServerConfig.getWriteBufferLowWaterMark() 0 nettyServerConfig.getWriteBufferHighWaterMark() 0) {log.info(server set netty WRITE_BUFFER_WATER_MARK to {},{},nettyServerConfig.getWriteBufferLowWaterMark(), nettyServerConfig.getWriteBufferHighWaterMark());childHandler.childOption(ChannelOption.WRITE_BUFFER_WATER_MARK, new WriteBufferWaterMark(nettyServerConfig.getWriteBufferLowWaterMark(), nettyServerConfig.getWriteBufferHighWaterMark()));}if (nettyServerConfig.isServerPooledByteBufAllocatorEnable()) {childHandler.childOption(ChannelOption.ALLOCATOR, PooledByteBufAllocator.DEFAULT);}try {ChannelFuture sync this.serverBootstrap.bind().sync();InetSocketAddress addr (InetSocketAddress) sync.channel().localAddress();this.port addr.getPort();} catch (InterruptedException e1) {throw new RuntimeException(this.serverBootstrap.bind().sync() InterruptedException, e1);}if (this.channelEventListener ! null) {this.nettyEventExecutor.start();}// 定时扫描responseTable获取返回结果并处理超时业务。this.timer.scheduleAtFixedRate(new TimerTask() {Overridepublic void run() {try {// 超时请求扫描NettyRemotingServer.this.scanResponseTable();} catch (Throwable e) {log.error(scanResponseTable exception, e);}}}, 1000 * 3, 1000); }初始化线程池组处理多个NettyHandle操作。 初始化ServerBootStrap设置tcp参数编解码器心跳处理器连接管理器请求分发。 定时扫描ResponseTable获取返回结果并处理超时业务。 NettyEncoder类 将RemotingCommand对象序列化为ByteBuffer对象。根据serializerType的不同Header会编码为JSON或者二进制。 Override public void encode(ChannelHandlerContext ctx, RemotingCommand remotingCommand, ByteBuf out)throws Exception {try {remotingCommand.fastEncodeHeader(out);byte[] body remotingCommand.getBody();if (body ! null) {out.writeBytes(body);}} catch (Exception e) {log.error(encode exception, RemotingHelper.parseChannelRemoteAddr(ctx.channel()), e);if (remotingCommand ! null) {log.error(remotingCommand.toString());}RemotingUtil.closeChannel(ctx.channel());} }将RemotingCommand对象编码成ByteBuf进行数据的传递先将Header部分进行编码然后将body追加到ByteBuf中Header的编码相对复杂因为在传递过程中我们得让被传递方知道该ByteBuf如何进行拆解同时我们也要防止各种不正常的情况方法这是就需要我们规定一些内容这些都会在Netty里面去详讲这里就提及一下。第一个字节表示编解码类型 NettyDecoder类 NettyEncoder继承自LengthFieldBasedFrameDecoder主要有用于解码入站数据流并将数据流解码为RemotingCommand对象。 Override public Object decode(ChannelHandlerContext ctx, ByteBuf in) throws Exception {ByteBuf frame null;try {frame (ByteBuf) super.decode(ctx, in);if (null frame) {return null;}return RemotingCommand.decode(frame);} catch (Exception e) {log.error(decode exception, RemotingHelper.parseChannelRemoteAddr(ctx.channel()), e);RemotingUtil.closeChannel(ctx.channel());} finally {if (null ! frame) {frame.release();}}return null; }首先会调用LengthFieldBasedFrameDecoder.decoder方法进行首次解码然后调用NettyDecoder.decoder进行二次解码完成Header与body的解码并转换成RemotingCommand对象。而之所以需要两次第一个就是为了解出对应的长度即数据流中前4个字节的值表示有效数据域的长度除开前4个字节外的内容都是有效数据域的内容不存在偏移量。 IdleStateHandler类 进行心跳检查类客户端与服务端之间需要保持长连接则需要通过一个心跳机制来保证有效性当其中一者处于宕机或者网络延迟的情况下判断是否有效如果无效及时进行释放资源。该类三个参数第一个为读空闲时间第二个为写空闲时间第三个为读或写空闲时间如果在规定时间内没有触发对应事件就会触发定时任务的执行 NettyConnectManageHandler类 用于监听pipeline中入站/出站的事件主要进行日志记录等操作。 NettyServerHandler类 核心业务处理器处理的时候会去调用channelRead0进行相关逻辑最终会去执行processMessageReceived方法。 public void processMessageReceived(ChannelHandlerContext ctx, RemotingCommand msg) throws Exception {final RemotingCommand cmd msg;if (cmd ! null) {switch (cmd.getType()) {// 处理请求的过程case REQUEST_COMMAND:processRequestCommand(ctx, cmd);break;case RESPONSE_COMMAND:// 处理响应的过程processResponseCommand(ctx, cmd);break;default:break;}} }TYPE类型为REQUEST_COMMAND表示请求消息则调用processRequestCommand方法进行处理。 public void processRequestCommand(final ChannelHandlerContext ctx, final RemotingCommand cmd) {// 从processorTable中找到对应的Processor如果不存在则使用defaultRequestProcessor处理器final PairNettyRequestProcessor, ExecutorService matched this.processorTable.get(cmd.getCode());final PairNettyRequestProcessor, ExecutorService pair null matched ? this.defaultRequestProcessor : matched;final int opaque cmd.getOpaque();if (pair ! null) {Runnable run new Runnable() {Overridepublic void run() {try {// 获取nameServerString remoteAddr RemotingHelper.parseChannelRemoteAddr(ctx.channel());doBeforeRpcHooks(remoteAddr, cmd);final RemotingResponseCallback callback new RemotingResponseCallback() {Override// 服务端处理完请求之后调用public void callback(RemotingCommand response) {doAfterRpcHooks(remoteAddr, cmd, response);// 不是单向if (!cmd.isOnewayRPC()) {if (response ! null) {response.setOpaque(opaque);response.markResponseType();response.setSerializeTypeCurrentRPC(cmd.getSerializeTypeCurrentRPC());try {// 往客户端输出结果ctx.writeAndFlush(response);} catch (Throwable e) {log.error(process request over, but response failed, e);log.error(cmd.toString());log.error(response.toString());}} else {}}}};// 如果是异步处理的Processif (pair.getObject1() instanceof AsyncNettyRequestProcessor) {AsyncNettyRequestProcessor processor (AsyncNettyRequestProcessor)pair.getObject1();processor.asyncProcessRequest(ctx, cmd, callback);} else {// 不是异步知道拿到Processor进行相关处理NettyRequestProcessor processor pair.getObject1();RemotingCommand response processor.processRequest(ctx, cmd);callback.callback(response);}} catch (Throwable e) {log.error(process request exception, e);log.error(cmd.toString());if (!cmd.isOnewayRPC()) {final RemotingCommand response RemotingCommand.createResponseCommand(RemotingSysResponseCode.SYSTEM_ERROR,RemotingHelper.exceptionSimpleDesc(e));response.setOpaque(opaque);ctx.writeAndFlush(response);}}}};if (pair.getObject1().rejectRequest()) {final RemotingCommand response RemotingCommand.createResponseCommand(RemotingSysResponseCode.SYSTEM_BUSY,[REJECTREQUEST]system busy, start flow control for a while);response.setOpaque(opaque);ctx.writeAndFlush(response);return;}try {final RequestTask requestTask new RequestTask(run, ctx.channel(), cmd);pair.getObject2().submit(requestTask);} catch (RejectedExecutionException e) {if ((System.currentTimeMillis() % 10000) 0) {log.warn(RemotingHelper.parseChannelRemoteAddr(ctx.channel()) , too many requests and system thread pool busy, RejectedExecutionException pair.getObject2().toString() request code: cmd.getCode());}if (!cmd.isOnewayRPC()) {final RemotingCommand response RemotingCommand.createResponseCommand(RemotingSysResponseCode.SYSTEM_BUSY,[OVERLOAD]system busy, start flow control for a while);response.setOpaque(opaque);ctx.writeAndFlush(response);}}} else {String error request type cmd.getCode() not supported;final RemotingCommand response RemotingCommand.createResponseCommand(RemotingSysResponseCode.REQUEST_CODE_NOT_SUPPORTED, error);response.setOpaque(opaque);ctx.writeAndFlush(response);log.error(RemotingHelper.parseChannelRemoteAddr(ctx.channel()) error);} }首先从processorTable获取对应的Processor而processorTable的内容都是通过启动Broker或者是NameServer进行存入的存储的结构为map以Code作为key以Processor作为值当我们使用的时候只需要根据code查询即可如果没有找到就直接使用默认的defaultRequestProcessor处理器。 如果获取到的Processor为异步的则调用异步处理请求如果不是则直接调用Processor对应的processRequest方法进行处理。 TYPE类型为RESPONSE_COMMAND表示响应消息则调用processResponseCommand进行处理。 public void processResponseCommand(ChannelHandlerContext ctx, RemotingCommand cmd) {final int opaque cmd.getOpaque();// 从map缓存获取正在进行的其中一个请求final ResponseFuture responseFuture responseTable.get(opaque);if (responseFuture ! null) {responseFuture.setResponseCommand(cmd);// 移除本次请求responseTable.remove(opaque);// 执行回调if (responseFuture.getInvokeCallback() ! null) {executeInvokeCallback(responseFuture);} else {responseFuture.putResponse(cmd);responseFuture.release();}} else {log.warn(receive response, but not matched any request, RemotingHelper.parseChannelRemoteAddr(ctx.channel()));log.warn(cmd.toString());} }超时请求扫描 在每次请求时将ResponseFuture放入Map中通过定时扫描来判断记录时间与超时时间来比较判断是否超时。 public void scanResponseTable() {final ListResponseFuture rfList new LinkedListResponseFuture();IteratorEntryInteger, ResponseFuture it this.responseTable.entrySet().iterator();while (it.hasNext()) {EntryInteger, ResponseFuture next it.next();ResponseFuture rep next.getValue();// 当前时间大于请求开始时间超时等待时间1秒则任务超时了if ((rep.getBeginTimestamp() rep.getTimeoutMillis() 1000) System.currentTimeMillis()) {rep.release();it.remove();rfList.add(rep);log.warn(remove timeout request, rep);}}// 执行被移除 ResponseFuture对象的executeInvokeCallback方法 for (ResponseFuture rf : rfList) {try {executeInvokeCallback(rf);} catch (Throwable e) {log.warn(scanResponseTable, operationComplete Exception, e);}} }
http://www.sadfv.cn/news/136181/

相关文章:

  • 意大利之家设计网站国产 做 视频网站
  • 天津市南开区网站开发有限公司网站开发技术指标是什么
  • 中小企业网站制作报价想系统学习wordpress
  • 做新浪微博网站需要建筑工程职业学院官网
  • 聊城做网站做的不错的关键词竞价排名
  • 图书馆建设投稿网站wordpress引入html代码
  • 做公司网站方案网站建设文化哪家好
  • 郎溪做网站视频生成链接
  • 网站更换域名 换程序 SEO做网站的所有代码
  • 哈尔滨市哪里做淘宝网站即商通网站建设推广
  • html门户网站开发源代码网站建设端口
  • 密云上海网站建设wordpress 手机站目录
  • 培训网站建设机构上海市网站信息无障碍建设
  • 商家自己做的商品信息查询网站备案网站名称重复
  • 网站开发+协作平台怎么更改网站首页图片尺寸
  • 网站备案政策为了推出企业网站建设
  • 没有网站可以做网络推广吗可信赖的邢台做网站
  • 企业为何做网站中国保险公司排名前十名
  • 网站显示系统建设中如何去掉wordpress底部版权
  • 满山红厦门网站建设如何把网页字体转换为wordpress
  • me域名的网站网站非法收录用户信息
  • 做exo小说的网站南昌做小程序公司
  • 站长查询站长工具3g微网站
  • 个人 网站备案 幕布大连关键词
  • 河北提供网站制作公司电话怎么做同城商务网站
  • 佛山外贸网站建设价位商赢网站建设
  • 沧州市建设服务中心网站网站首页做30个关键词
  • 微信营销成功案例分享绵阳做网站优化
  • 网站虚拟主机免备案问答网站模板下载
  • 响应式网站跟一般网站的区别dedecms 购物网站