加入收藏 | 设为首页 | 会员中心 | 我要投稿 黄山站长网 (https://www.0559zz.cn/)- 科技、建站、经验、云计算、5G、大数据,站长网!
当前位置: 首页 > 综合聚焦 > 编程要点 > 语言 > 正文

Java从零开启手写一 Reflect 反射实现通用调用之客户端

发布时间:2021-11-03 15:48:18 所属栏目:语言 来源:互联网
导读:上一篇我们介绍了,如何实现基于反射的通用服务端。这一节我们来一起学习下如何实现通用客户端。因为内容较多,所以拆分为 2 个部分。基本思路所有的方法调用,基于反射进行相关处理实现。java 从零开始手写 RPC (06) reflect 反射实现通用调用之客户端核心
上一篇我们介绍了,如何实现基于反射的通用服务端。   这一节我们来一起学习下如何实现通用客户端。   因为内容较多,所以拆分为 2 个部分。   基本思路 所有的方法调用,基于反射进行相关处理实现。   java 从零开始手写 RPC (06) reflect 反射实现通用调用之客户端 核心类 为了便于拓展,我们把核心类调整如下:   package com.github.houbb.rpc.client.core;      import com.github.houbb.heaven.annotation.ThreadSafe;  import com.github.houbb.log.integration.core.Log;  import com.github.houbb.log.integration.core.LogFactory;  import com.github.houbb.rpc.client.core.context.RpcClientContext;  import com.github.houbb.rpc.client.handler.RpcClientHandler;  import com.github.houbb.rpc.common.constant.RpcConstant;  import io.netty.bootstrap.Bootstrap;  import io.netty.channel.*;  import io.netty.channel.nio.NioEventLoopGroup;  import io.netty.channel.socket.nio.NioSocketChannel;  import io.netty.handler.codec.serialization.ClassResolvers;  import io.netty.handler.codec.serialization.ObjectDecoder;  import io.netty.handler.codec.serialization.ObjectEncoder;  import io.netty.handler.logging.LogLevel;  import io.netty.handler.logging.LoggingHandler;      /**   * <p> rpc 客户端 </p>   *   * <pre> Created: 2019/10/16 11:21 下午  </pre>   * <pre> Project: rpc  </pre>   *   * @author houbinbin   * @since 0.0.2   */  @ThreadSafe  public class RpcClient {          private static final Log log = LogFactory.getLog(RpcClient.class);          /**       * 地址信息       * @since 0.0.6       */      private final String address;          /**       * 监听端口号       * @since 0.0.6       */      private final int port;          /**       * 客户端处理 handler       * 作用:用于获取请求信息       * @since 0.0.4       */      private final ChannelHandler channelHandler;          public RpcClient(final RpcClientContext clientContext) {          this.address = clientContext.address();          this.port = clientContext.port();          this.channelHandler = clientContext.channelHandler();      }          /**       * 进行连接       * @since 0.0.6       */      public ChannelFuture connect() {          // 启动服务端          log.info("RPC 服务开始启动客户端");              EventLoopGroup workerGroup = new NioEventLoopGroup();              /**           * channel future 信息           * 作用:用于写入请求信息           * @since 0.0.6           */          ChannelFuture channelFuture;          try {              Bootstrap bootstrap = new Bootstrap();              channelFuture = bootstrap.group(workerGroup)                      .channel(NioSocketChannel.class)                      .option(ChannelOption.SO_KEEPALIVE, true)                      .handler(new ChannelInitializer<Channel>(){                          @Override                          protected void initChannel(Channel ch) throws Exception {                              ch.pipeline()                                      // 解码 bytes=>resp                                      .addLast(new ObjectDecoder(Integer.MAX_VALUE, ClassResolvers.cacheDisabled(null)))                                      // request=>bytes                                      .addLast(new ObjectEncoder())                                      // 日志输出                                      .addLast(new LoggingHandler(LogLevel.INFO))                                      .addLast(channelHandler);                          }                      })                      .connect(address, port)                      .syncUninterruptibly();              log.info("RPC 服务启动客户端完成,监听地址 {}:{}", address, port);          } catch (Exception e) {              log.error("RPC 客户端遇到异常", e);              throw new RuntimeException(e);          }          // 不要关闭线程池!!!              return channelFuture;      }      }  可以灵活指定对应的服务端地址、端口信息。   ChannelHandler 作为处理参数传入。   ObjectDecoder、ObjectEncoder、LoggingHandler 都和服务端类似,是 netty 的内置实现。   RpcClientHandler 客户端的 handler 实现如下:   /*   * Copyright (c)  2019. houbinbin Inc.   * rpc All rights reserved.   */      package com.github.houbb.rpc.client.handler;      import com.github.houbb.log.integration.core.Log;  import com.github.houbb.log.integration.core.LogFactory;  import com.github.houbb.rpc.client.core.RpcClient;  import com.github.houbb.rpc.client.invoke.InvokeService;  import com.github.houbb.rpc.common.rpc.domain.RpcResponse;  import io.netty.channel.ChannelHandlerContext;  import io.netty.channel.SimpleChannelInboundHandler;      /**   * <p> 客户端处理类 </p>   *   * <pre> Created: 2019/10/16 11:30 下午  </pre>   * <pre> Project: rpc  </pre>   *   * @author houbinbin   * @since 0.0.2   */  public class RpcClientHandler extends SimpleChannelInboundHandler {          private static final Log log = LogFactory.getLog(RpcClient.class);          /**       * 调用服务管理类       *       * @since 0.0.6       */      private final InvokeService invokeService;          public RpcClientHandler(InvokeService invokeService) {          this.invokeService = invokeService;      }          @Override      protected void channelRead0(ChannelHandlerContext ctx, Object msg) throws Exception {          RpcResponse rpcResponse = (RpcResponse)msg;          invokeService.addResponse(rpcResponse.seqId(), rpcResponse);          log.info("[Client] response is :{}", rpcResponse);      }          @Override      public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {          // 每次用完要关闭,不然拿不到response,我也不知道为啥(目测得了解netty才行)          // 个人理解:如果不关闭,则永远会被阻塞。          ctx.flush();          ctx.close();      }          @Override      public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {          cause.printStackTrace();          ctx.close();      }  }  只有 channelRead0 做了调整,基于 InvokeService 对结果进行处理。   InvokeService 接口 package com.github.houbb.rpc.client.invoke;      import com.github.houbb.rpc.common.rpc.domain.RpcResponse;      /**   * 调用服务接口   * @author binbin.hou   * @since 0.0.6   */  public interface InvokeService {          /**       * 添加请求信息       * @param seqId 序列号       * @return this       * @since 0.0.6       */      InvokeService addRequest(final String seqId);          /**       * 放入结果       * @param seqId 唯一标识       * @param rpcResponse 响应结果       * @return this       * @since 0.0.6       */      InvokeService addResponse(final String seqId, final RpcResponse rpcResponse);          /**       * 获取标志信息对应的结果       * @param seqId 序列号       * @return 结果       * @since 0.0.6       */      RpcResponse getResponse(final String seqId);      }  主要是对入参、出参的设置,以及出参的获取。   实现 package com.github.houbb.rpc.client.invoke.impl;      import com.github.houbb.heaven.util.guava.Guavas;  import com.github.houbb.heaven.util.lang.ObjectUtil;  import com.github.houbb.log.integration.core.Log;  import com.github.houbb.log.integration.core.LogFactory;  import com.github.houbb.rpc.client.core.RpcClient;  import com.github.houbb.rpc.client.invoke.InvokeService;  import com.github.houbb.rpc.common.exception.RpcRuntimeException;  import com.github.houbb.rpc.common.rpc.domain.RpcResponse;      import java.util.Set;  import java.util.concurrent.ConcurrentHashMap;      /**   * 调用服务接口   * @author binbin.hou   * @since 0.0.6   */  public class DefaultInvokeService implements InvokeService {          private static final Log LOG = LogFactory.getLog(DefaultInvokeService.class);          /**       * 请求序列号集合       * (1)这里后期如果要添加超时检测,可以添加对应的超时时间。       * 可以把这里调整为 map       * @since 0.0.6       */      private final Set<String> requestSet;          /**       * 响应结果       * @since 0.0.6       */      private final ConcurrentHashMap<String, RpcResponse> responseMap;          public DefaultInvokeService() {          requestSet = Guavas.newHashSet();          responseMap = new ConcurrentHashMap<>();      }          @Override      public InvokeService addRequest(String seqId) {          LOG.info("[Client] start add request for seqId: {}", seqId);          requestSet.add(seqId);          return this;      }          @Override      public InvokeService addResponse(String seqId, RpcResponse rpcResponse) {          // 这里放入之前,可以添加判断。          // 如果 seqId 必须处理请求集合中,才允许放入。或者直接忽略丢弃。          LOG.info("[Client] 获取结果信息,seq: {}, rpcResponse: {}", seqId, rpcResponse);          responseMap.putIfAbsent(seqId, rpcResponse);              // 通知所有等待方          LOG.info("[Client] seq 信息已经放入,通知所有等待方", seqId);              synchronized (this) {              this.notifyAll();          }              return this;      }          @Override      public RpcResponse getResponse(String seqId) {          try {              RpcResponse rpcResponse = this.responseMap.get(seqId);              if(ObjectUtil.isNotNull(rpcResponse)) {                  LOG.info("[Client] seq {} 对应结果已经获取: {}", seqId, rpcResponse);                  return rpcResponse;              }                  // 进入等待              while (rpcResponse == null) {                  LOG.info("[Client] seq {} 对应结果为空,进入等待", seqId);                  // 同步等待锁                  synchronized (this) {                      this.wait();                  }                      rpcResponse = this.responseMap.get(seqId);                  LOG.info("[Client] seq {} 对应结果已经获取: {}", seqId, rpcResponse);              }                  return rpcResponse;          } catch (InterruptedException e) {              throw new RpcRuntimeException(e);          }      }  }  使用 requestSet 存储对应的请求入参。   使用 responseMap 存储对应的请求出参,在获取的时候通过同步 while 循环等待,获取结果。   此处,通过 notifyAll() 和 wait() 进行等待和唤醒。   ReferenceConfig-服务端配置 说明 我们想调用服务端,首先肯定要定义好要调用的对象。   ReferenceConfig 就是要告诉 rpc 框架,调用的服务端信息。   接口 package com.github.houbb.rpc.client.config.reference;      import com.github.houbb.rpc.common.config.component.RpcAddress;      import java.util.List;      /**   * 引用配置类   *   * 后期配置:   * (1)timeout 调用超时时间   * (2)version 服务版本处理   * (3)callType 调用方式 oneWay/sync/async   * (4)check 是否必须要求服务启动。   *   * spi:   * (1)codec 序列化方式   * (2)netty 网络通讯架构   * (3)load-balance 负载均衡   * (4)失败策略 fail-over/fail-fast   *   * filter:   * (1)路由   * (2)耗时统计 monitor 服务治理   *   * 优化思考:   * (1)对于唯一的 serviceId,其实其 interface 是固定的,是否可以省去?   * @author binbin.hou   * @since 0.0.6   * @param <T> 接口泛型   */  public interface ReferenceConfig<T> {          /**       * 设置服务标识       * @param serviceId 服务标识       * @return this       * @since 0.0.6       */      ReferenceConfig<T> serviceId(final String serviceId);          /**       * 服务唯一标识       * @since 0.0.6       */      String serviceId();          /**       * 服务接口       * @since 0.0.6       * @return 接口信息       */      Class<T> serviceInterface();          /**       * 设置服务接口信息       * @param serviceInterface 服务接口信息       * @return this       * @since 0.0.6       */      ReferenceConfig<T> serviceInterface(final Class<T> serviceInterface);          /**       * 设置服务地址信息       * (1)单个写法:ip:port:weight       * (2)集群写法:ip1:port1:weight1,ip2:port2:weight2       *       * 其中 weight 权重可以不写,默认为1.       *       * @param addresses 地址列表信息       * @return this       * @since 0.0.6       */      ReferenceConfig<T> addresses(final String addresses);          /**       * 获取对应的引用实现       * @return 引用代理类       * @since 0.0.6       */      T reference();      }  实现 package com.github.houbb.rpc.client.config.reference.impl;      import com.github.houbb.heaven.constant.PunctuationConst;  import com.github.houbb.heaven.util.common.ArgUtil;  import com.github.houbb.heaven.util.guava.Guavas;  import com.github.houbb.heaven.util.lang.NumUtil;  import com.github.houbb.rpc.client.config.reference.ReferenceConfig;  import com.github.houbb.rpc.client.core.RpcClient;  import com.github.houbb.rpc.client.core.context.impl.DefaultRpcClientContext;  import com.github.houbb.rpc.client.handler.RpcClientHandler;  import com.github.houbb.rpc.client.invoke.InvokeService;  import com.github.houbb.rpc.client.invoke.impl.DefaultInvokeService;  import com.github.houbb.rpc.client.proxy.ReferenceProxy;  import com.github.houbb.rpc.client.proxy.context.ProxyContext;  import com.github.houbb.rpc.client.proxy.context.impl.DefaultProxyContext;  import com.github.houbb.rpc.common.config.component.RpcAddress;  import io.netty.channel.ChannelFuture;  import io.netty.channel.ChannelHandler;      import java.util.List;      /**   * 引用配置类默认实现   *   * @author binbin.hou   * @since 0.0.6   * @param <T> 接口泛型   */  public class DefaultReferenceConfig<T> implements ReferenceConfig<T> {          /**       * 服务唯一标识       * @since 0.0.6       */      private String serviceId;          /**       * 服务接口       * @since 0.0.6       */      private Class<T> serviceInterface;          /**       * 服务地址信息       * (1)如果不为空,则直接根据地址获取       * (2)如果为空,则采用自动发现的方式       *       * TODO: 这里调整为 set 更加合理。       *       * 如果为 subscribe 可以自动发现,然后填充这个字段信息。       * @since 0.0.6       */      private List<RpcAddress> rpcAddresses;          /**       * 用于写入信息       * (1)client 连接 server 端的 channel future       * (2)后期进行 Load-balance 路由等操作。可以放在这里执行。       * @since 0.0.6       */      private List<ChannelFuture> channelFutures;          /**       * 客户端处理信息       * @since 0.0.6       */      @Deprecated      private RpcClientHandler channelHandler;          /**       * 调用服务管理类       * @since 0.0.6       */      private InvokeService invokeService;          public DefaultReferenceConfig() {          // 初始化信息          this.rpcAddresses = Guavas.newArrayList();          this.channelFutures = Guavas.newArrayList();          this.invokeService = new DefaultInvokeService();      }          @Override      public String serviceId() {          return serviceId;      }          @Override      public DefaultReferenceConfig<T> serviceId(String serviceId) {          this.serviceId = serviceId;          return this;      }          @Override      public Class<T> serviceInterface() {          return serviceInterface;      }          @Override      public DefaultReferenceConfig<T> serviceInterface(Class<T> serviceInterface) {          this.serviceInterface = serviceInterface;          return this;      }          @Override      public ReferenceConfig<T> addresses(String addresses) {          ArgUtil.notEmpty(addresses, "addresses");              String[] addressArray = addresses.split(PunctuationConst.COMMA);          ArgUtil.notEmpty(addressArray, "addresses");              for(String address : addressArray) {              String[] addressSplits = address.split(PunctuationConst.COLON);              if(addressSplits.length < 2) {                  throw new IllegalArgumentException("Address must be has ip and port, like 127.0.0.1:9527");              }              String ip = addressSplits[0];              int port = NumUtil.toIntegerThrows(addressSplits[1]);              // 包含权重信息              int weight = 1;              if(addressSplits.length >= 3) {                  weight = NumUtil.toInteger(addressSplits[2], 1);              }                  RpcAddress rpcAddress = new RpcAddress(ip, port, weight);              this.rpcAddresses.add(rpcAddress);          }              return this;      }          /**       * 获取对应的引用实现       * (1)处理所有的反射代理信息-方法可以抽离,启动各自独立即可。       * (2)启动对应的长连接       * @return 引用代理类       * @since 0.0.6       */      @Override      public T reference() {          // 1. 启动 client 端到 server 端的连接信息          // 1.1 为了提升性能,可以将所有的 client=>server 的连接都调整为一个 thread。          // 1.2 初期为了简单,直接使用同步循环的方式。          // 创建 handler          // 循环连接          for(RpcAddress rpcAddress : rpcAddresses) {              final ChannelHandler channelHandler = new RpcClientHandler(invokeService);              final DefaultRpcClientContext context = new DefaultRpcClientContext();              context.address(rpcAddress.address()).port(rpcAddress.port()).channelHandler(channelHandler);              ChannelFuture channelFuture = new RpcClient(context).connect();              // 循环同步等待              // 如果出现异常,直接中断?捕获异常继续进行??              channelFutures.add(channelFuture);          }              // 2. 接口动态代理          ProxyContext<T> proxyContext = buildReferenceProxyContext();          return ReferenceProxy.newProxyInstance(proxyContext);      }          /**       * 构建调用上下文       * @return 引用代理上下文       * @since 0.0.6       */      private ProxyContext<T> buildReferenceProxyContext() {          DefaultProxyContext<T> proxyContext = new DefaultProxyContext<>();          proxyContext.serviceId(this.serviceId);          proxyContext.serviceInterface(this.serviceInterface);          proxyContext.channelFutures(this.channelFutures);          proxyContext.invokeService(this.invokeService);          return proxyContext;      }      }  这里主要根据指定的服务端信息,初始化对应的代理实现。   这里还可以拓展指定权重,便于后期负载均衡拓展,本期暂时不做实现。   ReferenceProxy 说明 所有的 rpc 调用,客户端只有服务端的接口。   那么,怎么才能和调用本地方法一样调用远程方法呢?   答案就是动态代理。

(编辑:黄山站长网)

【声明】本站内容均来自网络,其相关言论仅代表作者个人观点,不代表本站立场。若无意侵犯到您的权利,请及时与联系站长删除相关内容!