炼数成金 门户 大数据 分布式系统 查看内容

分布式基础—RPC

2018-10-29 10:29| 发布者: 炼数成金_小数| 查看: 14871| 评论: 0|原作者: zhangjunjun|来自: 死磕Java与Scala技术栈

摘要: 在传统的开发模式中,我们通常将系统的各个服务部署在单台机器,随着服务的扩展,这种方式已经完全无法满足系统大规模的扩展需要,分布式系统由此诞生,在分布式系统中,最重要就是各个服务之间的 RPC 调用。RPC 全 ...

管理 Hadoop 服务器 框架 分布式

在传统的开发模式中,我们通常将系统的各个服务部署在单台机器,随着服务的扩展,这种方式已经完全无法满足系统大规模的扩展需要,分布式系统由此诞生,在分布式系统中,最重要就是各个服务之间的 RPC 调用。

RPC 全称 Remote Procedure Call——远程过程调用,它是一种通过网络从远程计算机程序上请求服务,而不需要了解底层网络技术的方式。简单一点就是:通过一定协议和方法使得调用远程计算机上的服务,就像调用本地服务一样。

通常来说,RPC 的实现方式有很多,可以基于常见的 HTTP 协议,也可以在TCP上层封装自定义协议,常见的 Web Service 就是基于 HTTP 协议的 RPC,HTTP 协议的优点是具有良好的跨平台性,特别适合异构系统较多的公司,但是由于 HTTP 报头较为冗长,性能较差,基于 TCP 协议的 RPC 可以建立长连接,速度和效率明显,但是难度和复杂程度很高。

RPC 的诞生让构建分布式应用更容易,极大的扩大系统的可扩展性,容错性。为复杂业务逻辑的系统进行服务化改造和高可用性升级提供了可能。

RPC 调用分类
从通信协议层面可以分为:
1、基于 HTTP 协议的 RPC;
2、基于二进制协议的 RPC;
3、基于 TCP 协议的 RPC。

从是否跨平台可分为:
1、单语言 RPC,如 RMI, Remoting;
2、跨平台 RPC,如 google protobuffer, restful json,http XML。

从调用过程来看,可以分为同步通信RPC和异步通信RPC:
1、同步 RPC:指的是客户端发起调用后,必须等待调用执行完成并返回结果;
2、异步 RPC:指客户方调用后不关心执行结果返回,如果客户端需要结果,可用通过提供异步 callback 回调获取返回信息。大部分 RPC 框架都同时支持这两种方式的调用。

RPC 框架结构

RPC 服务方的主要职责是提供服务,供客户端调用访问,服务端会通过一个接收器接受客户端的调用请求,根据相应的 RPC 协议进行解码获取调用方法以及相关参数,当调用完成后,服务器端通过后台处理模块处理完成并将结果返回给客户端。

对于客户端来说,服务调用完全透明,像调用本地服务一样调用远程方法,客户端调用服务时候通过一个远程连接和服务端建立通道,并通过相应的协议进行编码,将调用的方法和相关参数发送给服务方。

来张更细致的:

RPC 模块详解
下面我们根据上面的RPC的架构图(第一幅图),对图中的各个模块进行拆解,并解释每个模块的作用。

服务端(Server):RPC 服务的提供者,负责将 RPC 服务导出;
客户端 (Client):RPC 服务的消费者,负责调用 RPC 服务;
代理(Proxy):通过动态代理,提供对远程接口的代理实现;
执行器(Invoker):对于客户端:主要负责服务调用的编码,调用请求发送和等待结果返回;对于服务方:负责处理调用逻辑并返回调用结果;
协议管理(Protocol):协议管理组件,负责整个 RPC 通信协议的编/解码;
连接端口(Connector):负责维持客户方和服务方的长连接通道;
后台处理(Processor):负责整个调用服务中的管理调度,包括线程池,分发,异常处理等;
连接通道(Channel):客户端和服务器端的数据传输通道。
具体到 Java 平台来说,其中的3,4通常使用动态代理实现,5,6,7,8使用 NIO 或者一些高性能 NIO 框架,如 mina,netty 实现。

最简单的 RPC JAVA 实现
在进一步拆解了组件并划分了职责之后,这里以一个最简单 Java RPC 框架实现为例,对 RPC 具体逻辑进行分析。

public classRpcFramework {
   public static voidpublish(finalObject service,intport)throwsIOException {
       if(service ==null){
           throw newIllegalArgumentException("发布服务不能为空");
       }
       if(port <=  0|| port >65535){
           throw newIllegalArgumentException("端口不合法"+port);
       }
       finalServerSocket server =newServerSocket(port);
       while(true){
           try{
               finalSocket socket = server.accept();
               newThread(newRunnable() {
                   public voidrun() {
                       try{
                           try{
                               ObjectInputStream input =newObjectInputStream(socket.getInputStream());
                               try{
                                   String methodName = input.readUTF();
                                   Class<?>[] parameterTypes = (Class<?>[]) input.readObject();
                                   Object[] arguments = (Object[]) input.readObject();
                                   ObjectOutputStream output =newObjectOutputStream(socket.getOutputStream());
                                   try{
                                       Method method =service.getClass().getMethod(methodName, parameterTypes);
                                       Object result = method.invoke(service, arguments);
                                       output.writeObject(result);
                                   }catch(Throwable e) {
                                      output.writeObject(e);
                                   }finally{
                                       output.close();
                                   }
                               }finally{
                                   input.close();
                               }
                           }finally{
                               socket.close();
                           }
                       }catch(Exception e) {
                           e.printStackTrace();
                       }
                   }
               }).start();
           }catch(Exception e){
               e.printStackTrace();

           }
       }
   }
}

服务端发布服务的代码如上,首先校验传入的端口和服务是否合法,然后开启一个 socket 监听,这儿为了简便,没有采用 NIO 方式,同时直接采用 java 的序列化方式,将传入的数据通过反射取出调用的方法和参数,本地执行后将运行结果通过 socket 套接字返回给客户端。

RPC 框架服务调用代码:
public static<T>Tcall(finalClass<T> interfaceClass,String host,intport){

   if(interfaceClass ==null){

       throw newIllegalArgumentException("调用服务为空");

   }

   if(host ==null|| host.length() ==0){

       throw newIllegalArgumentException("主机不能为null");

   }

   if(port <=0  || port >65535) {

       throw newIllegalArgumentException("端口不合法"+port);

   }

   return(T) Proxy.newProxyInstance(interfaceClass.getClassLoader(),newClass<?>[]{interfaceClass},newCallHandler(host,port));

}

static classCallHandlerimplementsInvocationHandler{

   private finalStringhost;

   private final intport;

   publicCallHandler(String host,intport){

       this.host= host;

       this.port= port;

   }

   publicObject invoke(Object proxy, Method method, Object[] args)throwsThrowable {

       Socket socket =newSocket(host,port);

       try{

           ObjectOutputStream output =newObjectOutputStream(socket.getOutputStream());

           try{

               output.writeUTF(method.getName());

               output.writeObject(method.getParameterTypes());

               output.writeObject(args);

               ObjectInputStream input =newObjectInputStream(socket.getInputStream());

               try{

                   Object result = input.readObject();

                   if(resultinstanceofThrowable) {

                       throw(Throwable) result;

                   }

                   returnresult;

               }finally{

                   input.close();

               }

           }finally{

               output.close();

           }

       }finally{

           socket.close();

       }

   }

}

框架中客户端调用的代码中,首先校验对应的端口和主机是否合法,然后通过动态代理生成一个代理对象,在代理对象的方法中,拦截调用,通过建立 socket 连接,将方法和参数传递到远端执行并获取远程执行返回结果。

RPC 调用测试:
public classRpcProvider {

   public static voidmain(String[] args)throwsIOException {

       HelloService service =newHelloServiceImpl();

       RpcFramework.publish(service,8888);

   }

}

interfaceHelloService{

   String hello(String name);

}

classHelloServiceImplimplementsHelloService{

   publicString hello(String name) {

       return"RPC调用hello"+name;

   }

}

public classRpcConsumer {

   public static voidmain(String[] args) {

       HelloService service = RpcFramework.call(HelloService.class,"127.0.0.1",8888);

       String hello = service.hello("java&Scala");

       System.out.println(hello);

   }

}

验证
RPC调用hellojava&scala

Process finished with exit code 0

如上所示,服务器端发布一个接口服务 HelloService,客户端成功通过 RPC 调用。

声明:文章收集于网络,如有侵权,请联系小编及时处理,谢谢!

欢迎加入本站公开兴趣群
软件开发技术群
兴趣范围包括:Java,C/C++,Python,PHP,Ruby,shell等各种语言开发经验交流,各种框架使用,外包项目机会,学习、培训、跳槽等交流
QQ群:26931708

Hadoop源代码研究群
兴趣范围包括:Hadoop源代码解读,改进,优化,分布式系统场景定制,与Hadoop有关的各种开源项目,总之就是玩转Hadoop
QQ群:288410967 

鲜花

握手

雷人

路过

鸡蛋

相关阅读

最新评论

热门频道

  • 大数据
  • 商业智能
  • 量化投资
  • 科学探索
  • 创业

即将开课

 

GMT+8, 2018-11-20 07:37 , Processed in 0.118891 second(s), 25 queries .