hadoop 学习
Hadoop 的文件系统,最重要是 FileSystem 类,以及它的两个子类 LocalFileSystem 和 DistributedFileSystem。 这里先分析 FileSystem。
抽象类 FileSystem,提高了一系列对文件/目录操作的接口,还有一些辅助方法。分别说明一下: 1. open,create,delete,rename等,非abstract,部分返回 FSDataOutputStream,作为流进行处理。 2. openRaw,createRaw,renameRaw,deleteRaw等,abstract,部分返回 FSInputStream,可以随机访问。 3. lock,release,copyFromLocalFile,moveFromLocalFile,copyToLocalFile 等abstract method,提供便利作用,从方法命名可以看出作用。 特别说明,Hadoop的文件系统,每个文件都有一个checksum,一个crc文件。因此FileSystem里面的部分代码对此进行了特别的处理,比如 rename。 LocalFileSystem 和 DistributedFileSystem,理应对用户透明,这里不多做分析,和 FSDataInputStream,FSInputStream 结合一起说明一下。 查看两个子类的 getFileCacheHints 方法,可以看到 LocalFileSystem 是使用‘localhost‘来命名,这里暂且估计两个FileSystem都是通过网络进行数据通讯,一个是Internet,一个是Intranet。 LocalFileSystem 里面有两个内部类 LocalFSFileInputStream和LocalFSFileOutputStream,查看代码可以看到它是使用 FileChannel进行操作的。另外 lock和release 两个方法使用了TreeMap来保存文件和对应的锁。 DistributedFileSystem 代码量少于 LocalFileSystem,但是更加复杂,它里面使用了 DFSClient 来进行分布式文件系统的操作: public DistributedFileSystem(InetSocketAddress namenode, Configuration conf) throws IOException { super(conf); this.dfs = new DFSClient(namenode, conf); this.name = namenode.getHostName() + ":" + namenode.getPort(); } DFSClient 类接收一个InetSocketAddress 和Configuration 作为输入,对网络传输细节进行了封装。DistributedFileSystem中绝大多数方法都是调用DFSClient进行处理,它只是一个 Warpper。下面着重分析DFSClient。 DFSClient中,主要使用RPC来进行网络的通讯,而不是直接在内部使用Socket。如果要详细了解传输细节,可以查看 org.apache.hadoop.ipc 这个包里面的3个Class。 DFSClient 中的路径,基本上都是UTF8类型,而非String,在DistributedFileSystem中,通过getPath和getDFSPath来转换,这样做可以保证路径格式的标准和数据传输的一致性。 DFSClient 中的大多数方法,也是直接委托ClientProtocol类型的namenode来执行,这里主要分析其它方法。 LeaseChecker 内部类。一个守护线程,定期对namenode进行renewLease操作,注释说明: Client programs can cause stateful changes in the NameNode that affect other clients. A client may obtain a file and neither abandon nor complete it. A client might hold a series of locks that prevent other clients from proceeding. Clearly, it would be bad if a client held a bunch of locks that it never gave up. This can happen easily if the client dies unexpectedly. So, the NameNode will revoke the locks and live file-creates for clients that it thinks have died. A client tells the NameNode that it is still alive by periodically calling renewLease(). If a certain amount of time passes since the last call to renewLease(), the NameNode assumes the client has died. 作用是对client进行心跳监测,若client挂掉了,执行解锁操作。 DFSInputStream 和 DFSOutputStream,比LocalFileSystem里面的更为复杂,也是通过 ClientProtocol 进行操作,里面使用到了 org.apache.hadoop.dfs 包中的数据结构,如DataNode,Block等,这里不对这些细节进行分析。 对FileSystem的分析(1)到此结束,个人感觉它的封装还是做的不错的,从Nutch项目分离出来后,比原先更为清晰。 下面就接着进行MapReduce的第二部分分析,从MapReduce如何进行分布式 ############################################################################### 之前的MapReduce Demo只能在一台机器上运行,现在是时候让它分布式运行了。在对MapReduce的运行流程和FileSystem进行了简单研究之后,现在尝试从配置着手,看看怎样让Hadoop在两台机器上面同时运行MapReduce。 首先看回这里 String tracker = conf.get("mapred.job.tracker", "local"); if ("local".equals(tracker)) { this.jobSubmitClient = new LocalJobRunner(conf); } else { this.jobSubmitClient = (JobSubmissionProtocol) RPC.getProxy(JobSubmissionProtocol.class, JobTracker.getAddress(conf), conf); } 当tracker地址不为local,则tracker为Remote Client的 JobTracker 类,这里重点分析。 JobTracker有一个main函数,注释显示它仅仅用于调试,正常情况是作为DFS Namenode进程的一部分来运行。不过这里我们可以先从它着手开始分析。 tracker = new JobTracker(conf); //构造 构造函数先获取一堆常量的值,然后清空‘systemDir‘,接着启动RPC服务器。 InetSocketAddress addr = getAddress(conf); this.localMachine = addr.getHostName(); this.port = addr.getPort(); this.interTrackerServer = RPC.getServer(this, addr.getPort(), 10, false, conf); this.interTrackerServer.start(); 启动TrackInfoServer: this.infoPort = conf.getInt("mapred.job.tracker.info.port", 50030); this.infoServer = new JobTrackerInfoServer(this, infoPort); this.infoServer.start(); TrackInfoServer 提供了通过HTTP方式获取JobTracker信息的方式,可以方便用于监测工作任务的进度。 启动三个守护线程: new Thread(this.expireTrackers).start(); //Used to expire TaskTrackers that have gone down new Thread(this.retireJobs).start(); //Used to remove old finished Jobs that have been around for too long new Thread(this.initJobs).start(); //Used to init new jobs that have just been created 三个线程的用处已经注释,这里不作分析。下面开始分析 JobTracker.submitJob() 之前已经分析过 LocalJobRunner.submitJob(),它实例化内部类Job,在里面实现MapReduce流程。JobTracker就复杂一些,它实例化 JobInProgress,然后将这个Job提交到队列: JobInProgress job = new JobInProgress(jobFile, this, this.conf); synchronized (jobs) { synchronized (jobsByArrival) { synchronized (jobInitQueue) { jobs.put(job.getProfile().getJobId(), job); jobsByArrival.add(job); jobInitQueue.add(job); jobInitQueue.notifyAll(); } } } 此时RetireJobs线程开始处理超时和出错的Job,JobInitThread线程初始化工作任务: job.initTasks(); 开始分析 JobInProgress 在构造函数中,Tracker从发起端的DFS获取任务文件(xml和jar),然后保存到本地目录下面 JobConf default_job_conf = new JobConf(default_conf); this.localJobFile = default_job_conf.getLocalFile(JobTracker.SUBDIR, jobid + ".xml"); this.localJarFile = default_job_conf.getLocalFile(JobTracker.SUBDIR, jobid + ".jar"); FileSystem fs = FileSystem.get(default_conf); fs.copyToLocalFile(new File(jobFile), localJobFile); conf = new JobConf(localJobFile); this.profile = new JobProfile(conf.getUser(), jobid, jobFile, url, conf.getJobName()); String jarFile = conf.getJar(); if (jarFile != null) { fs.copyToLocalFile(new File(jarFile), localJarFile); conf.setJar(localJarFile.getCanonicalPath()); } 这里要注意jarFile,JobConf的构造函数: public JobConf(Configuration conf, Class aClass) { this(conf); String jar = findContainingJar(aClass); if (jar != null) { setJar(jar); } } 如果 aClass 是在一个jar里面,那么setJar(jar);就会被执行,这个jar会被copy到 LocalJobRunner 或是 JobTracker 的工作目录下面。所以这里有一个原则: 将要执行的MapReduce操作的所有class打包到一个jar中,这样才能执行分布式的MapReduce计算。 再看 JobInProgress.initTasks() 先从Jar中加载InputFormat String ifClassName = jd.get("mapred.input.format.class"); InputFormat inputFormat; if (ifClassName != null && localJarFile != null) { try { ClassLoader loader = new URLClassLoader(new URL[]{ localJarFile.toURL() }); Class inputFormatClass = loader.loadClass(ifClassName); inputFormat = (InputFormat)inputFormatClass.newInstance(); } catch (Exception e) { throw new IOException(e.toString()); } } else { inputFormat = jd.getInputFormat(); } 接下来对文件块的大小进行排序 创建对应的Map任务 this.numMapTasks = splits.length; // create a map task for each split this.maps = new TaskInProgress[numMapTasks]; for (int i = 0; i < numMapTasks; i++) { maps = new TaskInProgress(jobFile, splits, jobtracker, conf, this); } 创建Reduce任务 this.reduces = new TaskInProgress[numReduceTasks]; for (int i = 0; i < numReduceTasks; i++) { reduces = new TaskInProgress(jobFile, maps, i, jobtracker, conf, this); } 最后对于每Split的信息进行缓存,并且创建状态类 for (int i = 0; i < maps.length; i++) { String hints[][] = fs.getFileCacheHints(splits.getFile(), splits.getStart(), splits.getLength()); cachedHints.put(maps.getTIPId(), hints); } this.status = new JobStatus(status.getJobId(), 0.0f, 0.0f, JobStatus.RUNNING); 现在轮到 TaskInProgress,它将Job里面的Map和Reduce操作进行了封装,但是JobInProgress.initTasks()仅仅对task进行了初始化,并没有执行Task,经过一番跟踪,发现Task的执行,是由 TaskTracker 来处理。 TaskTracker,实现了TaskUmbilicalProtocol接口。在之前的文章中,LocalJobRunner的内部类Job也实现了这个接口,这里对比一下: 接口 JobSubmissionProtocol: LocalJobRunner <---> JobTracker 接口 TaskUmbilicalProtocol: LocalJobRunner.Job <---> TaskTracker 下面对TaskTracker进行分析,首先也是从main入口开始。 TaskTracker实现了Runnable,main实例化TaskTracker对象,然后执行run()方法。 在构造函数中,主要进行初始化 this.mapOutputFile = new MapOutputFile(); this.mapOutputFile.setConf(conf); initialize(); initialize()里面,初始化一些变量值 ,然后初始化RPC服务器: while (true) { try { this.taskReportServer = RPC.getServer(this, this.taskReportPort, maxCurrentTasks, false, this.fConf); this.taskReportServer.start(); break; } catch (BindException e) { LOG.info("Could not open report server at " + this.taskReportPort + ", trying new port"); this.taskReportPort++; } } while (true) { try { this.mapOutputServer = new MapOutputServer(mapOutputPort, maxCurrentTasks); this.mapOutputServer.start(); break; } catch (BindException e) { LOG.info("Could not open mapoutput server at " + this.mapOutputPort + ", trying new port"); this.mapOutputPort++; } } mapOutputServer使用一个循环来尝试各个端口绑定。 最后一句 this.jobClient = (InterTrackerProtocol) RPC.getProxy(InterTrackerProtocol.class, jobTrackAddr, this.fConf); 这里有一个新的接口InterTrackerProtocol,是TaskTracker和中央JobTracker通讯用的协议。通过这个接口, TaskTracker可以用来执行JobTracker中的Task了。接下来分析TaskServer的主流程,run()函数。 run()中, 有两个while循环。在内部while循环里面,执行 offerService() 方法。它里面也是一个while循环,开始几段代码用于JobTracker的心跳监测。接下来,它通过协议接口调用JobTracker,获取Task并执行: if (mapTotal < maxCurrentTasks || reduceTotal < maxCurrentTasks) { Task t = jobClient.pollForNewTask(taskTrackerName); if (t != null) { TaskInProgress tip = new TaskInProgress(t, this.fConf); synchronized (this) { tasks.put(t.getTaskId(), tip); if (t.isMapTask()) { mapTotal++; } else { reduceTotal++; } runningTasks.put(t.getTaskId(), tip); } tip.launchTask(); } } tip.launchTask(); 开始执行这个Task,在方法内部: this.runner = task.createRunner(TaskTracker.this); this.runner.start(); Task 有两个子类 MapTask和ReduceTask,它们的createRunner()方法都会创建一个TaskRunner的子类,TaskRunner继承Thread,run()方法中: String sep = System.getProperty("path.separator"); File workDir = new File(new File(t.getJobFile()).getParent(), "work"); workDir.mkdirs(); StringBuffer classPath = new StringBuffer(); // start with same classpath as parent process classPath.append(System.getProperty("java.class.path")); classPath.append(sep); JobConf job = new JobConf(t.getJobFile()); String jar = job.getJar(); if (jar != null) { // if jar exists, it into workDir unJar(new File(jar), workDir); File[] libs = new File(workDir, "lib").listFiles(); if (libs != null) { for (int i = 0; i < libs.length; i++) { classPath.append(sep); // add libs from jar to classpath classPath.append(libs); } } classPath.append(sep); classPath.append(new File(workDir, "classes")); classPath.append(sep); classPath.append(workDir); } 获取工作目录,获取classpath。然后解压工作任务的jar包。 // Build exec child jmv args. Vector vargs = new Vector(8); File jvm = // use same jvm as parent new File(new File(System.getProperty("java.home"), "bin"), "java"); vargs.add(jvm.toString()); String javaOpts = handleDeprecatedHeapSize( job.get("mapred.child.java.opts", "-Xmx200m"), job.get("mapred.child.heap.size")); javaOpts = replaceAll(javaOpts, "@taskid@", t.getTaskId()); int port = job.getInt("mapred.task.tracker.report.port", 50050) + 1; javaOpts = replaceAll(javaOpts, "@port@", Integer.toString(port)); String [] javaOptsSplit = javaOpts.split(" "); for (int i = 0; i < javaOptsSplit.length; i++) { vargs.add(javaOptsSplit); } // Add classpath. vargs.add("-classpath"); vargs.add(classPath.toString()); // Add main class and its arguments vargs.add(TaskTracker.Child.class.getName()); // main of Child vargs.add(tracker.taskReportPort + ""); // pass umbilical port vargs.add(t.getTaskId()); // pass task identifier // Run java runChild((String[])vargs.toArray(new String[0]), workDir); 这里是构造启动Java进程的classpath和其它vm参数,最后在 runChild 中开一个子进程来执行这个Task。感觉够复杂的。 最后分析TaskTracker的内部类Child。它就是上面子进程执行的类。在main函数中 TaskUmbilicalProtocol umbilical = (TaskUmbilicalProtocol)RPC.getProxy(TaskUmbilicalProtocol.class, new InetSocketAddress(port), conf); Task task = umbilical.getTask(taskid); JobConf job = new JobConf(task.getJobFile()); conf.addFinalResource(new File(task.getJobFile())); 可见该子进程也是通过RPC跟TaskTracker进行通讯。 startPinging(umbilical, taskid); // start pinging parent 开一个进程,对TaskTracker进行心跳监测。 String workDir = job.getWorkingDirectory(); if (workDir != null) { FileSystem file_sys = FileSystem.get(job); file_sys.setWorkingDirectory(new File(workDir)); } task.run(job, umbilical); // run the task 这里才真正开始执行Task。 分析到此告一段落,下面开始构造一个分布式执行的环境。 |
|