
Introduction to Parallel Programming and MapReduce

 木有银 2011-05-12
1. Audience and Pre-Requisites
2. Serial vs. Parallel Programming
3. The Basics
4. What is MapReduce?
5. MapReduce Execution Overview
6. MapReduce Examples
7. References

--------------------------------------------------------------------------------

 

1. Audience and Pre-Requisites

This tutorial covers the basics of parallel programming and the MapReduce programming model. The pre-requisites are significant programming experience with a language such as C++ or Java, and familiarity with data structures and algorithms.

2. Serial vs. Parallel Programming

In the early days of computing, programs were serial, that is, a program consisted of a sequence of instructions, where each instruction executed one after the other. It ran from start to finish on a single processor. 


Parallel programming developed as a means of improving performance and efficiency. In a parallel program, the processing is broken up into parts, each of which can be executed concurrently. The instructions from each part run simultaneously on different CPUs. These CPUs can exist on a single machine, or they can be CPUs in a set of computers connected via a network. 


Not only are parallel programs faster, they can also be used to solve problems on large datasets using non-local resources. When you have a set of computers connected on a network, you have a vast pool of CPUs, and you often have the ability to read and write very large files (assuming a distributed file system is also in place). 


3. The Basics

The first step in building a parallel program is identifying sets of tasks that can run concurrently and/or partitions of data that can be processed concurrently. Sometimes it's just not possible. Consider a Fibonacci function:

F(k+2) = F(k) + F(k+1)

A function to compute this based on the form above cannot be "parallelized" because each computed value is dependent on previously computed values.

A common situation is having a large amount of consistent data which must be processed. If the data can be decomposed into equal-size partitions, we can devise a parallel solution. Consider a huge array which can be broken up into sub-arrays.  


If the same processing is required for each array element, with no dependencies in the computations, and no communication required between tasks, we have an ideal parallel computing opportunity. Here is a common implementation technique called master/worker. 

 

The MASTER:

- initializes the array and splits it up according to the number of available WORKERS
- sends each WORKER its subarray
- receives the results from each WORKER

The WORKER:

- receives the subarray from the MASTER
- performs processing on the subarray
- returns results to MASTER

This model implements static load balancing which is commonly used if all tasks are performing the same amount of work on identical machines. In general, load balancing refers to techniques which try to spread tasks among the processors in a parallel system to avoid some processors being idle while others have tasks queueing up for execution. 


A static load balancer allocates processes to processors at run time while taking no account of current network load. Dynamic algorithms are more flexible, though more computationally expensive, and give some consideration to the network load before allocating the new process to a processor. 


As an example of the MASTER/WORKER technique, consider one of the methods for approximating pi. The first step is to inscribe a circle inside a square: 

[Figure 1: a circle of radius r inscribed in a square with side 2r]

The area of the square, denoted As, is (2r)^2 or 4r^2. The area of the circle, denoted Ac, is pi * r^2. So:

pi = Ac / r^2
As = 4r^2
r^2 = As / 4
pi = 4 * Ac / As

The reason we are doing all this algebraic manipulation is that we can parallelize this method in the following way. The solution breaks down into four steps:

1. Randomly generate points in the square
2. Count the number of generated points that are both in the circle and in the square
3. r = the number of points in the circle divided by the number of points in the square
4. PI = 4 * r

And here is how we parallelize it: 


NUMPOINTS = 100000; // some large number - the bigger, the closer the approximation
p = number of WORKERS;
numPerWorker = NUMPOINTS / p;
countCircle = 0;   // one of these for each WORKER

// each WORKER does the following:
for (i = 0; i < numPerWorker; i++) {
  generate 2 random numbers that lie inside the square;
  xcoord = first random number;
  ycoord = second random number;
  if (xcoord, ycoord) lies inside the circle
    countCircle++;
}

// the MASTER:
// receives from the WORKERS their countCircle values, sums them,
// and computes PI from the total: PI = 4.0 * countCircle / NUMPOINTS;
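One possible runnable version of this pseudocode, again using Python's multiprocessing. The worker count and the choice of a unit square centered at the origin (radius-0.5 inscribed circle) are illustrative, not from the original.

```python
import random
from multiprocessing import Pool

NUMPOINTS = 100000  # some large number - the bigger, the closer the approximation

def count_in_circle(num_points):
    # one WORKER: generate random points in the unit square and count
    # how many fall inside the inscribed circle of radius 0.5
    count = 0
    for _ in range(num_points):
        x = random.uniform(-0.5, 0.5)
        y = random.uniform(-0.5, 0.5)
        if x * x + y * y <= 0.25:
            count += 1
    return count

def estimate_pi(num_workers=4):
    num_per_worker = NUMPOINTS // num_workers
    with Pool(num_workers) as pool:
        counts = pool.map(count_in_circle, [num_per_worker] * num_workers)
    # the MASTER sums the workers' counts before computing PI
    return 4.0 * sum(counts) / (num_per_worker * num_workers)

if __name__ == "__main__":
    print(estimate_pi())  # roughly 3.14, within Monte Carlo error
```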

4. What is MapReduce?

Now that we have seen some basic examples of parallel programming, we can look at the MapReduce programming model. This model derives from the map and reduce combinators from a functional language like Lisp. 


In Lisp, a map takes as input a function and a sequence of values. It then applies the function to each value in the sequence. A reduce combines all the elements of a sequence using a binary operation. For example, it can use "+" to add up all the elements in the sequence. 

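For readers who don't know Lisp, the same two combinators exist in Python; a minimal sketch:

```python
from functools import reduce

values = [1, 2, 3, 4]

# map takes a function and a sequence, and applies the function to each value
squares = list(map(lambda x: x * x, values))  # [1, 4, 9, 16]

# reduce combines all elements with a binary operation, here "+"
total = reduce(lambda a, b: a + b, values)    # 1 + 2 + 3 + 4 = 10
```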

MapReduce is inspired by these concepts. It was developed within Google as a mechanism for processing large amounts of raw data, for example, crawled documents or web request logs. This data is so large, it must be distributed across thousands of machines in order to be processed in a reasonable time. This distribution implies parallel computing since the same computations are performed on each CPU, but with a different dataset. MapReduce is an abstraction that allows Google engineers to perform simple computations while hiding the details of parallelization, data distribution, load balancing and fault tolerance. 


Map, written by a user of the MapReduce library, takes an input pair and produces a set of intermediate key/value pairs. The MapReduce library groups together all intermediate values associated with the same intermediate key I and passes them to the reduce function. 


The reduce function, also written by the user, accepts an intermediate key I and a set of values for that key. It merges together these values to form a possibly smaller set of values. [1] 


Consider the problem of counting the number of occurrences of each word in a large collection of documents: 


map(String key, String value):
  // key: document name
  // value: document contents
  for each word w in value:
    EmitIntermediate(w, "1");

reduce(String key, Iterator values):
  // key: a word
  // values: a list of counts
  int result = 0;
  for each v in values:
    result += ParseInt(v);
  Emit(AsString(result));     [1]

In the example above, the map function emits a count of "1" for each word it encounters, and the reduce function sums together all the counts emitted for a particular word.
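The same word count can be simulated in a single Python process. This sketch only imitates the grouping step the MapReduce library performs between map and reduce; the helper names (map_fn, reduce_fn, mapreduce) are ours, not the library's.

```python
from collections import defaultdict

def map_fn(name, contents):
    # emit an intermediate (word, 1) pair for every word in the document
    for word in contents.split():
        yield (word, 1)

def reduce_fn(word, counts):
    # sum all counts emitted for this word
    return sum(counts)

def mapreduce(documents):
    # group intermediate values by intermediate key, as the library would
    intermediate = defaultdict(list)
    for name, contents in documents.items():
        for key, value in map_fn(name, contents):
            intermediate[key].append(value)
    return {word: reduce_fn(word, counts)
            for word, counts in intermediate.items()}

docs = {"d1": "the quick brown fox", "d2": "the lazy dog"}
print(mapreduce(docs))  # {'the': 2, 'quick': 1, 'brown': 1, ...}
```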

5. MapReduce Execution Overview

The Map invocations are distributed across multiple machines by automatically partitioning the input data into a set of M splits or shards. The input shards can be processed in parallel on different machines. 


Reduce invocations are distributed by partitioning the intermediate key space into R pieces using a partitioning function (e.g., hash(key) mod R). The number of partitions (R) and the partitioning function are specifed by the user. 

接着,对map操作产生的中间key进行分块,分成r块。分块函数和分块数目可由用户指定,例如可以采用函数hash(key)modR进行分块。
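A minimal sketch of such a partitioning function in Python. The text specifies only hash(key) mod R; using md5 here is our choice, so the mapping stays stable across runs (Python's built-in hash of strings is randomized per process).

```python
import hashlib

R = 4  # number of reduce partitions, specified by the user

def partition(key):
    # the example partitioning function from the text: hash(key) mod R
    digest = hashlib.md5(key.encode("utf-8")).hexdigest()
    return int(digest, 16) % R

# every intermediate pair with the same key lands in the same reduce partition
assert partition("apple") == partition("apple")
assert all(0 <= partition(k) < R for k in ["a", "b", "c"])
```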

The illustration below shows the overall flow of a MapReduce operation. When the user program calls the MapReduce function, the following sequence of actions occurs (the numbered labels in the illustration correspond to the numbers in the list below).

 

1. The MapReduce library in the user program first shards the input files into M pieces of typically 16 megabytes to 64 megabytes (MB) per piece. It then starts up many copies of the program on a cluster of machines.

2. One of the copies of the program is special: the master. The rest are workers that are assigned work by the master. There are M map tasks and R reduce tasks to assign. The master picks idle workers and assigns each one a map task or a reduce task.

3. A worker who is assigned a map task reads the contents of the corresponding input shard. It parses key/value pairs out of the input data and passes each pair to the user-defined Map function. The intermediate key/value pairs produced by the Map function are buffered in memory.

4. Periodically, the buffered pairs are written to local disk, partitioned into R regions by the partitioning function. The locations of these buffered pairs on the local disk are passed back to the master, who is responsible for forwarding these locations to the reduce workers.

5. When a reduce worker is notified by the master about these locations, it uses remote procedure calls to read the buffered data from the local disks of the map workers. When a reduce worker has read all intermediate data, it sorts it by the intermediate keys so that all occurrences of the same key are grouped together. If the amount of intermediate data is too large to fit in memory, an external sort is used.

6. The reduce worker iterates over the sorted intermediate data and for each unique intermediate key encountered, it passes the key and the corresponding set of intermediate values to the user's Reduce function. The output of the Reduce function is appended to a final output file for this reduce partition.

7. When all map tasks and reduce tasks have been completed, the master wakes up the user program. At this point, the MapReduce call in the user program returns back to the user code.

8. After successful completion, the output of the MapReduce execution is available in the R output files. [1]

To detect failure, the master pings every worker periodically. If no response is received from a worker in a certain amount of time, the master marks the worker as failed. Any map tasks completed by the worker are reset back to their initial idle state, and therefore become eligible for scheduling on other workers. Similarly, any map task or reduce task in progress on a failed worker is also reset to idle and becomes eligible for rescheduling. 


Completed map tasks are re-executed when failure occurs because their output is stored on the local disk(s) of the failed machine and is therefore inaccessible. Completed reduce tasks do not need to be re-executed since their output is stored in a global file system.

6. MapReduce Examples

Here are a few simple examples of interesting programs that can be easily expressed as MapReduce computations. 


Distributed Grep: The map function emits a line if it matches a given pattern. The reduce function is an identity function that just copies the supplied intermediate data to the output. 


Count of URL Access Frequency: The map function processes logs of web page requests and outputs <URL, 1>. The reduce function adds together all values for the same URL and emits a <URL, total count> pair.

Reverse Web-Link Graph: The map function outputs <target, source> pairs for each link to a target URL found in a page named "source". The reduce function concatenates the list of all source URLs associated with a given target URL and emits the pair: <target, list(source)>.

Term-Vector per Host: A term vector summarizes the most important words that occur in a document or a set of documents as a list of <word, frequency> pairs. The map function emits a <hostname, term vector> pair for each input document (where the hostname is extracted from the URL of the document). The reduce function is passed all per-document term vectors for a given host. It adds these term vectors together, throwing away infrequent terms, and then emits a final <hostname, term vector> pair.

Inverted Index: The map function parses each document, and emits a sequence of <word, document ID> pairs. The reduce function accepts all pairs for a given word, sorts the corresponding document IDs and emits a <word, list(document ID)> pair. The set of all output pairs forms a simple inverted index. It is easy to augment this computation to keep track of word positions. [1]
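The inverted index can be sketched the same way as the earlier word count: a local, single-process simulation of the library's grouping step, with helper names of our own choosing.

```python
from collections import defaultdict

def map_fn(doc_id, contents):
    # emit a (word, document ID) pair for each distinct word in the document
    for word in set(contents.split()):
        yield (word, doc_id)

def reduce_fn(word, doc_ids):
    # sort the document IDs for this word
    return sorted(doc_ids)

def inverted_index(documents):
    # group document IDs by word, as the MapReduce library would
    intermediate = defaultdict(list)
    for doc_id, contents in documents.items():
        for word, d in map_fn(doc_id, contents):
            intermediate[word].append(d)
    return {word: reduce_fn(word, ids) for word, ids in intermediate.items()}

docs = {1: "to be or not to be", 2: "to do"}
print(inverted_index(docs))  # {'to': [1, 2], 'be': [1], ...}
```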

7. References

[1] Dean, Jeff and Ghemawat, Sanjay. MapReduce: Simplified Data Processing on Large Clusters. http://labs.google.com/papers/mapreduce-osdi04.pdf

[2] Lämmel, Ralf. Google's MapReduce Programming Model Revisited. http://www.cs./~ralf/MapReduce/paper.pdf

[3] Open Source MapReduce: http://lucene./hadoop/
