分享

Hadoop | In Programming We Trust

 gerial 2011-09-29

Shuffle and Sort in Hadoop – Reduce Side

map阶段的output就在执行map task的那台tasktracker的本地硬盘上,而tasktracker现在需要执行reduce task。而且,reduce task的输入可能是有若干个map task中的某些输出(对应于该reduce task的partition)。 map tasks在不同的时间结束,reduce task在map tasks结束的时候就开始拷贝map的输出。这就是reduce task的copy phase。reduce task有一定数目的拷贝线程来进行并行拷贝,默认是5线程,可以通过修改mapred.reduce.parallel.copies属性来配置。 如果map output比较小,那么会被直接拷入tasktracker的内存(由 mapred.job.shuffle.input.buffer.percent控制,表示heap中用来存放map output的比例);否则,map output会被拷贝到disk上。当内存中的buffer达到了threshold(由 mapred.job.shuffle.merge.percent控制),或者达到了一定量(由 mapred.inmem.merge.threshold),它会被merge并且写入磁盘。 随着copy在磁盘上积累,一个后台线程会把他们merge成更大的有序文件。这样做节省了以后做merge的时间。注意,如果map output被压缩了,那么就不得不先解压然后再merge。 当所有的map outputs都被拷贝了以后,reduce task进入sort phase(称为merge phase更合适,因为sort主要是由map side完成的),这个phase将map outputs做合并,同时维护他们的排序。这轮phase分为若干个rounds执行。例如,如果有50个map outputs,而merge的因子是10(由io.sort.factor控制,和map的merge一样),那么就需要5轮。每一轮将10个文件 merge为一个,所以到最后会有5个intermediate files。 在这里,不会有最终一轮将5个file合并为一个单独的有序文件。merge phase节省了一次磁盘交互,直接将这些文件交给reduce function去完成reduce phase。最终的merge可能是in-memory和on-disk的混合。

Posted in Hadoop | Tagged | Leave a comment

Shuffle and Sort in Hadoop – The Map Side

MapReduce保证每个reducer的输入都是按key排序的。系统执行排序,并且把map的output传递给reducers作 为input的过程,被称作shuffle。 The Map Side 当map function开始输出的时候,它并不是简单的直接往硬盘上写。过程有些复杂,采用了把写请求缓存在memeory的方案,并且为了照顾效率,做了一些 presorting的操作。 每个map task都有一个circular memory buffer,用来缓存output。buffer默认设置是100MB,可以通过io.sort.mb属性来配置。当buffer的内容达到一个固定的 threshold时(io.sort.spill.percent,默认0.80),一个后台的thread开始把buffer固化到硬盘上。此时 Map的output仍然写入buffer,但是一旦buffer写满了,map就会被阻塞,直到buffer完全被写入硬盘。 Spill是以round-robin方式向指定的路径中写的,路径由mapred.local.dir属性指定。 在写入disk之前,thread首先会对data进行partition,partition的根据是data最终发往的reducer。在每个 partition中,后台的thread按照key进行in-memory sort,如果user实现了combiner函数,还会对sort的output执行combiner。 每当memory buffer达到了spill threshold,就会创建一个新的spill file。所以,当map task把所有的output都写完了之后,可能会存在一些spill files。在task完成了之后,这些spill files被merge到一个单独的partitioned并且sorted文件中。io.sort.factor控制是几路归并,默认值为10. 如果user实现了一个combiner函数,而且spills的数目大于等于3(由min.num.spills.for.combine属性指定), 那么在写output文件之前,会执行combiner。combiner的意义在于能够更大程度的合并map的output,所以写入local disk的data就会较少,也会降低网络传输的压力。 对map的output进行压缩也是个很好的主意,这样做的好处有:加快了写入磁盘的速度,节省了磁盘空间,减少了网络传输的流量。默认配置下 output是不压缩的,可以通过把mapred.compress.map.output设置为true来实现压缩。压缩的lib由 mapred.map.output.compression.codec指定。 output文件的partition通过HTTP协议发送给reducer,这个过程中有很多worker threads来负责传输工作。worker thread的数量由tasktracker.http.threads属性来控制,这个设置是针对tasktracker的,而不是针对每个map task slot,默认值为40.

Posted in Hadoop | Tagged | Leave a comment

How MapReduce Works in Hadoop — Progress and Status Progress

MapReduce jobs一般都是运行时间比较长的批处理任务。所以,在运行时间里向用户反馈一些job的运行进程是很重要的。一个Job和它的每个task都有自己的 status,里面包括了job/task的状态(正在执行,成功,或者失败),maps和reduces的进度,job counters的值,以及状态信息或者描述(可以由用户代码设定)。这些状态在job的执行过程中不断变化,所以我们需要了解它们是如何和用户代码通信 的。 在一个task运行的时候,它不断的跟踪自己的进度(progress),即task完成的比例。对于map tasks,进度就是已经处理过的input的比例。对于reduce tasks就稍微复杂一点,但是系统仍然可以估计reduce input处理的比例。把reduce的整个过程分为3部分:copy,sort,reduce。如果一个reduce task处理了一半的input,那么这个task就完成了5/6(copy和sort各占1/3,再加上reduce这份的一半,即1/3 + 1/3 + 1/6 = 5/6). Tasks还有一些counters,记录了task在运行过程中的各种状态。这些counters有的是MapReduce系统自带的,也有一些的用户 自定义的。 Progress并不都是可测量的,但是不管怎样它能够告诉Hadoop某个task正在干正经事。Progress是很重要的,因为Hadoop不会主 动撤销一个正在report progress的task,以下这些操作都有报告progress的行为: 读一条input record(mapper或reducer) 写一条output record(mapper或reducer) 设置status description(使用Reporter的setStatus()方法) 增大一个counter的值(使用Reporter的incrCounter()方法) 调用Reporter的progress()方法 当一个task报告progress时,它会设置一个flag,以表示status的变化需要发送给tasktracker。一个单独的thread每隔 3秒会检查一下这个flag,如果发现flag被set了,就通知tasktracker当前的task status。同时tasktracker每隔5秒向jobtracker发送一次心跳包(5秒是最短的internal,这是由cluster的规模决 定的,cluster越大,internal越长),心跳包中包含了job的所有task的status。Counters的发送间隔比5秒要长,因为 Counter占用的带宽相对较高。 JobTracker汇总这些更新的status后,生成一个所有job以及task的status的总览。JobClient每秒都向 JobTracker查询最新的status。用户代码可以通过JobClient的getJob()方法获取一个RunningJob实例,在这里包含 了job的所有status信息。

Posted in Hadoop | Tagged , | Leave a comment

How MapReduce Works in Hadoop — Task Assignment and Execution

Task Assignment tasktracker周期性的向jobtrakcer发送heartbeat。Heartbeats表示了tasktracker还活着,除此之外,还 包含了tasktracker是否已经准备好了执行新的task。如果已经准备好了,那么jobtracker会通过heartbeat的返回值分配给 tasktracker一个task。 Jobtracker要给tasktracker分配task,首先要选择一个job。默认的调度策略是从一个简单的job优先列表中选择job,确定了 job后,再给tasktracker分配task。 Tasktracker能够同时执行多少个map task或者reduce task是固定的。默认的调度策略优先分配map task。也就是说,如果map task的slots还有空余,那么一定会先分配map task。 在选择task的时候,首先要明确一个data locality的概念:data locality是指data在哪里存储和data在哪里计算的关系。reduce task和data locality无关,所以只需要从没有执行的task list中选一个即可。而对于map task,则需要考虑tasktracker的网络位置,并且尽量挑选一个input split离tasktracker较近的task。最优情况是data-local,即input split和tasktracker在同一台机器上。也有可能是rack-local,即input split和tasktracker在同一个机柜上。有些任务既不是data-local,也不是rack-local,它们需要从另外的机柜中读取 input split。 Task Execution 当tasktracker接受到了task,下一步就是去执行它了。 首先,要从DFS上把JAR包拷贝到本地,同时把distributed cache传过来的所有文件都拷贝下来。 然后,创建一个本地工作路径,将jar包解压到该目录 最后,创建一个TaskRunner去执行该task TaskRunner启动一个新的JVM来执行每个task,所以在用户实现的mapper和reducer中出现的bug或者异常,不会影响到 tasktracker。 子进程通过umbilical接口和父进程通信,通过这种方式每隔一段时间(a few seconds)向父进程通报task的progress,直到task完成。

Posted in Hadoop | Tagged | Leave a comment

How MapReduce Works in Hadoop – Job Submission & Initialization

要运行一个Hadoop的MapReduce任务,只需要很简单的一行代码即可: JobClient.runJob(conf); 看起来很短,但实际上它在幕后执行了很多操作。本文就是来揭密Hadoop运行一个job需要哪些步骤。下图为Hadoop MapReduce Job的执行过程。 可以看到在最上层,有四个组件: Client:发起MapReduce Job的程序 JobTracker:管理Job的一个Java应用程序 TaskTracker:真正运行Task的Java应用程序(一个Job被拆分成了很多Task) DFS:一般为HDFS。用来在组件之间共享文件 ??? Job Submission JobClient的runJob()方法创建一个JobClient的实例,并且调用该实例的submitJob()方法。在提交完Job 后,runJob()每隔一秒都会获取一次Job的progress,并且在progress发生变化的时候把它打印到console中。当Job完成 时,如果Job成功了,那么显示Job counters;否则,导致Job失败的错误会被打印到console中。 JobClient的submitJob()方法做了如下几件事: 向JobTracker获取一个新的job ID(通过调用getNewJobId()方法) 检查Job指定的输出路径。如果输出路径还没有被指定,或者该路径已经存在了,那么这个job就不会被提交,同时MapReduce程序也会抛出异常。 为Job计算出它的splits。如果splits没有计算成功,比如,没有制定input path,job也不会被提交,同时MapReduce程序会抛出异常 把运行Job所需要的资源(jar包,配置文件,input splits等)拷贝到jobtracker所在的分布式文件系统中去。存储的路径以job ID为开头。job JAR文件会被复制多份。(由mapred.submit.replication控制,默认值为10)。所以当 tasktracker需要执行task的时候,可以从很多地方读取该Jar包。 告诉JobTracker,该Job已经准备妥当,可以执行了。 Job Initialization 当JobTracker接收到一个对它的submitJob()方法的调用命令后,它将该job放入一个内部的队列,job scheduler会从这个 队列中拣出Job并进行初始化。初始化工作包括了创建一个对象来表示将要被执行的Job,这个对象封装了它的tasks,并且能够记录tasks的状态以 及progress信息。 要创建一组tasks,job scheduler首先从DFS中取出之前有JobClient计算好的input splits。然后为每一个split创建一个map task。Reduce task的数量由JobConf的属性来决定(mapred.reduce.tasks)。在这一步,Task会被给定一个ID。

    本站是提供个人知识管理的网络存储空间,所有内容均由用户发布,不代表本站观点。请注意甄别内容中的联系方式、诱导购买等信息,谨防诈骗。如发现有害或侵权内容,请点击一键举报。
    转藏 分享 献花(0

    0条评论

    发表

    请遵守用户 评论公约

    类似文章 更多