有了Client,有了Server,那整个过程怎么运行起来?
先说一下基本原理:
- 1. 首先客户端和服务器端之间要有一个协议,这里的协议就是以java接口类的方式暴露出来的
- 2. 虽然Client类和Server类之间已经具有通信的能力,也有了协议,那么一个真正的客户端要调用服务器端rpc调用的实现,只需要解决参数及具体的调用实现两个问题即可
- 3. 客户端要做的,就是要将参数(这个一般称为存根)通过网络传递到服务器端。这个自然而然想到使用代理模式,因为Client已经具备网络通信的能力,只要通过代理,实现获取参数进行传输即可,为什么不在Client这里实现参数的获取,如果这样的话,就违反了单一职责的原则,且扩展性不行,总不能一个客户端的调用实现一个特定的Client类吧。因此,将Client的功能单一独立出来,只负责将参数通过网络传递到服务器端
- 4. 服务器要做的工作,只需要进行调用的真正的实现即可,当然了, 最后需要能够返回正确的结果。
上面说的这些,都全部在hadoop的这个RPC里进行了实现。
客户端的主要代理实现方法如下:
public static <T> ProtocolProxy<T> getProtocolProxy(Class<T> protocol,
long clientVersion,
InetSocketAddress addr,
UserGroupInformation ticket,
Configuration conf,
SocketFactory factory,
int rpcTimeout) throws IOException {
if (UserGroupInformation.isSecurityEnabled()) {
SaslRpcServer.init(conf);
}
return getProtocolEngine(protocol,conf).getProxy(protocol,
clientVersion, addr, ticket, conf, factory, rpcTimeout);
}
其中是调用RpcEngine的下面这个接口方法来进行实现的:
public <T> ProtocolProxy<T> getProxy(Class<T> protocol, long clientVersion,
InetSocketAddress addr, UserGroupInformation ticket,
Configuration conf, SocketFactory factory,
int rpcTimeout)
throws IOException
对应的,可以查看一个具体实现的代码,WritableRpcEngine类的实现:
public <T> ProtocolProxy<T> getProxy(Class<T> protocol, long clientVersion,
InetSocketAddress addr, UserGroupInformation ticket,
Configuration conf, SocketFactory factory,
int rpcTimeout)
throws IOException {
T proxy = (T) Proxy.newProxyInstance(protocol.getClassLoader(),
new Class[] { protocol }, new Invoker(protocol, addr, ticket, conf,
factory, rpcTimeout));
return new ProtocolProxy<T>(protocol, proxy, true);
}
真正的代理处理在InVoker类里实现(关于JDK的动态代理,可参看
http://jimmee.iteye.com/admin/blogs/776820)
public Object invoke(Object proxy, Method method, Object[] args)
throws Throwable {
long startTime = 0;
if (LOG.isDebugEnabled()) {
startTime = System.currentTimeMillis();
}
ObjectWritable value = (ObjectWritable)
// 这里取得要调用的方法,参数列表,之后通过Client对象传递给服务器端
client.call(new Invocation(method, args), remoteId);
if (LOG.isDebugEnabled()) {
long callTime = System.currentTimeMillis() - startTime;
LOG.debug("Call: " + method.getName() + " " + callTime);
}
return value.get();
}
服务器端真正的实现,也在RpcEngine的一个具体实现里:
public Writable call(Class<?> protocol, Writable param, long receivedTime)
throws IOException {
….
Invocation call = (Invocation)param;
if (verbose) log("Call: " + call);
Method method = protocol.getMethod(call.getMethodName(),
call.getParameterClasses());
method.setAccessible(true);
// Verify rpc version
….
//Verify protocol version.
……
long startTime = System.currentTimeMillis();
// 真正的调用
Object value = method.invoke(instance, call.getParameters());
int processingTime = (int) (System.currentTimeMillis() - startTime);
int qTime = (int) (startTime-receivedTime);
if (LOG.isDebugEnabled()) {
LOG.debug("Served: " + call.getMethodName() +
" queueTime= " + qTime +
" procesingTime= " + processingTime);
}
rpcMetrics.addRpcQueueTime(qTime);
rpcMetrics.addRpcProcessingTime(processingTime);
rpcDetailedMetrics.addProcessingTime(call.getMethodName(),
processingTime);
if (verbose) log("Return: "+value);
return new ObjectWritable(method.getReturnType(), value);
…..
}
一个简单的例子:
客户端和服务器端的协议及实现:
package cn.edu.jimmee;
import org.apache.hadoop.io.BooleanWritable;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.ipc.VersionedProtocol;
/**
* rpc的协议接口
* @author jimmee
*/
public interface RpcProtocol extends VersionedProtocol {
public BooleanWritable printMsg(IntWritable id, Text msg);
}
package cn.edu.jimmee;
import java.io.IOException;
import org.apache.hadoop.io.BooleanWritable;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
/**
* rpc的协议实现接口
* @author jimmee
*/
public class RpcProtocolImpl implements RpcProtocol {
@Override
public BooleanWritable printMsg(IntWritable id, Text msg) {
System.out.println("id=" + id.get() + ", msg=" + msg.toString());
if (Math.random() < 0.5) {
return new BooleanWritable(true);
} else {
return new BooleanWritable(false);
}
}
@Override
public long getProtocolVersion(String protocol,
long clientVersion)
throws IOException {
return 0;
}
}
服务器代码:
package cn.edu.jimmee;
import java.io.IOException;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.ipc.RPC;
import org.apache.hadoop.ipc.RPC.Server;
/**
* rpc的server
* @author jimmee
*/
public class RpcServer {
public static void main(String[] args) throws IOException {
RpcProtocol instance = new RpcProtocolImpl();
Configuration conf = new Configuration();
Server server = RPC.getServer(instance, "127.0.0.1", 7777, conf);
server.start();
}
}
客户端的代码:
package cn.edu.jimmee;
import java.io.IOException;
import java.net.InetSocketAddress;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.ipc.RPC;
/**
* rpc的client端的实现
* @author jimmee
*/
public class RpcClient {
public static void main(String[] args) throws IOException {
Configuration conf = new Configuration();
RpcProtocol rpcClientImpl = (RpcProtocol) RPC.getProxy(RpcProtocol.class, 0, new InetSocketAddress("127.0.0.1", 7777), conf);
for (int i = 0; i < 10; i++) {
System.out.println(rpcClientImpl.printMsg(new IntWritable(i), new Text("hello" + i)));
}
}
}
分享到:
相关推荐
Hadoop_RPC详细分析.doc,这是你多关注的
hadoop rpc 云计算 hdfs mapreduce
java模拟hadoop的RPC(Remote Procedure Call)通讯,连接和心跳
hadoop rpc实例,
Hadoop_RPCDemo Hadoop原始解析之RPC协议详见csdn:
1.java接口操作Hadoop文件系统(文件上传下载删除创建......2.RPC远程过程调用的java代码实现,便于理解Hadoop的RPC协议,具体使用方法可参考我的博客https://blog.csdn.net/qq_34233510/article/details/88142507
Hadoop rpc源码是从Hadoop分离出的ipc,去掉了认证部分,附录使用文档.使用前请add lib包commons-logging-*.*.*.jar(我用的是1.0.4)和log4j-*.*.*.jar(我的1.2.13) 相关blog post: ...
使用Hadoop的RPC机制创建一个协议接口、通信服务端、通信客户端程序 目标:通过该任务,理解分布式系统中远程过程调用协议,掌握分布式系统中客户机与服务器的通信机制。
java操作hadoop的RPC源码,附带所需全部jar包,欢迎下载学习。
rpc架构与hadoop分享
Hadoop自己的Rpc框架使用Demo。可以在自己的项目中用Hadoop的Rpc框架了。
NULL 博文链接:https://wmwork2010.iteye.com/blog/632016
本ppt主要讲解yarn的基本架构,工作流程,基础库以及程序设计方法 容错等
传智播客吴超老师RPC小例子
Hadoop里的RPC机制过程,代码示例rpc调用过程
NULL 博文链接:https://zqhxuyuan.iteye.com/blog/1879292
RPC(RemoteProcedureCall)——远程过程调用,它是一种通过网络从远程计算机程序上请求服务,而不需要了解底层网络技术的协议。RPCServer实现了一种抽象的RPC服务,同时提供Call队列。RPCServer作为服务提供者由两个...
这些天一直奔波于长沙和武汉之间,忙着腾讯的笔试、面试,以至于对hadoopRPC(RemoteProcedureCallProtocol,远程过程调用协议,它是一种通过网络从远程计算机程序上请求服务,而不需要了解底层网络技术的协议。...
学习hadoop源代码,RPC部分.pdf
hadoop_rpc(可能是开源的) 支持(将开源) 支持,线程安全,比正式客户更友好,性能更高。 百度使用的各种协议: , streaming_rpc ,hulu_pbrpc, sofa_pbrpc ,nova_pbrpc,public_pbrpc,ubrpc和基于nshead...