Apache Hadoop是一个开源软件框架,可安装在一个商用机器集群中,使机器可彼此通信并协同工作,以高度分布式的方式共同存储和处理大量数据。最初,Hadoop 包含以下两个主要组件: Hadoop Distributed File System (HDFS) 和一个分布式计算引擎,该引擎支持以 MapReduce 作业的形式实现和运行程序。 MapReduce 是 Google 推广的一个简单的编程模型,它对以高度并行和可扩展的方式处理大数据集很有用。MapReduce 的灵感来源于函数式编程,用户可将他们的计算表达为 Map和 Reduce 函数,将数据作为键值对来处理。Hadoop 提供了一个高级 API 来在各种语言中实现自定义的 Map和 Reduce 函数。 Hadoop 还提供了软件基础架构,以一系列Map和 Reduce 任务的形式运行 MapReduce 作业。Map任务 在输入数据的子集上调用 Map函数。在完成这些调用后,Reduce任务 开始在 map 函数所生成的中间数据上调用 Reduce 任务,生成最终的输出。Map和Reduce 任务彼此单独运行,这支持并行和容错的计算。 最重要的是,Hadoop 基础架构负责处理分布式处理的所有复杂方面:并行化、调度、资源管理、机器间通信、软件和硬件故障处理,等等。得益于这种干净的抽象,实现处理数百(或者甚至数千)个机器上的数 TB 数据的分布式应用程序从未像现在这么容易过,甚至对于之前没有使用分布式系统的经验的开发人员也是如此。 MR 架构 Map Reduce 过程图 将任务分割为Map端和Reduce 端,下图是JobClient、JobTracker和TaskTracker架构。
Shuffle Combine 整体的 Shuffle 过程包含以下几个部分: Map端Shuffle、Sort阶段、Reduce端 Shuffle。即是说: Shuffle过程横跨 Map 和 Reduce两端,中间包含 Sort 阶段,就是数据从 map task 输出到 Reduce task 输入的这段过程。Sort、Combine是在 Map端的,Combine是提前的Reduce,需要自己设置。 Hadoop集群中,大部分Map Task 与 Reduce Task 的执行是在不同的节点上。当然很多情况下 Reduce 执行时需要跨节点去拉取其它节点上的 Map Task 结果。如果集群正在运行的Job 有很多,那么Task 的正常执行对集群内部的网络资源消耗会很严重。而对于必要的网络资源消耗,最终的目的就是最大化地减少不必要的消耗。还有在节点内,相比于内存,磁盘IO对Job完成时间的影响也是可观的。从最基本的要求来说,对于MapReduce的Job性能调优的Shuffle 过程,目标期望可以有: 完整地从Map Task 端拉取数据到Reduce 端。在跨节点拉取数据时,尽可能地减少对带宽的不必要消耗。减少磁盘IO对Task 执行的影响。 总体来讲这段Shuffle 过程,能优化的地方主要在于减少拉取数据的量及尽量使用内存而不是磁盘。 Map Shuffle 1、输入 在Map Task执行时,其输入来源HDFS 的Block,Map Task只读取Split。Split 与Block 的对应关系可能是多对一,默认为一对一。 2、切分 然后将数据写入内存缓冲区中,缓冲区的作用是批量收集Map 结果,减少磁盘 IO 的影响。Key/Value 对以及Partition 的结果都会被写入缓冲区。写入之前,Key与Value 值都会被序列化成字节数组。 3、溢写 这个溢写是由另外单独线程来完成,不影响往缓冲区写Map结果的线程。整个缓冲区有个溢写的比例spill.percent。这个比例默认是0.8, Combiner 将有相同Key的Key/Value对加起来,减少溢写 spill 到磁盘的数据量。Combiner 的适用场景: 由于Combiner的输出是Reducer的输入,Combiner 绝不能改变最终的计算结果。故大多数情况下,Combiner 适用于输入输出的 key/value 类型完全一致,且不影响最终结果的场景(比如累加、最大值等)。 Merge Map很大时,每次溢写会产生一个spill_file,这样会有多个spill_file,而最终的输出只有一个文件,在最终输出之前会对多个中间过程多次产生的溢写文件 spill_file 进行合并,此过程就是Merge。 Merge 就是把相同Key 的结果加起来(当然,如果设置过Combiner,也会使用 combiner 来合并相同的Key)。 Reduce Shuffle 在Reduce Task 之前,不断拉取当前Job 里每个Maptask的最终结果,然后对从不同地方拉取过来的数据不断地做Merge ,也最终形成一个文件作为Reduce Task 的输入文件。 1、Copy Reduce进程启动一些数据copy 线程 (Fetcher),通过HTTP方式请求map task 所在的 TaskTracker 获取map task 的输出文件。因为 maptask 早已结束,这些文件就归 TaskTracker 管理在本地磁盘中。 2、Merge Copy 过来的数据会先放入内存缓冲区中,这里的缓冲区大小要比 map 端的更为灵活,它基于JVM的 heap size设置,因为Shuffle 阶段 Reducer不运行,所以应该把绝大部分的内存都给 Shuffle 用。这里需要强调的是,Merge有三种形式:
默认情况下第一种形式不启用,让人比较困惑,是吧。当内存中的数据量到达一定阈值,就启动内存到磁盘的Merge 。与Map端类似,这也是溢写的过程,这个过程中如果你设置有Combiner,也是会启用的,然后在磁盘中生成了众多的溢写文件。第二种Merge 方式一直在运行,直到没有Map 端的数据时才结束,然后启动第三种磁盘到磁盘的Merge 方式生成最终的那个文件。 3、Reducer的输入 Merge 的最后会生成一个文件,大多数情况下存在于磁盘中,但是需要将其放入内存中。当Reducer输入文件已定,整个Shuffle 阶段才算结束。然后就是 Reducer 执行,把结果放到 HDFS 上。 YARN YARN(Yet Another Resource Negotiator)是下一代 MapReduce框架的名称,为了容易记忆,一般称为MR v2。该框架已经不再是一个传统的MapReduce 框架,甚至与 MapReduce 无关,她是一个通用的运行时框架,用户可以编写自己的计算框架,在该运行环境中运行。用于自己编写的框架作为客户端的一个lib,在运用提交作业时打包即可。 MR的缺点(YARN取代MR) 经典 MapReduce 的最严重的限制主要关系到可伸缩性、资源利用和对与 MapReduce 不同的工作负载的支持。在 MapReduce 框架中,作业执行受两种类型的进程控制: 一个称为JobTracker 的主要进程,它协调在集群上运行的所有作业,分配要在 TaskTracker上运行的Map和Reduce 任务。 许多称为TaskTracker 的下级进程,它们运行分配的任务并定期向 JobTracker报告进度。 大型的Hadoop 集群显现出了由单个JobTracker 导致的可伸缩性瓶颈。
MapReduce框架的不足 JobTracker是集群事务的集中处理点,存在单点故障。JobTracker 需要完成的任务太多,既要维护 job 的状态又要维护 job 的 task 的状态,造成过多的资源消耗。 在TaskTracker 端,用Map/Reduce Task 作为资源的表示过于简单,没有考虑到 CPU、内存等资源情况,当把两个需要消耗大内存的 task 调度到一起,很容易出现OOM。 把资源强制划分为Map/Reduce Slot, 当只有Map Task 时,Reduce Slot不能用;当只有Reduce Task 时,Map Slot 不能用,容易造成资源利用不足。 解决可伸缩性问题 在 Hadoop MapReduce中,JobTracker具有两种不同的职责。管理集群中的计算资源,这涉及到维护活动节点列表、可用和占用的Map 和Reduce Slots 列表,以及依据所选的调度策略将可用 slots 分配给合适的作业和任务 协调在集群上运行的所有任务,这涉及到指导TaskTracker 启动Map和 Reduce 任务,监视任务的执行,重新启动失败的任务,推测性地运行缓慢的任务,计算作业计数器值的总和等等。 为单个进程安排大量职责会导致重大的可伸缩性问题,尤其是在较大的集群上,JobTracker 必须不断跟踪数千个 TaskTracker、数百个作业,以及数万个 map 和 reduce 任务。相反,TaskTracker 通常近运行十来个任务,这些任务由勤勉的JobTracker 分配给它们。 为了解决可伸缩性问题,一个简单而又绝妙的想法应运而生: 我们减少了单个 JobTracker 的职责,将部分职责委派给TaskTracker,因为集群中有许多 TaskTracker。在新设计中,这个概念通过将 JobTracker 的双重职责(集群资源管理和任务协调)分开为两种不同类型的进程来反映。 YARN的优点
一个全局 ResourceManager以主要后台进程的形式运行,它通常在专用机器上运行,在各种竞争的应用程序之间仲裁可用的集群资源。
作者: HarperKoo |
|