外贸网站推广seo,网站后台怎么制作,国家精品课程网官网,推荐一个好点的网站RPC框架-Gitee代码(麻烦点个Starred, 支持一下吧) RPC框架-GitHub代码(麻烦点个Starred, 支持一下吧) Netty业务逻辑 5.Netty业务逻辑a.加入基础的Netty代码b.对通道channel进行缓存c.对代码进行重构优化d.完成基础通信e.异步获取服务器的返回结果f.调整代码g.处理handler (优化…RPC框架-Gitee代码(麻烦点个Starred, 支持一下吧) RPC框架-GitHub代码(麻烦点个Starred, 支持一下吧) Netty业务逻辑 5.Netty业务逻辑a.加入基础的Netty代码b.对通道channel进行缓存c.对代码进行重构优化d.完成基础通信e.异步获取服务器的返回结果f.调整代码g.处理handler (优化) 5.Netty业务逻辑
a.加入基础的Netty代码
1.在DcyRpcBootstrap类的start()方法中加入netty代码 (待完善)
/*** 启动netty服务*/
public void start() {// 1.创建EventLoopGroup老板只负责处理请求之后会将请求分发给worker1比2的比例NioEventLoopGroup boss new NioEventLoopGroup(2);NioEventLoopGroup worker new NioEventLoopGroup(10);try{// 2.服务器端启动辅助对象ServerBootstrap serverBootstrap new ServerBootstrap();// 3.配置服务器serverBootstrap serverBootstrap.group(boss, worker).channel(NioServerSocketChannel.class).childHandler(new ChannelInitializerSocketChannel() {Overrideprotected void initChannel(SocketChannel socketChannel) throws Exception {// TODO 核心内容需要添加很多入栈和出栈的handlersocketChannel.pipeline().addLast(null);}});// 4.绑定端口ChannelFuture channelFuture serverBootstrap.bind(port).sync();// 5.阻塞操作channelFuture.channel().closeFuture().sync();} catch (InterruptedException e) {e.printStackTrace();} finally {try {boss.shutdownGracefully().sync();worker.shutdownGracefully().sync();} catch (InterruptedException e) {e.printStackTrace();}}
}2.在ReferenceConfig类的get()方法中加入netty代码 (待完善)
/*** 代理设计模式生成一个API接口的代理对象* return 代理对象*/
public T get() {// 使用动态代理完成工作ClassLoader classLoader Thread.currentThread().getContextClassLoader();Class[] classes new Class[]{interfaceRef};// 使用动态代理生成代理对象Object helloProxy Proxy.newProxyInstance(classLoader, classes, new InvocationHandler() {Overridepublic Object invoke(Object proxy, Method method, Object[] args) throws Throwable {// 调用sayHi()方法事实上会走进这个代码段当中// 已经知道method(具体的方法)args(参数列表)log.info(method--{}, method.getName());log.info(args--{}, args);// 1.发现服务从注册中心寻找一个可用的服务// 传入服务的名字返回ip端口 InetSocketAddress可以封装端口/ip/host nameInetSocketAddress address registry.lookup(interfaceRef.getName());if (log.isInfoEnabled()){log.info(服务调用方发现了服务{}的可用主机{}, interfaceRef.getName(), address);}// 2.使用netty连接服务器发送 调用的 服务名字方法名字参数列表得到结果// 定义线程池 EventLoopGroupNioEventLoopGroup group new NioEventLoopGroup();// 启动一个客户端需要一个辅助类 bootstrapBootstrap bootstrap new Bootstrap();try {bootstrap bootstrap.group(group).remoteAddress(address)// 选择初始化一个什么样的channel.channel(NioSocketChannel.class).handler(new ChannelInitializerSocketChannel() {Overrideprotected void initChannel(SocketChannel socketChannel) throws Exception {socketChannel.pipeline().addLast(null);}});// 3.连接到远程节点等待连接完成ChannelFuture channelFuture bootstrap.connect().sync();// 4.获取channel并且写数据发送消息到服务器端channelFuture.channel().writeAndFlush(Unpooled.copiedBuffer(hello netty.getBytes(StandardCharsets.UTF_8)));// 5.阻塞程序等待接收消息channelFuture.channel().closeFuture().sync();} catch (InterruptedException e) {throw new RuntimeException(e);} finally {try {group.shutdownGracefully().sync();} catch (InterruptedException e) {e.printStackTrace();}}return null;}});return (T) helloProxy;
}b.对通道channel进行缓存
每次启动程序都会建立一个新的Netty连接显示是对不合适的 解决方案缓存channel尝试从缓存中获取channel。如果为空则创建新的连接并进行缓存
1.在DcyRpcBootstrap类的中添加一个全局的缓存对通道进行缓存
// Netty的连接缓存
public static final MapInetSocketAddress, Channel CHANNEL_CACHE new ConcurrentHashMap();2.在ReferenceConfig类的get()方法中进行修改查询缓存是否存在通道(address)若未命中则建立新的channel并进行缓存
/*** 代理设计模式生成一个API接口的代理对象* return 代理对象*/
public T get() {// 使用动态代理完成工作ClassLoader classLoader Thread.currentThread().getContextClassLoader();Class[] classes new Class[]{interfaceRef};// 使用动态代理生成代理对象Object helloProxy Proxy.newProxyInstance(classLoader, classes, new InvocationHandler() {Overridepublic Object invoke(Object proxy, Method method, Object[] args) throws Throwable {// 调用sayHi()方法事实上会走进这个代码段当中// 已经知道method(具体的方法)args(参数列表)log.info(method--{}, method.getName());log.info(args--{}, args);// 1.发现服务从注册中心寻找一个可用的服务// 传入服务的名字返回ip端口 InetSocketAddress可以封装端口/ip/host nameInetSocketAddress address registry.lookup(interfaceRef.getName());if (log.isInfoEnabled()){log.info(服务调用方发现了服务{}的可用主机{}, interfaceRef.getName(), address);}// 2.使用netty连接服务器发送 调用的 服务名字方法名字参数列表得到结果// 每次在这都会建立一个新的连接对程序不合适// 解决方案缓存channel尝试从缓存中获取channel。如果为空则创建新的连接并进行缓存// 1.从全局缓存中获取一个通道Channel channel DcyRpcBootstrap.CHANNEL_CACHE.get(address);if (channel null) {// 建立新的channel// 定义线程池 EventLoopGroupNioEventLoopGroup group new NioEventLoopGroup();// 启动一个客户端需要一个辅助类 bootstrapBootstrap bootstrap new Bootstrap();try {bootstrap bootstrap.group(group).remoteAddress(address)// 选择初始化一个什么样的channel.channel(NioSocketChannel.class).handler(new ChannelInitializerSocketChannel() {Overrideprotected void initChannel(SocketChannel socketChannel) throws Exception {socketChannel.pipeline().addLast(null);}});// 3.尝试连接服务器channel bootstrap.connect().sync().channel();// 缓存DcyRpcBootstrap.CHANNEL_CACHE.put(address, channel);} catch (InterruptedException e) {throw new RuntimeException(e);}}if (channel null){throw new NetworkException(获取通道channel发生了异常。);}ChannelFuture channelFuture channel.writeAndFlush(new Object());return null;}});return (T) helloProxy;
}c.对代码进行重构优化
1.在com.dcyrpc.discovery下创建NettyBootstrapInitializer类提供Bootstrap的单例
/*** 提供Bootstrap的单例*/
public class NettyBootstrapInitializer {private static final Bootstrap bootstrap new Bootstrap();static {NioEventLoopGroup group new NioEventLoopGroup();bootstrap.group(group)// 选择初始化一个什么样的channel.channel(NioSocketChannel.class).handler(new ChannelInitializerSocketChannel() {Overrideprotected void initChannel(SocketChannel socketChannel) throws Exception {socketChannel.pipeline().addLast(null);}});}private NettyBootstrapInitializer() {}public static Bootstrap getBootstrap() {return bootstrap;}
}2.在ReferenceConfig类的get()方法中进行代码的优化
/*** 代理设计模式生成一个API接口的代理对象* return 代理对象*/
public T get() {// 使用动态代理完成工作ClassLoader classLoader Thread.currentThread().getContextClassLoader();Class[] classes new Class[]{interfaceRef};// 使用动态代理生成代理对象Object helloProxy Proxy.newProxyInstance(classLoader, classes, new InvocationHandler() {Overridepublic Object invoke(Object proxy, Method method, Object[] args) throws Throwable {// 调用sayHi()方法事实上会走进这个代码段当中// 已经知道method(具体的方法)args(参数列表)log.info(method--{}, method.getName());log.info(args--{}, args);// 1.发现服务从注册中心寻找一个可用的服务// 传入服务的名字返回ip端口 InetSocketAddress可以封装端口/ip/host nameInetSocketAddress address registry.lookup(interfaceRef.getName());if (log.isInfoEnabled()){log.info(服务调用方发现了服务{}的可用主机{}, interfaceRef.getName(), address);}// 2.使用netty连接服务器发送 调用的 服务名字方法名字参数列表得到结果// 每次在这都会建立一个新的连接对程序不合适// 解决方案缓存channel尝试从缓存中获取channel。如果为空则创建新的连接并进行缓存// 1.从全局缓存中获取一个通道Channel channel DcyRpcBootstrap.CHANNEL_CACHE.get(address);if (channel null) {// await()方法会阻塞会等待连接成功再返回// sync和await都是阻塞当前线程获取返回值。因为连接过程和发送数据过程是异步的// 如果发生了异常sync会主动在主线程抛出异常await不会异常在子线程中处理需要使用future处理
// channel NettyBootstrapInitializer.getBootstrap().connect(address).await().channel();// 使用addListener执行异步操作CompletableFutureChannel channelFuture new CompletableFuture();NettyBootstrapInitializer.getBootstrap().connect(address).addListener((ChannelFutureListener) promise - {if (promise.isDone()) {// 异步的已经完成log.info(已经和【{}】成功建立连接。, address);channelFuture.complete(promise.channel());} else if (!promise.isSuccess()) {channelFuture.completeExceptionally(promise.cause());}});// 阻塞获取channelchannel channelFuture.get(3, TimeUnit.SECONDS);// 缓存channelDcyRpcBootstrap.CHANNEL_CACHE.put(address, channel);}if (channel null){throw new NetworkException(获取通道channel发生了异常。);}/*** ---------------------------同步策略---------------------------*/
// ChannelFuture channelFuture channel.writeAndFlush(new Object()).await();
// // get()阻塞获取结果
// // getNow()获取当前的结果如果未处理完成返回null
// if (channelFuture.isDone()) {
// Object object channelFuture.getNow();
// } else if (!channelFuture.isSuccess()) {
// // 发生问题需要捕获异常。
// // 子线程可以捕获异步任务的异常
// Throwable cause channelFuture.cause();
// throw new RuntimeException(cause);
// }/*** ---------------------------异步策略---------------------------*/CompletableFutureObject completableFuture new CompletableFuture();// TODO 需要将completableFuture暴露出去channel.writeAndFlush(Unpooled.copiedBuffer(hello.getBytes())).addListener((ChannelFutureListener) promise - {// 当前的promise返回的结果是writeAndFlush的返回结果// 一旦数据被写出去这个promise也就结束了
// if (promise.isDone()) {
// completableFuture.complete(promise.getNow());
// }// 只需要处理异常if (!promise.isSuccess()) {completableFuture.completeExceptionally(promise.cause());}});return completableFuture.get(3, TimeUnit.SECONDS);}});return (T) helloProxy;
}d.完成基础通信
1.在DcyRpcBootstrap类的start()方法中添加 handlerSimpleChannelInboundHandler
/*** 启动netty服务*/
public void start() {// 1.创建EventLoopGroup老板只负责处理请求之后会将请求分发给worker1比2的比例NioEventLoopGroup boss new NioEventLoopGroup(2);NioEventLoopGroup worker new NioEventLoopGroup(10);try{// 2.服务器端启动辅助对象ServerBootstrap serverBootstrap new ServerBootstrap();// 3.配置服务器serverBootstrap serverBootstrap.group(boss, worker).channel(NioServerSocketChannel.class).childHandler(new ChannelInitializerSocketChannel() {Overrideprotected void initChannel(SocketChannel socketChannel) throws Exception {// TODO 核心内容需要添加很多入栈和出栈的handlersocketChannel.pipeline().addLast(new SimpleChannelInboundHandlerObject() {Overrideprotected void channelRead0(ChannelHandlerContext channelHandlerContext, Object msg) throws Exception {ByteBuf byteBuf (ByteBuf) msg;log.info(byteBuf -- {}, byteBuf.toString(Charset.defaultCharset()));channelHandlerContext.channel().writeAndFlush(Unpooled.copiedBuffer(dcyrpc--hello.getBytes()));}});}});// 4.绑定端口ChannelFuture channelFuture serverBootstrap.bind(port).sync();// 5.阻塞操作channelFuture.channel().closeFuture().sync();} catch (InterruptedException e) {e.printStackTrace();} finally {try {boss.shutdownGracefully().sync();worker.shutdownGracefully().sync();} catch (InterruptedException e) {e.printStackTrace();}}
}2.在NettyBootstrapInitializer类的初始化Netty的静态代码块中添加 handlerSimpleChannelInboundHandler
static {NioEventLoopGroup group new NioEventLoopGroup();bootstrap.group(group)// 选择初始化一个什么样的channel.channel(NioSocketChannel.class).handler(new ChannelInitializerSocketChannel() {Overrideprotected void initChannel(SocketChannel socketChannel) throws Exception {socketChannel.pipeline().addLast(new SimpleChannelInboundHandlerByteBuf() {Overrideprotected void channelRead0(ChannelHandlerContext ctx, ByteBuf msg) throws Exception {log.info(msg -- {}, msg.toString(Charset.defaultCharset()));}});}});
}e.异步获取服务器的返回结果
1.在DcyRpcBootstrap类的中添加一个全局的对外挂起的 completableFuture
// 定义全局的对外挂起的 completableFuture
public static final MapLong, CompletableFutureObject PENDING_REQUEST new HashMap(128);2.在ReferenceConfig类中的get()方法完成对completableFuture暴露出去
/*** 代理设计模式生成一个API接口的代理对象* return 代理对象*/
public T get() {// 使用动态代理完成工作ClassLoader classLoader Thread.currentThread().getContextClassLoader();Class[] classes new Class[]{interfaceRef};// 使用动态代理生成代理对象Object helloProxy Proxy.newProxyInstance(classLoader, classes, new InvocationHandler() {Overridepublic Object invoke(Object proxy, Method method, Object[] args) throws Throwable {// 调用sayHi()方法事实上会走进这个代码段当中// 已经知道method(具体的方法)args(参数列表)log.info(method--{}, method.getName());log.info(args--{}, args);// 1.发现服务从注册中心寻找一个可用的服务// 传入服务的名字返回ip端口 InetSocketAddress可以封装端口/ip/host nameInetSocketAddress address registry.lookup(interfaceRef.getName());if (log.isInfoEnabled()){log.info(服务调用方发现了服务{}的可用主机{}, interfaceRef.getName(), address);}// 1.从全局缓存中获取一个通道Channel channel DcyRpcBootstrap.CHANNEL_CACHE.get(address);if (channel null) {// 使用addListener执行异步操作CompletableFutureChannel channelFuture new CompletableFuture();NettyBootstrapInitializer.getBootstrap().connect(address).addListener((ChannelFutureListener) promise - {if (promise.isDone()) {// 异步的已经完成log.info(已经和【{}】成功建立连接。, address);channelFuture.complete(promise.channel());} else if (!promise.isSuccess()) {channelFuture.completeExceptionally(promise.cause());}});// 阻塞获取channelchannel channelFuture.get(3, TimeUnit.SECONDS);// 缓存channelDcyRpcBootstrap.CHANNEL_CACHE.put(address, channel);}if (channel null){log.error(获取或建立与【{}】通道时发生了异常。, address);throw new NetworkException(获取通道时发生了异常。);}CompletableFutureObject completableFuture new CompletableFuture();// TODO 需要将completableFuture暴露出去DcyRpcBootstrap.PENDING_REQUEST.put(1L, completableFuture);channel.writeAndFlush(Unpooled.copiedBuffer(hello.getBytes())).addListener((ChannelFutureListener) promise - {// 只需要处理异常if (!promise.isSuccess()) {completableFuture.completeExceptionally(promise.cause());}});// 如果没有地方处理这个completableFuture这里会阻塞等待 complete 方法的执行// 在Netty的pipeline中最终的handler的处理结果 调用completereturn completableFuture.get(10, TimeUnit.SECONDS);}});return (T) helloProxy;
}3.在NettyBootstrapInitializer类的初始化Netty的静态代码块中寻找与之匹配的待处理 completeFuture
tatic {NioEventLoopGroup group new NioEventLoopGroup();bootstrap.group(group)// 选择初始化一个什么样的channel.channel(NioSocketChannel.class).handler(new ChannelInitializerSocketChannel() {Overrideprotected void initChannel(SocketChannel socketChannel) throws Exception {socketChannel.pipeline().addLast(new SimpleChannelInboundHandlerByteBuf() {Overrideprotected void channelRead0(ChannelHandlerContext ctx, ByteBuf msg) throws Exception {// 异步// 服务提供方给予的结果String result msg.toString(Charset.defaultCharset());// 从全局的挂起的请求中寻找与之匹配的待处理 completeFutureCompletableFutureObject completableFuture DcyRpcBootstrap.PENDING_REQUEST.get(1L);completableFuture.complete(result);}});}});
}f.调整代码
在core模块com.dcyrpc下创建proxy.handler包
在handler包下创建RpcConsumerInvocationHandler类实现InvocationHandler接口
把ReferenceConfig类下的InvocationHandler匿名内部类拷贝到该RpcConsumerInvocationHandler类中
/*** 该类封装了客户端通信的基础逻辑每一个代理对象的远程调用过程都封装在invoke方法中* 1.发现可用服务* 2.建立连接* 3.发送请求* 4.得到结果*/
Slf4j
public class RpcConsumerInvocationHandler implements InvocationHandler {// 接口private Class? interfaceRef;// 注册中心private Registry registry;public RpcConsumerInvocationHandler(Class? interfaceRef, Registry registry) {this.interfaceRef interfaceRef;this.registry registry;}Overridepublic Object invoke(Object proxy, Method method, Object[] args) throws Throwable {// 1.发现服务从注册中心寻找一个可用的服务// - 传入服务的名字返回ip端口 InetSocketAddress可以封装端口/ip/host nameInetSocketAddress address registry.lookup(interfaceRef.getName());if (log.isInfoEnabled()){log.info(服务调用方发现了服务{}的可用主机{}, interfaceRef.getName(), address);}// 2.尝试获取一个可用的通道Channel channel getAvailableChannel(address);if (log.isInfoEnabled()){log.info(获取了和【{}】建立的连接通道准备发送数据, address);}/*** ---------------------------封装报文---------------------------*/// 3.封装报文/*** ---------------------------同步策略---------------------------*/
// ChannelFuture channelFuture channel.writeAndFlush(new Object()).await();
// // get()阻塞获取结果
// // getNow()获取当前的结果如果未处理完成返回null
// if (channelFuture.isDone()) {
// Object object channelFuture.getNow();
// } else if (!channelFuture.isSuccess()) {
// // 发生问题需要捕获异常。
// // 子线程可以捕获异步任务的异常
// Throwable cause channelFuture.cause();
// throw new RuntimeException(cause);
// }/*** ---------------------------异步策略---------------------------*/// 4.写出报文CompletableFutureObject completableFuture new CompletableFuture();// 将completableFuture暴露出去DcyRpcBootstrap.PENDING_REQUEST.put(1L, completableFuture);channel.writeAndFlush(Unpooled.copiedBuffer(hello.getBytes())).addListener((ChannelFutureListener) promise - {// 需要处理异常if (!promise.isSuccess()) {completableFuture.completeExceptionally(promise.cause());}});// 如果没有地方处理这个completableFuture这里会阻塞等待 complete 方法的执行// 在Netty的pipeline中最终的handler的处理结果 调用complete// 5.获得响应的结果return completableFuture.get(10, TimeUnit.SECONDS);}/*** 根据地址获取一个可用的通道* param address* return*/private Channel getAvailableChannel(InetSocketAddress address) {// 1.尝试从缓存中获取通道Channel channel DcyRpcBootstrap.CHANNEL_CACHE.get(address);// 2.拿不到就建立新连接if (channel null) {// 使用addListener执行异步操作CompletableFutureChannel channelFuture new CompletableFuture();NettyBootstrapInitializer.getBootstrap().connect(address).addListener((ChannelFutureListener) promise - {if (promise.isDone()) {// 异步的已经完成log.info(已经和【{}】成功建立连接。, address);channelFuture.complete(promise.channel());} else if (!promise.isSuccess()) {channelFuture.completeExceptionally(promise.cause());}});// 阻塞获取channeltry {channel channelFuture.get(3, TimeUnit.SECONDS);} catch (InterruptedException | ExecutionException | TimeoutException e) {log.error(获取通道时发生异常。{}, e);throw new DiscoveryException(e);}// 缓存channelDcyRpcBootstrap.CHANNEL_CACHE.put(address, channel);}// 3.建立连接失败if (channel null){log.error(获取或建立与【{}】通道时发生了异常。, address);throw new NetworkException(获取通道时发生了异常。);}// 4.返回通道return channel;}
}ReferenceConfig类的get()方法被修改为让整个代码可读性更高更简洁
/*** 代理设计模式生成一个API接口的代理对象* return 代理对象*/
public T get() {// 使用动态代理完成工作ClassLoader classLoader Thread.currentThread().getContextClassLoader();ClassT[] classes new Class[]{interfaceRef};InvocationHandler handler new RpcConsumerInvocationHandler(interfaceRef, registry);// 使用动态代理生成代理对象Object helloProxy Proxy.newProxyInstance(classLoader, classes, handler);return (T) helloProxy;
}g.处理handler (优化)
在core模块com.dcyrpc下创建channelhandler.handler包
在channelhandler.handler包下创建MySimpleChannelInboundHandler类处理响应结果
继承 SimpleChannelInboundHandlerByteBuf重写read0方法
拷贝NettyBootstrapInitializer静态代码块中的匿名内部类SimpleChannelInboundHandler的代码
public class MySimpleChannelInboundHandler extends SimpleChannelInboundHandlerByteBuf {Overrideprotected void channelRead0(ChannelHandlerContext ctx, ByteBuf msg) throws Exception {// 异步// 服务提供方给予的结果String result msg.toString(Charset.defaultCharset());// 从全局的挂起的请求中寻找与之匹配的待处理 completeFutureCompletableFutureObject completableFuture DcyRpcBootstrap.PENDING_REQUEST.get(1L);completableFuture.complete(result);}
}在channelhandler包下创建ConsumerChannelInitializer继承 ChannelInitializerSocketChannel重写initChannel方法
拷贝NettyBootstrapInitializer静态代码块中的匿名内部类ChannelInitializer的代码
public class ConsumerChannelInitializer extends ChannelInitializerSocketChannel {Overrideprotected void initChannel(SocketChannel socketChannel) throws Exception {socketChannel.pipeline().addLast(new MySimpleChannelInboundHandler());}
}在NettyBootstrapInitializer类的初始化Netty的静态代码块中优化handler的匿名内部类
static {NioEventLoopGroup group new NioEventLoopGroup();bootstrap.group(group)// 选择初始化一个什么样的channel.channel(NioSocketChannel.class).handler(new ConsumerChannelInitializer());
}