IPC:inter process communication 即进程间通信 RPC: remote procedure call 即远程过程调用 首先,我们通过一张图来了解Hadoop中的IPC与RPC:

IPC是进程间通信的过程,RPC作为远程过程的调用,必定会涉及到IPC通信
远程过程调用,指调用服务器上的方法,所以业务真正的是执行在服务器上的;客户端通过网络间通信,与远程服务器打交道,本身是没有进行实现的(所以在服务器端有相关的程序进行实现)
下面通过Hadoop提供的相关API,用Java程序来实现服务器端和客户端,模拟这种通信机制。
什么是RPC?
RPC(Remote Procedure Call)远程过程调用,它允许一台计算机程序远程调用另一台计算机的子程序,而不用去关心底层的网络通信细节,对我们来说是透明的。经常用于分布式网络通信中。
Hadoop的进程间交互都是通过RPC来进行的,比如Namenode与Datanode之间,Jobtracker与tasktracker之间等。
RPC相关知识
RPC协议假定某些传输协议的存在,如TCP或UDP,为通信程序之间携带信息数据。在OSI网络通信模型中,RPC跨越了传输层和应用层。RPC使得开发包括网络分布式多程序在内的应用程序更加容易。
RPC采用客户机/服务器模式。请求程序就是一个客户机,而服务器提供程序就是一个服务器。首先,客户机调用进程发送一个有进程参数的调用信息到服务进程,然后等待应答信息。
在服务器端,进程保持睡眠状态直到调用信息的到达为止,当一个调用信息到达,服务器获得进程参数,计算结果,发送答复信息给client。然后等待下一个调用信息,最后,客户端调用进程接收答复信息,获得进程结果,然后调用执行继续进行。
RPC的特点
透明性:远程调用其他机器上的程序,对用户来说就像是调用本地方法一样。
高性能:RPC server能够并发处理多个来自Client的请求(请求队列)
可控性:jdk中已经提供了一个RPC框架?RMI,但是该RPC框架过于重量级并且可控之处较少,所以Hadoop RPC实现了自定义的RPC框架。
Hadoop RPC通信
序列化层:Client与Server端通信传递的信息采用了Hadoop里提供的序列化类或自定义Writable类型。
函数调用层:Hadoop RPC通过动态代理以及Java反射机制实现函数调用。
网络传输层:Hadoop RPC采用了基于TCP/IP的socket机制。
服务器端框架层:RPC Server利用Java NIO以及采用了事件驱动的I/O模型,提高RPC Server的并发处理能力。
Hadoop的整个体系结构就是构建在RPC之上(org.apache.hadoop.ipc)
Hadoop RPC 设计技术
动态代理
反射
序列化
非阻塞的异步I/O(NIO)
动态代理
动态代理可以提供另一个对象的访问,同时隐藏实际对象的具体事实,代理对象对客户隐藏了实际对象。
动态代理可以对请求进行其他的一些处理,在不允许直接访问某些类,或需要对访问做一些特殊处理等,这时候可以考虑使用代理。
目前java开发包中提供了对动态代理的支持
相关的类与接口:
java.lang.reflect.Proxy --类java.lang.reflect.InvocationHandler --接口
动态代理类创建对象过程:
InvocationHandler handler = new InvocationHandlerImpl(...)Proxy.newInstance(...)
RPC几个重要的协议
ClientProtocol 是客户端(Filesystem )与NameNode 通信的接口
DatanodeProtocol 是DataNode与NameNode通信的接口
NamenodeProtocol 是SecondaryNameNode与NameNode通信的接口。
DF SClient是直接调用NameNode接口的对象。用户代码是通过DistributedFileSystem调用DFSClient对象,才能与NameNode打交道。
模拟Hadoop 客户端通过RPC调用namenode namenode要返回信息
程序实现
创建java程序
客户端和服务器端 创建接口HelloWorldService.java(协议)
public interface HelloWorldService extends VersionedProtocol{
static final long versionID = 1;
public String sayHello(String msg);
}
实现接口(服务端实现)
import java.io.IOException;
import org.apache.hadoop.ipc.ProtocolSignature;
public class HelloWorldServiceImpl implements HelloWorldService {
public long getProtocolVersion(String protocol, long clientVersion) throws IOException {
return 1;
}
public ProtocolSignature getProtocolSignature(String protocol, long clientVersion, int clientMethodsHash)
throws IOException {
try {
return ProtocolSignature.getProtocolSignature(protocol, clientVersion);
} catch (ClassNotFoundException e) {
e.printStackTrace();
}
return null;
}
public String sayHello(String msg) {
System.out.println(msg);
return "hello" + msg;
}
}
创建MyServer服务器端(开启服务)
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.ipc.RPC;
import org.apache.hadoop.ipc.RPC.Server;
/**
* 服务器端
*/
public class MyServer {
public static void main(String[] args) {
try {
Configuration conf = new Configuration();
Server server = new RPC.Builder(conf)
//设置协议
.setProtocol(HelloWorldService.class)
//设置实现类
.setInstance(new HelloWorldServiceImpl())
//绑定地址
.setBindAddress("localhost")
//设置处理器个数
.setNumHandlers(2)
//设置端口号
.setPort(8888).build();
//build完之后,启动服务器
server.start();
} catch (Exception e) {
e.printStackTrace();
}
}
}
创建MyClient 客户端
import java.net.InetSocketAddress;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.ipc.RPC;
/**
* 客户端
*/
public class MyClient {
public static void main(String[] args) {
try {
Configuration conf = new Configuration();
HelloWorldService proxy = RPC.getProxy(HelloWorldService.class, HelloWorldService.versionID, new InetSocketAddress("localhost",8888), conf);
String result = proxy.sayHello("world");
System.out.println(result);
} catch (Exception e) {
e.printStackTrace();
}
}
}
运行
1.启动服务器端
2.启动客户端
运行结果:
启动服务端

启动客户端
启动客户端之后,服务端的反馈

———————————————————————————————————————————————————————————-
Client客户端
package com.zhiyou.hadoop.rpc;
import java.io.IOException;
import java.net.InetSocketAddress;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.ipc.RPC;
import com.zhiyou.hadoop.protocol.ClientNameNodeProtocol;
/**
* 客户端访问namenode要使用RPC框架
* hadoop里有个RPC类封装了底层的通信过程
* (netty)
* protocol:限制了客户端调用namenode的协议
* */
public class HadoopClient {
/**
* 返回实现这个接口的一个代理对象namenode
* 调用本地的方法就像调用远程一样
* 客户端发送一个请求(方法名getInfo()、参数"/luban")
* 然后客户端的代理对象拦截了这个请求,调用socket流发给服务端
* 然后服务端接到了这个请求,把方法名和参数拿出来
* 这样就知道调用哪个方法,然后在namenode调用这个实例对象返回
* 处理结果,最后通过网络socket发给代理对象
* */
public static void main(String[] args) throws IOException {
ClientNameNodeProtocol namenode = RPC.getProxy(ClientNameNodeProtocol.class, 1L,
new InetSocketAddress("localhost", 8888), new Configuration());
String info=namenode.getInfo("/luban");
System.out.println(info);
}
}
协议
package com.zhiyou.hadoop.protocol;
/**
* 客户端和namenode调用的通信接口
* */
public interface ClientNameNodeProtocol {
public static final long versionID=1L;
public String getInfo(String path);
}
server
package com.zhiyou.hadoop.server;
import com.zhiyou.hadoop.protocol.ClientNameNodeProtocol;
/**
* 提供业务服务
* 模拟hadoop客户端调用RPC调用namenode
* namenode 要返回信息
*这个类要实现一个接口,方便以后两端通信
* */
public class HadoopNameNode implements ClientNameNodeProtocol{
//模拟namenode的业务之一:查询元数据
//参数是client
public String getInfo(String path) {
return path+":1 -{BLK_1,BLK_2}";
}
}
启动服务
package com.zhiyou.hadoop.server;
import java.io.IOException;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.ipc.RPC;
import org.apache.hadoop.ipc.RPC.Builder;
import org.apache.hadoop.ipc.RPC.Server;
import com.zhiyou.hadoop.protocol.ClientNameNodeProtocol;
import com.zhiyou.hadoop.protocol.UserLoginService;
/**
* 此 类 用来发布写好的服务
* */
public class StartService {
public static void main(String[] args) throws Exception, IOException {
Builder builder = new RPC.Builder(new Configuration());
builder.setBindAddress("localhost").
setPort(8888).
setProtocol(ClientNameNodeProtocol.class).
setInstance(new HadoopNameNode());
Server server = builder.build();
server.start();
}
}
使用hadoop的RPC模拟登录
通信接口,协议
package com.zhiyou.hadoop.protocol;
public interface UserLoginService {
public static final long versionID=1L; //读取版本号
public String login(String name,String password);
}
接口实现类
package com.zhiyou.hadoop.server;
import com.zhiyou.hadoop.protocol.UserLoginService;
public class UserLoginServiceImpl implements UserLoginService {
@Override
public String login(String name, String password) {
// action中会收到这个结果
return name + "login.....";
}
}
action
package com.zhiyou.hadoop.rpc;
import java.net.InetSocketAddress;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.ipc.RPC;
import com.zhiyou.hadoop.protocol.UserLoginService;
/**
* 使用hadoop的rpc模拟登录
* 此时action调用是框架的,service都在其他机器上
* */
public class UserLoginAction {
public static void main(String[] args) throws Exception {
UserLoginService service = RPC.getProxy(UserLoginService.class, 1L,
new InetSocketAddress("localhost", 9999),
new Configuration());
String login = service.login("zhangsan", "123456");
System.out.println(login);
}
}
启动服务
package com.zhiyou.hadoop.server;
import java.io.IOException;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.ipc.RPC;
import org.apache.hadoop.ipc.RPC.Builder;
import org.apache.hadoop.ipc.RPC.Server;
import com.zhiyou.hadoop.protocol.ClientNameNodeProtocol;
import com.zhiyou.hadoop.protocol.UserLoginService;
/**
* 此 类 用来发布写好的服务
* */
public class StartService {
public static void main(String[] args) throws Exception, IOException {
Builder builder2 = new RPC.Builder(new Configuration());
builder2.setBindAddress("localhost")
.setPort(9999)
.setProtocol(UserLoginService.class)
.setInstance(new UserLoginServiceImpl());
Server server1 = builder2.build();
server1.start();
}
}
运行结果
启动服务端:
启动客户端:

客户端启动后服务端的反馈

