为了不遗忘和可以速查源码,准备重新读一遍Hadoop的MapReduce部分的源码,记录下来,尽量详细点。如要转载,请标明出处。
写MapReduce程序首先接触的是Job类,Job类是管理一个集群作业的类,包含了一个作业的所有信息和向集群提交作业的方法。

如图所示,它有以上一些方法,我们写程序是调用waitForCompletion()方法,方法实现如下:
- public boolean waitForCompletion(boolean verbose
- ) throws IOException, InterruptedException,
- ClassNotFoundException {
- if (state == JobState.DEFINE) {
- submit();
- }
- if (verbose) {
- jobClient.monitorAndPrintJob(conf, info);
- } else {
- info.waitForCompletion();
- }
- return isSuccessful();
- }
它调用了submit向集群提交作业,下面看下submit()方法:
- public void submit() throws IOException, InterruptedException,
- ClassNotFoundException {
- ensureState(JobState.DEFINE);
- 建立新的API,检查兼容性
- setUseNewAPI();
- info = jobClient.submitJobInternal(conf);
- state = JobState.RUNNING;
- }
jobClient是在初始化时候建立的。
- public Job(Configuration conf) throws IOException {
- super(conf, null);
- jobClient = new JobClient((JobConf) getConfiguration());
- }
JobClient类 建立了一个代理,用于连接JobTracker(集群上的master结点),
- public JobClient(JobConf conf) throws IOException {
- setConf(conf);
- init(conf);
- }
-
-
-
-
-
- public void init(JobConf conf) throws IOException {
- String tracker = conf.get("mapred.job.tracker", "local");
- if ("local".equals(tracker)) {
- this.jobSubmitClient = new LocalJobRunner(conf);
- } else {
- this.jobSubmitClient = createRPCProxy(JobTracker.getAddress(conf), conf);
- }
- }
这个代理会检查mapred.job.tracker 这个属性有没有建立,默认值是local,如果建立了,则建立一个连接JobTracker的代理。这个代理负责上传作业的配置和作业内容到集群中。
- private JobSubmissionProtocol createRPCProxy(InetSocketAddress addr,
- Configuration conf) throws IOException {
- return (JobSubmissionProtocol) RPC.getProxy(JobSubmissionProtocol.class,
- JobSubmissionProtocol.versionID, addr, getUGI(conf), conf,
- NetUtils.getSocketFactory(conf, JobSubmissionProtocol.class));
- }
发现他实现了JobSubmissionProtocol接口的一个对象
- public static VersionedProtocol getProxy(Class<?> protocol,
- long clientVersion, InetSocketAddress addr, UserGroupInformation ticket,
- Configuration conf, SocketFactory factory) throws IOException {
-
- VersionedProtocol proxy =
- (VersionedProtocol) Proxy.newProxyInstance(
- protocol.getClassLoader(), new Class[] { protocol },
- new Invoker(addr, ticket, conf, factory));
- long serverVersion = proxy.getProtocolVersion(protocol.getName(),
- clientVersion);
- if (serverVersion == clientVersion) {
- return proxy;
- } else {
- throw new VersionMismatch(protocol.getName(), clientVersion,
- serverVersion);
- }
- }
总之,Job类使用了一个实现了JobSubmissionProtocol接口的一个代理,这个代理对象可以用来和集群通信,job类的一些方法也可以用来帮助我们对集群和任务的进展情况进行查看。
|