该模块儿是整个hadoop平台命令类协议通讯的基础,Hadoop平台中所有的协议调用都是通过该套机制进行实现。
术语解释: 远程进程调用 client调用远程Server中某实例的方法。 。 具体实现: 远程过程调用,一定通过网络进行方法参数以及返回值信息传输,该模块儿主要采用通用的网络Server设计实现方式,利用Socket构建Server服务器,并在其上构造一个具体业务实例,client将方法调用参数按照一定格式进行序列化,通过网络传输到Server端,Server解析用户请求调用相应的实例方法,将返回值或者捕获的异常通过网络反馈给client。 整体涉及到的主要类为: Org.apache.hadoop.ipc.Server以及其内部实现类 org.apache.hadoop.ipc.Client以及其内部实现类 org.apache.hadoop.ipc.Rpc$Server
客户端(client)具体实现:
客户端主要通过proxy模式,利用java自身动态代理技术,Proxy.newProxyInstance public static Object newProxyInstance(ClassLoader loader, Class<?>[] interfaces, InvocationHandler h) 该函数主要的两个参数Class<?>[] interfaces, InvocationHandler h 用户调用该函数生成一个Object实例,该实例是一个实现了interfaces接口的具体实例,用户通常将返回的Object转化为具体的interfaces进行使用,当用户调用该实例(实现了interfaces接口)某个具体方法时,系统就将请求转发给InvocationHandler h这个参数对应的实例。 public interface InvocationHandler { public Object invoke(Object proxy, Method method, Object[] args) throws Throwable; } 最终用户的请求被push到InvocationHandler h这个参数的invoke方法,系统将method以及args这两个参数进行序列化,通过网络传输到server端。
主要流程如下:其中Invoker是一个实现了InvocationHandler 接口的类 1.生成代理
生成远程代理的主要关键步骤就是java的动态代理技术 Proxy.newProxyInstance.
2.远程实例方法调用:
最终用户的请求被push到InvocationHandler h这个参数的invoke方法,系统将method以及args这两个参数进行序列化,通过网络传输到server端(实例化Invoker的同时会实例化一个Client类实例,这个client主要负责将调用方法以及参数序列化以后,通过网络传输到Server端)。 远程调用的主要关键就是Invocation实现了Writable接口,Invocation在write(DataOutput out)函数中将调用的methodName写入到out,将调用方法的参数个数写入out ,同时逐个将参数的className写入out,最后将所有参数逐个写入out,这也就决定了通过RPC实现调用的方法中的参数要么是简单类型,要么是String,要么是实现了Writable接口的类(参数自己知道如何序列化到stream),要么是数组(数组的元素也必须为简单类型,String,实现了Writable接口的类)。 Invocation序列化参数的实现是通过如下函数实现的: Org.apache.hadoop.io.ObjectWritable.writeObject (DataOutput out, Object instance, Class declaredClass, Configuration conf); { if (instance == null) { // null instance = new NullInstance(declaredClass, conf); declaredClass = Writable.class; } //首先写入参数名称写入out UTF8.writeString(out, declaredClass.getName()); // always write declared //如果是数组 if (declaredClass.isArray()) { // array int length = Array.getLength(instance); //首先写入数组元素个数 out.writeInt(length); //逐步序列化数组中的元素 for (int i = 0; i < length; i++) { writeObject(out, Array.get(instance, i), declaredClass .getComponentType(), conf); }
} // else if (declaredClass == String.class) { // String UTF8.writeString(out, (String) instance);
} else if (declaredClass.isPrimitive()) { // primitive type
if (declaredClass == Boolean.TYPE) { // boolean out.writeBoolean(((Boolean) instance).booleanValue()); } else if (declaredClass == Character.TYPE) { // char out.writeChar(((Character) instance).charValue()); } else if (declaredClass == Byte.TYPE) { // byte out.writeByte(((Byte) instance).byteValue()); } else if (declaredClass == Short.TYPE) { // short out.writeShort(((Short) instance).shortValue()); } else if (declaredClass == Integer.TYPE) { // int out.writeInt(((Integer) instance).intValue()); } else if (declaredClass == Long.TYPE) { // long out.writeLong(((Long) instance).longValue()); } else if (declaredClass == Float.TYPE) { // float out.writeFloat(((Float) instance).floatValue()); } else if (declaredClass == Double.TYPE) { // double out.writeDouble(((Double) instance).doubleValue()); } else if (declaredClass == Void.TYPE) { // void } else { throw new IllegalArgumentException("Not a primitive: " + declaredClass); } } else if (declaredClass.isEnum()) { // enum UTF8.writeString(out, ((Enum) instance).name()); } //其他元素通过自身实现的writable接口功能实现序列化 else if (Writable.class.isAssignableFrom(declaredClass)) { // Writable UTF8.writeString(out, instance.getClass().getName()); ((Writable) instance).write(out);
} else { throw new IOException("Can't write: " + instance + " as " + declaredClass); } }
服务端(Server,具体调用实例所在)具体实现: 1.具体相关类
最底层的抽象类Server是整个网络通讯的核心,完成Listener,Responser以及Handler实例的初始化,并控制所有功能模块儿服务的启动。 Listener主要负责Socket的监听以及Connection的建立,同时监控ClientSocket的数据可读事件,通知Connection进行processData,收到完成请求包以后,封装为一个Call对象(包含Connection对象,从网络流中读取的参数信息,调用方法信息),将其放入队列。 Handler主要负责从请求队列中取出数据逐个进行处理,最终调用Server抽奖类的call方法,目前抽象类Server实现了一个具体的实现类,名称也是Server。Server的call方法中根据方法名以及参数构建method对象,并在实际服务的实例对象上进行方法调用。方法调用返回值或异常对象被序列化到一个ByteBuffer,由Responser.doRespond方法进行发送数据。 Responser的主要作用是发送数据,如果数据量比较大一次发送不出去,就监听Write事件,一直Select是否可写,保证数据发送完整. 2.Server端相关调用流程图如下: |
|
来自: mumuxd > 《hadoop相关》