分享

hadoop公共模块RPC实现机理 - sxf

 风自向前 2010-08-03
该模块儿是整个hadoop平台命令类协议通讯的基础,Hadoop平台中所有的协议调用都是通过该套机制进行实现。
 
术语解释:
远程进程调用
client调用远程Server中某实例的方法。
具体实现:
远程过程调用,一定通过网络进行方法参数以及返回值信息传输,该模块儿主要采用通用的网络Server设计实现方式,利用Socket构建Server服务器,并在其上构造一个具体业务实例,client将方法调用参数按照一定格式进行序列化,通过网络传输到Server端,Server解析用户请求调用相应的实例方法,将返回值或者捕获的异常通过网络反馈给client。
整体涉及到的主要类为:
Org.apache.hadoop.ipc.Server以及其内部实现类
org.apache.hadoop.ipc.Client以及其内部实现类
org.apache.hadoop.ipc.Rpc$Server
 
客户端(client)具体实现:
 
客户端主要通过proxy模式,利用java自身动态代理技术,Proxy.newProxyInstance
public static Object newProxyInstance(ClassLoader loader,
  Class<?>[] interfaces,
  InvocationHandler h)
该函数主要的两个参数Class<?>[] interfaces, InvocationHandler h
用户调用该函数生成一个Object实例,该实例是一个实现了interfaces接口的具体实例,用户通常将返回的Object转化为具体的interfaces进行使用,当用户调用该实例(实现了interfaces接口)某个具体方法时,系统就将请求转发给InvocationHandler h这个参数对应的实例。
public interface InvocationHandler
{
    public Object invoke(Object proxy, Method method, Object[] args)
throws Throwable;
}
最终用户的请求被push到InvocationHandler h这个参数的invoke方法,系统将method以及args这两个参数进行序列化,通过网络传输到server端。
 
主要流程如下:其中Invoker是一个实现了InvocationHandler 接口的类
1.生成代理
 
 
生成远程代理的主要关键步骤就是java的动态代理技术 Proxy.newProxyInstance.
 
2.远程实例方法调用:
 
 
最终用户的请求被push到InvocationHandler h这个参数的invoke方法,系统将method以及args这两个参数进行序列化,通过网络传输到server端(实例化Invoker的同时会实例化一个Client类实例,这个client主要负责将调用方法以及参数序列化以后,通过网络传输到Server端)。
远程调用的主要关键就是Invocation实现了Writable接口,Invocation在write(DataOutput out)函数中将调用的methodName写入到out,将调用方法的参数个数写入out ,同时逐个将参数的className写入out,最后将所有参数逐个写入out,这也就决定了通过RPC实现调用的方法中的参数要么是简单类型,要么是String,要么是实现了Writable接口的类(参数自己知道如何序列化到stream),要么是数组(数组的元素也必须为简单类型,String,实现了Writable接口的类)。
Invocation序列化参数的实现是通过如下函数实现的:
Org.apache.hadoop.io.ObjectWritable.writeObject
(DataOutput out, Object instance,
Class declaredClass, Configuration conf);
{
if (instance == null)
{ // null
instance = new NullInstance(declaredClass, conf);
declaredClass = Writable.class;
}
//首先写入参数名称写入out
UTF8.writeString(out, declaredClass.getName()); // always write declared
//如果是数组
if (declaredClass.isArray())
{ // array
int length = Array.getLength(instance);
//首先写入数组元素个数
out.writeInt(length);
//逐步序列化数组中的元素
for (int i = 0; i < length; i++)
{
writeObject(out, Array.get(instance, i), declaredClass
.getComponentType(), conf);
}
 
}
//
else if (declaredClass == String.class)
{ // String
UTF8.writeString(out, (String) instance);
 
}
else if (declaredClass.isPrimitive())
{ // primitive type
 
if (declaredClass == Boolean.TYPE)
{ // boolean
out.writeBoolean(((Boolean) instance).booleanValue());
}
else if (declaredClass == Character.TYPE)
{ // char
out.writeChar(((Character) instance).charValue());
}
else if (declaredClass == Byte.TYPE)
{ // byte
out.writeByte(((Byte) instance).byteValue());
}
else if (declaredClass == Short.TYPE)
{ // short
out.writeShort(((Short) instance).shortValue());
}
else if (declaredClass == Integer.TYPE)
{ // int
out.writeInt(((Integer) instance).intValue());
}
else if (declaredClass == Long.TYPE)
{ // long
out.writeLong(((Long) instance).longValue());
}
else if (declaredClass == Float.TYPE)
{ // float
out.writeFloat(((Float) instance).floatValue());
}
else if (declaredClass == Double.TYPE)
{ // double
out.writeDouble(((Double) instance).doubleValue());
}
else if (declaredClass == Void.TYPE)
{ // void
}
else
{
throw new IllegalArgumentException("Not a primitive: "
+ declaredClass);
}
}
else if (declaredClass.isEnum())
{ // enum
UTF8.writeString(out, ((Enum) instance).name());
}
//其他元素通过自身实现的writable接口功能实现序列化
else if (Writable.class.isAssignableFrom(declaredClass))
{ // Writable
UTF8.writeString(out, instance.getClass().getName());
((Writable) instance).write(out);
 
}
else
{
throw new IOException("Can't write: " + instance + " as "
+ declaredClass);
}
}
 
 
服务端(Server,具体调用实例所在)具体实现:
1.具体相关类
 
 
最底层的抽象类Server是整个网络通讯的核心,完成Listener,Responser以及Handler实例的初始化,并控制所有功能模块儿服务的启动。
Listener主要负责Socket的监听以及Connection的建立,同时监控ClientSocket的数据可读事件,通知Connection进行processData,收到完成请求包以后,封装为一个Call对象(包含Connection对象,从网络流中读取的参数信息,调用方法信息),将其放入队列。
Handler主要负责从请求队列中取出数据逐个进行处理,最终调用Server抽奖类的call方法,目前抽象类Server实现了一个具体的实现类,名称也是Server。Server的call方法中根据方法名以及参数构建method对象,并在实际服务的实例对象上进行方法调用。方法调用返回值或异常对象被序列化到一个ByteBuffer,由Responser.doRespond方法进行发送数据。
Responser的主要作用是发送数据,如果数据量比较大一次发送不出去,就监听Write事件,一直Select是否可写,保证数据发送完整.
2.Server端相关调用流程图如下:
 
 
本文来自CSDN博客,转载请标明出处:http://blog.csdn.net/sxf_824/archive/2009/11/20/4842153.aspx
本文来自CSDN博客,转载请标明出处:http://blog.csdn.net/sxf_824/archive/2009/11/20/4842153.aspx

    本站是提供个人知识管理的网络存储空间,所有内容均由用户发布,不代表本站观点。请注意甄别内容中的联系方式、诱导购买等信息,谨防诈骗。如发现有害或侵权内容,请点击一键举报。
    转藏 分享 献花(0

    0条评论

    发表

    请遵守用户 评论公约

    类似文章 更多