IPC:inter process communication 即进程间通信 RPC: remote procedure call 即远程过程调用 首先,我们通过一张图来了解Hadoop中的IPC与RPC:

Hadoop  RPC - 图1
IPC是进程间通信的过程,RPC作为远程过程的调用,必定会涉及到IPC通信
远程过程调用,指调用服务器上的方法,所以业务真正的是执行在服务器上的;客户端通过网络间通信,与远程服务器打交道,本身是没有进行实现的(所以在服务器端有相关的程序进行实现)
下面通过Hadoop提供的相关API,用Java程序来实现服务器端和客户端,模拟这种通信机制。

Hadoop  RPC - 图2

什么是RPC?

  1. RPC(Remote Procedure Call)远程过程调用,它允许一台计算机程序远程调用另一台计算机的子程序,而不用去关心底层的网络通信细节,对我们来说是透明的。经常用于分布式网络通信中。

  2. Hadoop的进程间交互都是通过RPC来进行的,比如Namenode与Datanode之间,Jobtracker与tasktracker之间等。

Hadoop  RPC - 图3

RPC相关知识

  • RPC协议假定某些传输协议的存在,如TCP或UDP,为通信程序之间携带信息数据。在OSI网络通信模型中,RPC跨越了传输层和应用层。RPC使得开发包括网络分布式多程序在内的应用程序更加容易。

  • RPC采用客户机/服务器模式。请求程序就是一个客户机,而服务器提供程序就是一个服务器。首先,客户机调用进程发送一个有进程参数的调用信息到服务进程,然后等待应答信息。

  • 在服务器端,进程保持睡眠状态直到调用信息的到达为止,当一个调用信息到达,服务器获得进程参数,计算结果,发送答复信息给client。然后等待下一个调用信息,最后,客户端调用进程接收答复信息,获得进程结果,然后调用执行继续进行。

RPC的特点

  1. 透明性:远程调用其他机器上的程序,对用户来说就像是调用本地方法一样。

  2. 高性能:RPC server能够并发处理多个来自Client的请求(请求队列)

  3. 可控性:jdk中已经提供了一个RPC框架?RMI,但是该RPC框架过于重量级并且可控之处较少,所以Hadoop RPC实现了自定义的RPC框架。

Hadoop RPC通信

  1. 序列化层:Client与Server端通信传递的信息采用了Hadoop里提供的序列化类或自定义Writable类型。

  2. 函数调用层:Hadoop RPC通过动态代理以及Java反射机制实现函数调用。

  3. 网络传输层:Hadoop RPC采用了基于TCP/IP的socket机制。

  4. 服务器端框架层:RPC Server利用Java NIO以及采用了事件驱动的I/O模型,提高RPC Server的并发处理能力。

Hadoop的整个体系结构就是构建在RPC之上(org.apache.hadoop.ipc)

Hadoop RPC 设计技术

  • 动态代理

  • 反射

  • 序列化

  • 非阻塞的异步I/O(NIO)

动态代理

  1. 动态代理可以提供另一个对象的访问,同时隐藏实际对象的具体事实,代理对象对客户隐藏了实际对象。

  2. 动态代理可以对请求进行其他的一些处理,在不允许直接访问某些类,或需要对访问做一些特殊处理等,这时候可以考虑使用代理。

  3. 目前java开发包中提供了对动态代理的支持

相关的类与接口:

  1. java.lang.reflect.Proxy --类
  2. java.lang.reflect.InvocationHandler --接口

动态代理类创建对象过程:

  1. InvocationHandler handler = new InvocationHandlerImpl(...)
  2. Proxy.newInstance(...)

RPC几个重要的协议

ClientProtocol 是客户端(Filesystem )与NameNode 通信的接口
DatanodeProtocol 是DataNode与NameNode通信的接口
NamenodeProtocol 是SecondaryNameNode与NameNode通信的接口。
DF SClient是直接调用NameNode接口的对象。用户代码是通过DistributedFileSystem调用DFSClient对象,才能与NameNode打交道。

模拟Hadoop 客户端通过RPC调用namenode namenode要返回信息

程序实现

创建java程序

客户端和服务器端 创建接口HelloWorldService.java(协议)

public interface HelloWorldService extends VersionedProtocol{
    static final long versionID = 1;
    public String sayHello(String msg);
}

实现接口(服务端实现)

import java.io.IOException;
import org.apache.hadoop.ipc.ProtocolSignature;

public class HelloWorldServiceImpl implements HelloWorldService {

    public long getProtocolVersion(String protocol, long clientVersion) throws IOException {
        return 1;
    }

    public ProtocolSignature getProtocolSignature(String protocol, long clientVersion, int clientMethodsHash)
            throws IOException {
        try {
            return ProtocolSignature.getProtocolSignature(protocol, clientVersion);
        } catch (ClassNotFoundException e) {
            e.printStackTrace();
        }
        return null;
    }

    public String sayHello(String msg) {
        System.out.println(msg);
        return "hello" + msg;
    }
}

创建MyServer服务器端(开启服务)

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.ipc.RPC;
import org.apache.hadoop.ipc.RPC.Server;

/**
 * 服务器端
 */
public class MyServer {
    public static void main(String[] args) {
        try {
            Configuration conf = new Configuration();
            Server server = new RPC.Builder(conf)
                    //设置协议
                    .setProtocol(HelloWorldService.class)
                    //设置实现类
                    .setInstance(new HelloWorldServiceImpl())
                    //绑定地址
                    .setBindAddress("localhost")
                    //设置处理器个数
                    .setNumHandlers(2)
                    //设置端口号
                    .setPort(8888).build();
            //build完之后,启动服务器
            server.start();
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
}

创建MyClient 客户端

import java.net.InetSocketAddress;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.ipc.RPC;

/**
 * 客户端
 */
public class MyClient {
    public static void main(String[] args) {
        try {
            Configuration conf = new Configuration();
            HelloWorldService proxy = RPC.getProxy(HelloWorldService.class, HelloWorldService.versionID, new InetSocketAddress("localhost",8888), conf);
            String result = proxy.sayHello("world");
            System.out.println(result);
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
}

运行

1.启动服务器端
2.启动客户端

运行结果:

启动服务端

Hadoop  RPC - 图4

启动客户端

Hadoop  RPC - 图5

启动客户端之后,服务端的反馈

Hadoop  RPC - 图6
———————————————————————————————————————————————————————————-

Client客户端

package com.zhiyou.hadoop.rpc;
import java.io.IOException;
import java.net.InetSocketAddress;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.ipc.RPC;
import com.zhiyou.hadoop.protocol.ClientNameNodeProtocol;
/**
 * 客户端访问namenode要使用RPC框架
 * hadoop里有个RPC类封装了底层的通信过程
 * (netty)
 * protocol:限制了客户端调用namenode的协议
 * */
public class HadoopClient {
    /**
     * 返回实现这个接口的一个代理对象namenode
     * 调用本地的方法就像调用远程一样
     * 客户端发送一个请求(方法名getInfo()、参数"/luban")
     * 然后客户端的代理对象拦截了这个请求,调用socket流发给服务端
     * 然后服务端接到了这个请求,把方法名和参数拿出来
     * 这样就知道调用哪个方法,然后在namenode调用这个实例对象返回
     * 处理结果,最后通过网络socket发给代理对象
     * */
    public static void main(String[] args) throws IOException {
         ClientNameNodeProtocol namenode = RPC.getProxy(ClientNameNodeProtocol.class, 1L, 
                new InetSocketAddress("localhost", 8888), new Configuration());
        String info=namenode.getInfo("/luban");
        System.out.println(info);
    }
}

协议

package com.zhiyou.hadoop.protocol;
/**
 * 客户端和namenode调用的通信接口
 * */
public interface ClientNameNodeProtocol {
    public static final long versionID=1L;
    public String getInfo(String path);
}

server

package com.zhiyou.hadoop.server;
import com.zhiyou.hadoop.protocol.ClientNameNodeProtocol;
/**
 * 提供业务服务
 * 模拟hadoop客户端调用RPC调用namenode
 *    namenode 要返回信息
 *这个类要实现一个接口,方便以后两端通信
 * */
public class HadoopNameNode implements ClientNameNodeProtocol{
    //模拟namenode的业务之一:查询元数据
    //参数是client
    public String getInfo(String path) {
        return path+":1 -{BLK_1,BLK_2}";
    }
}

启动服务

package com.zhiyou.hadoop.server;
import java.io.IOException;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.ipc.RPC;
import org.apache.hadoop.ipc.RPC.Builder;
import org.apache.hadoop.ipc.RPC.Server;
import com.zhiyou.hadoop.protocol.ClientNameNodeProtocol;
import com.zhiyou.hadoop.protocol.UserLoginService;
/**
 * 此 类 用来发布写好的服务
 * */
public class StartService {
    public static void main(String[] args) throws Exception, IOException {
    Builder builder = new RPC.Builder(new Configuration());
        builder.setBindAddress("localhost").
        setPort(8888).
        setProtocol(ClientNameNodeProtocol.class).
        setInstance(new HadoopNameNode());
        Server server = builder.build();
        server.start();
    }
}

使用hadoop的RPC模拟登录

通信接口,协议

package com.zhiyou.hadoop.protocol;

public interface UserLoginService {
    public static final long versionID=1L;  //读取版本号
    public String login(String name,String password);
}

接口实现类

package com.zhiyou.hadoop.server;
import com.zhiyou.hadoop.protocol.UserLoginService;
public class UserLoginServiceImpl implements UserLoginService {
    @Override
    public String login(String name, String password) {
        // action中会收到这个结果
        return name + "login.....";
    }
}

action

package com.zhiyou.hadoop.rpc;
import java.net.InetSocketAddress;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.ipc.RPC;
import com.zhiyou.hadoop.protocol.UserLoginService;
/**
 * 使用hadoop的rpc模拟登录
 * 此时action调用是框架的,service都在其他机器上
 * */
public class UserLoginAction {
    public static void main(String[] args) throws Exception {
        UserLoginService service = RPC.getProxy(UserLoginService.class, 1L, 
                new InetSocketAddress("localhost", 9999), 
                new Configuration());
        String login = service.login("zhangsan", "123456");
        System.out.println(login);
    }
}

启动服务

package com.zhiyou.hadoop.server;
import java.io.IOException;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.ipc.RPC;
import org.apache.hadoop.ipc.RPC.Builder;
import org.apache.hadoop.ipc.RPC.Server;
import com.zhiyou.hadoop.protocol.ClientNameNodeProtocol;
import com.zhiyou.hadoop.protocol.UserLoginService;

/**
 * 此 类 用来发布写好的服务
 * */
public class StartService {
    public static void main(String[] args) throws Exception, IOException {
        Builder builder2 = new RPC.Builder(new Configuration());
        builder2.setBindAddress("localhost")
        .setPort(9999)
        .setProtocol(UserLoginService.class)
        .setInstance(new UserLoginServiceImpl());
        Server server1 = builder2.build();
        server1.start();
    }
}

运行结果

启动服务端:

Hadoop  RPC - 图7

启动客户端:

Hadoop  RPC - 图8

客户端启动后服务端的反馈

Hadoop  RPC - 图9