分布式系统学习——MapReduce方法Paper翻译
读书笔记——MapReduce: Simplified Data Processing on Large Clusters
大型集群上的简化处理:MapReduce(两个主要操作)
原始翻译版本网址:《MapReduce: Simplified Data Processing on Large Cluster 》翻译
Abstract
MapReduce 是一种编程模型和一种用来处理和产生大数据集的相关实现。用户定义 map 函数(map function)来处理 key/value 键值对来产生一系列的中间的 key/value 键值对。还要定义一个 reduce 函数(reduce function)用来合并有着相同中间 key 值的中间 value。许多现实世界中的任务都可以用这种模型来表达,就像下文所展示的那样。
用这个风格(函数式)编写的程序可以自动并行地在大规模集群上工作。运行时系统会自动处理例如切割输入数据,在机器之间调度程序的执行,处理机器故障以及管理必要的机器间通信等细节问题。这可以让那些对于并行分布式系统没有任何经验的程序员也能很简单地利用起一个大的分布式系统的资源。
我们的 MapReduce 的实现运行在一个由大的商业机构成的集群当中并且是高度可扩展的:一个典型的 MapReduce 计算要在上千台机器中处理 TB 数量级的数据。程序员会觉得这个系统非常好用:已经有成千上万的 MapReduce 程序被实现出来并且每天有上千个 MapReduce 任务运行在 Google 的集群上。
Introduction
在过去五年中,作者和许多 Google 的其他人已经实现了成百上千个用于特殊目的的计算程序用于处理大量的 raw data(如抓取文件,Web 请求日志等),用于计算产生各种各样的 derived data(如倒排索引、Web 文件结构的图片展示、每个 host 抓取的文件数量总结、指定日期最频繁的访问请求等)。许多这种计算程序在概念上都是非常直接的。然而输入的数据量往往很大,并且计算需要分布在成百上千台机器中为了在一个可接受的时间内完成任务。但是除了简单的计算模型以外,我们需要大量复杂的代码用来处理例如如何并行化计算、分发数据、处理故障等等问题。
为了解决这样的复杂性,我们设计了一种新的抽象,它让我们只需要表示出我们想要执行的计算模型,而将背后复杂的并行化,容错,数据分发,负载平衡等等技术的实现细节隐藏在了库中。我们这种新的抽象是受 Lisp 以及其他一些函数式编程语言中的 map 和 reduce 原语影响而来的。我们意识到为了计算出一系列的中间键值对,许多的计算都需要对于输入中的每个逻辑“记录”进行 map 操作。然后还需要对所有共享同一个 key 的 value 进行 reduce 操作,从而能够对派生的数据进行适当的组合。我们这种让用户自定义 map 和 reduce 操作的编程模型能够让我们简单地对大量数据实现并行化,并且使用重新执行(re-execution)作为主要的容错机制。
我们这项工作的主要共享是提供了一个简单并且强大的接口能够让我们实现自动的并行化并且分布处理大规模的计算,同时该接口的实现能在大型的商用 PC 集群上获得非常高的性能。
Section 2 描述了基本的编程模型以及一些简单的例子。Section 3 描述了为我们的基于集群的计算环境量身定做的 MapReduce 接口。Section 4 描述了一些我们认为有用的对于编程模型的改进。Section 5 是对我们的实现在不同任务下的性能测试。Section 6 包含了 MapReduce 在 Google 内的使用情况,包括我们以它为基础重写我们的产品索引系统的经验。Section 7 讨论了相关的工作以及未来的发展。
Programming Model
计算模型以一系列的键值对作为输入并产生一系列的键值对作为输出。MapReduce 库的用户以“Map”和"Reduce"两个函数来表达计算。
Map,是由用户编写的,取一个输入对,并且产生一系列中间的键值对。MapReduce 库将那些具有相同的中间键\(I\)的中间值聚集在一起,然后将它们传递给 Reduce 函数。
Reduce,同样是由用户编写的,接收一个中间键\(I\)和该键对应的一系列的中间值。Reduce 函数通过将这些值合并来组成一个可能更小的集合(值的集合)。通常每个 Reduce 函数只产生 0 个或 1 个输出值。Reduce 函数一般通过一个迭代器(via an iterator)来获取中间值,从而在中间值的数目远远大于内存容量时,我们也能够处理。
Example
下面来考虑这样一个问题:统计大量文档中每一个单词出现的次数。对此,用户需要编写类似于如下的伪代码:
1 |
|
Map 函数为在每一个单词出现的时候,为它加上一个计数(在这个简单的例子中就是加 1)。Reduce 函数对每个单词(作为中间键值对的键)的所有计数进行叠加。
另外,用户需要用输入输出文件的名字,以及一个可选的 tuning paramete 去 fill in 一个叫 mapreduce specification 的对象。之后,用户调用 MapReduce 函数,将上述定义的对象传递进去。用户的代码将和 MapReduce 库相连(由 C++实现)。Appendix A 中有这个例子所有的代码文档。
1 |
|
需要注意的是,输入的 key 和 value 与输出的 key 和 value 是不同的类型,而中间的 key 和 value 与输出的 key 和 value 是相同的类型(用 k1 和 k2 表示)。我们的 C++实现都是以字符串的形式和用户代码进行交互的,至于将字符串类型转换成相应合适的类型的工作则由用户代码来完成了。
More Example
接下来是一些能够简单地用 MapReduce 计算模型进行表达的例子
Distributed Grep(分布式查找):Map 函数获取匹配提供的模式的行,Reduce 函数只是简单地将这些中间数据拷贝到输出。
Count of URL Access Frequency(计算 URL 访问频率):Map 函数处理 web 请求的日志,并且输出<URL, 1>。Reduce 函数将拥有相同 URL 的 value 相加,得到<URL, total count>对
Reverse Web-Link Graph:Map 函数输出<target, source>对,其中 source 所在的 page 都有连向 target 这个 URL 的链接。Reduce 函数将给定 target 的所有的 source URL 连接起来,输出<target, list(source)>对
Term-Vector per Host:一个 term vector 表示一系列<word, frequency>的键值对,word 表示一篇或者一系列文章中出现的比较重要的单词,frequency 表示它们出现的次数。Map 函数对于每篇输入的文章输出<hostname, term vector>键值对(其中 hostname 是从文章所在的 URL 中抽取出来的)Reduce 函数获取给定 host 的 term vectors。它将这些 term vectors 累加起来,丢弃非频繁出现的 term,并产生一个最终的<hostname, term vector>对。
Inverted Index:Map 函数对每篇文章进行处理,并输出一系列的<word, document ID>对。Reduce 函数接收给定 word 的所有键值对,对相应的 document ID 进行排序并且输出<word, list
Distributed Sort:Map 函数从每个 record 中抽取出 key,产生<key, record>键值对。Reduce 函数只是简单地将所有对输出。这个计算模型依赖于 Section 4.1 中描述的划分技巧以及 Section 4.2 中描述的排序特性。
(上述可以全部理解了)
Implementation
对于 MapReduce 的接口,各种各样不同的实现都是可能的。所有正确的选择都是基于当下环境的。比如,一种实现可能适合于小的共享内存的机器,另一种可能适合于大型的 NUMA 多处理器机器,甚至有的是为更大的互联的机器集群设计的。
本节中描述的实现基于的是 Google 中最常用的计算环境:一个由大量商用 PC 机通过交换以太网互联的集群。在我们的环境中:
机器通常都是 x86 的双核处理器,其上运行 Linux,每台机器拥有 2-4G 的内存
商用网络硬件---通常是 100 M/s 或者 1 G/s,但是综合起来要小于平均带宽
一个集群由成千上万台机器组成,因此机器故障是常有的事
存储由便宜的 IDE 磁盘提供,它们都与独立的机器直接相连。一个内部研发的文件系统用于管理所有存储于这些硬盘上的文件。该文件系统通过 Replication 在不可靠的硬件上提供了可用性和可靠性
用户提交 jobs 给调度系统。每个 job 由一系列的 task 组成,并且由调度器分配到集群中一系列可用的机器上
Execution Overview
通过将输入数据自动分割成 M 份,Map 函数得以在多台机器上分布式执行。每一个输入块都能并行地在不同的机器上执行。通过划分函数(例如,hash(key) mod R)将中间键划分为 R 份,Reduce 函数也能被分布式地调用。其中划分的数目 R 和划分函数都是由用户指定的。
上图 1 展示了在我们的实现中 MapReduce 全部的流程。当用户程序调用 MapReduce 函数时,接下来的动作将按序发生(图 1 中标记的数字与下面的数字是一一对应的):
用户程序中的 MapReduce 库首先将输入文件划分为\(M\)片,每片大小一般在 16MB 到 64MB 之间(由用户通过一个可选的参数指定)。之后,它在集群的很多台机器上都启动了相同的程序拷贝。
其中有一个拷贝程序是特别的----master。剩下的都是 worker,它们接收 master 分配的任务。其中有 M 个 Map 任务和 R 个 Reduce 任务要分配。master 挑选一个空闲的 worker 并且给它分配一个 map 任务或者 reduce 任务。
被分配到 Map 任务的 worker 会去读取相应的输入块的内容。它从输入文件中解析出键值对并且将每个键值对传送给用户定义的 Map 函数。而由 Map 函数产生的中间键值对缓存在内存中。
被缓存的键值对会阶段性地写回本地磁盘,并且被划分函数分割成 R 份。这些缓存对在磁盘上的位置会被回传给 master,master 再负责将这些位置转发给 Reduce worker。
当 Reduce worker 从 master 那里接收到这些位置信息时,它会使用远程过程调用从 Map worker 的本地磁盘中获取缓存的数据。当 Reduce worker 读入全部的中间数据之后,它会根据中间键对它们进行排序,这样所有具有相同键的键值对就都聚集在一起了。排序是必须的,因为会有许多不同的键被映射到同一个 reduce task 中。如果中间数据的数量太大,以至于不能够装入内存的话,还需要另外的排序。
Reduce worker 遍历已经排完序的中间数据。每当遇到一个新的中间键,它会将 key 和相应的中间值传递给用户定义的 Reduce 函数。Reduce 函数的输出会被添加到这个 Reduce 部分的输出文件中。
当所有的 Map tasks 和 Reduce tasks 都已经完成的时候,master 将唤醒用户程序。到此为止,用户代码中的 MapReduce 调用返回。
当成功执行完之后,MapReduce 的执行结果被存放在 R 个输出文件中(每个 Reduce task 对应一个,文件名由用户指定)。通常用户并不需要将 R 个输出文件归并成一个。因为它们通常将这些文件作为另一个 MapReduce 调用的输入,或者将它们用于另外一个能够以多个文件作为输入的分布式应用。
(个人理解:module R 将中间键值对分为 R 份一方面是为了执行 Reduce work 的处理器进行分布式并行计算,另一方面,产生的分布式数据也可以接着用于其他能以多文件为输入的分布式应用。)
Master Data Structures
在 master 中保存了许多的数据结构。对于每个 Map task 和 Reduce task,master 都保存了它们的状态(idle,in-progress 或者是 completed)以及 worker 所在机器的标识(对于非 idle 空转状态的 tasks 而言)。
master 相当于是一个管道,通过它 Map task 所产生的中间文件被传递给了 Reduce task。因此,对于每一个已经完成的 Map task,master 会存储由它产生的 R 个中间文件的位置和大小(分配给 R 个 Reduce task 执行,需要远程读取这些数据,所以要记录位置和大小)。当 Map task 完成的时候,master 就会收到位置和大小的更新信息。而这些信息接下来就会逐渐被推送到处于 in-progress 状态的 Reduce task 中。
Fault Tolerance
容错处理
因为 MapReduce 库的设计初衷是用成千上万的机器去处理大量的数据,所以它就必须能用优雅的方式对机器故障进行处理。
Worker Failure
master 会周期性地 ping 每一个 worker。如果经过了一个特定的时间还未从某一个 worker 上获得响应,那么 master 会将 worker 标记为 failed。所有由该 worker 完成的 Map task 都被回退为 idle 状态,因此能够被重新调度到其他的 worker 上。同样的,所有 failed worker 正在执行的 Map task 或者 Reduce task 也会被回退为 idle 状态,并且被重新调度。
发生故障的机器上已经完成的 Map task 需要重新执行的原因是,它们的输入是保存在本地磁盘的,因此发生故障之后就不能获取了。而已经完成的 Reduce task 并不需要被重新执行,因为它们的输出是存放在全局的文件系统中的。
当一个 Map task 开始由 worker A 执行,后来又由 worker B 执行(因为 A 故障了)。所有执行 Reduce task 的 worker 都会收到这个重新执行的通知。那些还未从 worker A 中读取数据的 Reduce task 将会从 worker B 中读取数据。
MapReduce 对于大面积的机器故障是非常具有弹性的。例如,在一次 MapReduce 操作中,网络维护造成了集群中八十台机器在几分钟的时间内处于不可达的状态。MapReduce 的 master 只是简单地将不可达的 worker 机器上的工作重新执行了一遍,接着再继续往下执行,最终完成了 MapReduce 的操作。
Master Failure
对于 master,我们可以简单地对上文所述的 master 数据结构做周期性的快照。如果一个 master task 死了,我们可以很快地根据最新的快照来重新启动一个 master task。但是,因为我们只有一个 master,因此故障的概率比较低。所以,在我们的实现中如果 master 出现了故障就只是简单地停止 MapReduce 操作。用户可以检测到这种情况,并且如果他们需要的话可以重新开始一次 MapReduce 操作。
Semantics in the Presence of Failures
如果用户提供的 Map 和 Reduce 操作是关于输入值的确定性函数,那么我们分布式的实现将会产生同样的输出,在整个程序经过没有出现故障的顺序执行之后。
我们依赖 Map task 和 Reduce task 原子性地提交输出来实现上述特性。每一个正在执行的 task 都会将它的输出写到一个私有的临时文件中。一个 Reduce task 产生一个这样的文件,而一个 Map task 产生 R 个这样的文件(每个 Reduce work 一个)。当一个 Map task 完成的时候,worker 就会给 master 发送一个信息,,其中包含了 R 个临时文件的名字。如果 master 收到了一个来自于已经完成了的 Map task 的完成信息,那么它就将它自动忽略。否则,将 R 个文件的名称记录到一个 master 数据结构中。
当一个 Reduce task 完成的时候,Reduce worker 会自动将临时输出文件命名为最终输出文件。如果同一个 Reduce task 在多台机器上运行,那么多个重命名操作产生的最终输出文件名将会产生冲突。对此,我们依赖底层文件系统提供的原子重命名操作来保证最终文件系统中的数据来自一个 Reduce task。
大多数的 Map 和 Reduce 操作都是确定性的,事实上,我们的语义等同于顺序执行。因此这让程序员非常容易地能够解释他们程序的行为。当 Map 和 Reduce 操作是非确定性的时候,我们提供较弱,但仍然合理的语义。在非确定性的操作中,对于一个特定的 Reduce task R1 的输出是对应非确定性程序顺序执行产生的一个结果。然而,对于另一个 Reduce task R2,它的输出对应于非确定性程序另一个顺序执行的结果。
下面考虑 Map task \(M\)和 Reduce task \(R_1\)和\(R_2\)。让\(e(R_i)\)表示\(R_i\)的执行结果。更弱的语义意味着,\(e(R_1)\)可能从 M 的一次执行结果中读取输入,而\(e(R_2)\)可能从 M 的另一次执行中读取输入。
Locality
网络带宽在我们的计算环境中是相对稀缺的资源。我们通过将输入数据(由 GFS 管理)存储在集群中每台机器的本地磁盘的方法来节省带宽。GFS 将输入文件切分成 64MB 大小的块,并且将每个块的多份拷贝(通常为 3 份)存储在不同的机器上。MapReduce 的 master 获取所有输入文件的位置信息,然后将 Map task 调度到有相应输入文件副本的机器上。当发生故障时,再将 Map task 调度到邻近的具有该 task 输入文件副本的机器(即在同一台交换机内具有相同数据的机器)。当在一个集群的大量机器上做 MapReduce 操作时,大多数的输入数据都是从本地读取的,而不用消耗带宽。
Task Granularity
如上所述,我们将 Map 操作分成 M 份,Reduce 操作分成 R 份。在理想的情况下,M 和 R 的值应该要比集群中 worker machine 的数量多得多。让一个 worker 同时进行许多不同的 task 有利于提高动态的负载均衡,同时在一个 worker 故障的时候能尽快恢复。许多已经完成的 Map task 也能尽快地传播到其他所有的 worker machine 上。
在我们的实现中,M 和 R 的大小是有一个实用范围的。因为我们的 master 需要做\(O(M+R)\)个调度决定,并且还要在内存中保存\(O(M*R)\)个状态(源自前面所说:对于每一个已经完成的 Map task,master 会存储由它产生的 R 个中间文件的位置和大小。)。(但是内存使用的常数还是比较小的,\(O(M*R)\)个 Map task/Reduce task 状态对,每个的大小大概在一个字节)
另外,\(R\)通常受限于用户,因为每个 Reduce task 的输出都分散在不同的输出文件中。事实上,我们会选择\(M\),使得每个输入文件大概 16MB 到 64MB 的输入文件(因此上文所述的局部性优化会达到最优,减少带宽负担,尽量利用本地存储数据进行 Map task)。而我们会让 R 成为 worker machine 数量的一个较小的倍数。因此,我们通常在进行 MapReduce 操作时,\(M=200000\),\(R=5000\),使用 2000 个 worker machine。
Backup Tasks
“straggler”(落伍的士兵)的存在是拖慢整个 MapReduce 操作的通常的原因之一。所谓的"straggler"是指一台机器用了过长的时间去完成整个计算任务中最后几个 Map task 或者 Reduce task。Straggler 出现的原因有很多。比如一台机器上硬盘坏了,它就会经历大量的可纠正错误,从而让它的性能从 30MB/s 下降到 1MB/s。集群的调度系统可能将其他 task 调度到该机器上,导致它执行 MapReduce 代码的速度变慢很多,因为 CPU,内存,本地磁盘,网络带宽的竞争加剧。我们最近遇到的一个问题是一台机器的初始化代码有点问题,它会导致处理器的缓存被禁用,在这些受影响的机器上进行的计算速度会下降到原来的百分之一。(ping 得到不判定为故障机,但是自身速度过慢会拖累整体,出现短板效应)
对此,我们有一个通用的机制用来缓解 straggler 的问题。当 MapReduce 操作接近结束的时候,master 会将那些仍在执行的 task 的备份进行调度执行。无论是原来的还是备份执行完成,该 task 都将被标记为已完成。我们通过调整将该操作导致的计算资源消耗仅仅提高了几个百分点(只在即将结束的时候进行备份竞争执行)。但是在完成大型的 MapReduce 操作时,却让整个执行时间下降了好多。例如,Section 5.3 中所描述的排序算法在备份机制关闭的情况下,需要多消耗 44%的时间。
Refinements
虽然对于大多数需求由 Map 和 Reduce 函数提供的功能已经足够了,但是我们还是发现了一些有用的扩展。对它们的描述如下。
Partitioning Function
MapReduce 用户决定他们的 Reduce task 或者输出文件的数目 R。通过一个划分函数,根据中间键值将各个 task 的数据进行划分。默认的划分函数是通过哈希(比如,hash(key) mod R)。这通常会产生非常好的较为均衡的划分。但是在其他一些情况下,通过键值的其他函数来划分要更好一些。例如,有的时候输出键值是一些 URL,我们希望同一个 host 的内容能放在同一个输出文件中。为了支持这种情况,MapReduce 库的用户可以提供一个特殊的划分函数。例如,使用“hash(Hostname(urlKey)) mod R”作为划分函数,从而让所有来自于同一个 host 的 URL 的内容都输出到同一个输出文件。
(个人理解,hash 之前可以根据需求(key 的相似性、urlhost 相同)对 key 提前进行一次分组)
Ordering Guarantees
我们确保在一个给定的划分中,中间键值对都按照键值的升序进行处理。这样的处理顺序确保了每一个划分产生一个排好序的输出文件。这样的话,如果输出文件格式需要支持根据 key 进行有效的随机查找会比较方便。同时,输出文件(应用)的用户也会觉得已经排好序的数据使用起来特别方便。
Combiner Function
在有些情况下,每个 Map task 都会产生大量的中间键的重复而用户指定的 Reduce 函数是交互和关联的。Section 2.1 中的单词统计就是一个很好的例子。因为单词的出现频率服从于 Zipf 分布,每个 Map Task 都会产生成百上千个<the, 1>这样的记录。所有这些记录都会通过网络被送到一个 Reduce task 中,并且由 Reduce 函数加在一起去产生一个数。我们允许用户使用了可选的 Cominer 函数,用于在网络传输之前部分地进行归并操作。
Combiner 函数在每个执行 Map task 的机器上执行。通常 Combiner 和 Reduce 函数使用的是相同的代码。Reduce 函数和 Combiner 函数唯一的不同是 MapReduce 库如何处理函数的输出。Reduce 函数的输出写到最终的输出文件中。而 Combiner 函数的输出会被写到一个最终将被送给 Reduce task 的中间文件中(合并后替代原有的中间键值对集合传递给 Reduce Task 机器,这样减少了带宽的占用)。
部分的合并操作能极大地加速某类特定的 MapReduce 操作。Appendix A 包含了一个使用 Combiner 的例子。
Input and Output Types
MapReduce 库提供了对读入数据文件多种的格式支持。例如,"text"格式的输入将每一行作为键值对:key 是文件内的偏移,value 是该行的内容。另外一种比较常用的格式存储一系列按照键进行排序的键值对。每一个输出格式的实现都知道如何将自己进行合理的划分从而能让不同的 Map task 进行处理(例如,text 模式就知道将区域划分到以行为边界)。用户可以通过简单地定义一个 reader 接口来提供一个新的输入类型的实现。事实上,大多数用户只使用了预定义输入类型的很小一部分。
reader 并不一定要从文件中读取数据。例如,我们可以很容易地定义一个从数据库,或者内存中映射的数据结构中读取记录的 reader。
同理,我们也支持产生不同格式的输出数据,用户也能编写新的输出数据格式。
Side-effects
在有些情况下,MapReduce 的用户会很容易发现 Map 或者 Reduce 操作会产生一些辅助文件作为额外的输出文件。我们依赖应用的编写者去保证这些副作用是原子和幂等的。一般来说,应用会写到一个临时文件中,并且在它完全产生之后,通过一个原子操作将它重命名。
对于一个单一的 task 产生的多个输出文件,我们不提供原子性的两相提交支持。因此,产生多个输出文件并且有跨文件一致性要求的 task 需要是确定性的。但是这样的限制在实践过程中并不是什么问题。
Skipping Bad Records
有时候,如果用户的代码中有 bug 的话,会导致 Map 或者 Reduce 操作在某些记录上崩溃。这些 bug 会导致 MapReduce 操作的正常完成。对于这种情况,通常就是去修 bug。不过有时候这是不可行的,也许 bug 是第三方库造成的,而我们并不能得到它的源代码。而且,有时候我们允许忽略掉一些记录,例如在对一个大数据集做分析的时候。因此我们提供了一种可选的执行模式,当 MapReduce 库检测到一些记录会造成崩溃时,就会主动跳过它们,从而保证正常地运行。
每一个 worker 进程都安装了一个 signal handler 用于捕捉段错误和 bug。在调用用户的 Map 和 Reduce 操作之前,MapReduce 库会将参数的序号保存在一个全局变量中。如果用户代码产生了一个信号,signal handler 就会传输一个参数含有序号的"last gasp"UDP 包给 MapReduce 的 master。当 master 在一个特定的记录中发现了不知一次的错误,这表示在下一次执行相应的 Map 或者 Reduce 操作的时候一个将它跳过。
Local Execution
Map 或者 Reduce 函数的调试问题是非常 tricky 的。因为实际的计算发生在分布式的系统中,通常由成百上千台机器组成,并且工作的分配由 master 动态执行。为了帮助调试,分析,以及小规模的测试,我们开发了另外一个 MapReduce 库的实现,它能够在本地机器上顺序执行一个 MapReduce 操作的所有工作。它的控制交给用户,因此计算可以被限定到制定的 Map task 中执行。用户利用指定的 flag 启动程序,然后就能非常简单地使用任何它们觉得有用的调试或者测试工具了。
Status Information
master 运行了一个内置的 HTTP server 并且输出了一系列供人们使用的状态页。状态页会显示程序的计算过程,例如已经完成了多少个 task,还有多少个 task 正在执行,输入的字节数,中间数据的字节数,输出的字节数,以及处理速度等等。该页还包含了指向各个 task 的标准错误和标准输出链接。用户可以利用这些数据来判断计算会持续多长时间,以及计算是否需要添加更多的资源。这些页面还能用来发现什么时候处理速度比预期地下降好多。
另外,顶层的状态页显示了那些 worker 出错了,以及在它们出错时正在执行哪些 Map 和 Reduce task。这些信息在诊断用户代码出现的 bug 时是非常有用的。
MapReduce 库提供了一个叫 counter 的设施用于统计各种不同事件出现的次数。例如,用户可能想要统计已经处理过的单词的数目或者德国文件的索引数量。
为了使用这一特性,用户代码创建一个命名的 counter 对象,并且在 Map 以及 Reduce 函数中对 counter 进行增加。例如:
1 |
|
每个 worker 机器上 counter 的值会定期传给 master(捎带在给 master 的 ping 回复中)。master 将来自成功执行的 Map 和 Reduce task 的 counter 值聚集起来。然后在 MapReduce 操作完成之后返回给用户代码。当前的 counter 值也会显示在 master 的状态页上(前述的 state pages),所以用户能从实时观看计算的进行。在聚集 counter 的值的时候,master 会消除 Map 或者 Reduce task 的重复执行造成的重复计算。(重复执行可能由 backup tasks 或者因为错误重新执行的 task 引起)。
有些 counter 的值是由 MapReduce 库自动维护的,例如已经处理的输入键值对数目以及已经产生的输出键值对数目。
用户发现 counter 特性对于检查 MapReduce 操作的执行是非常有用的。例如,在有些 MapReduce 操作中,用户代码想要确保产生的输出对的数目和已经处理的输入对的数目是恰好相等的(比如检查满射),或者处理的德语文件的数目占总处理文件数目的比重在一个可容忍的范围内。
Performance
在这个 section 中,我们通过运行在一个集群上的两个 computation 来测试 MapReduce 的性能。一个 Computation 搜索一个 T 的数据,从中获取一个特定的模式。另一个 computation 对一个 T 的数据进行排序。
这两个程序代表了由用户实际编写的 MapReduce 程序的一个子集------一类程序用于将数据从一种表示方法切换到另一种表示方法。另一类程序则从大数据集中抽取出一小部分有趣的数据。
Cluster Configuration
所有程序都运行在一个由 1800 台机器组成的机器上。每一台机器都有两个 2GHz 的 Intel Xeon 处理器,并且允许 Hper-Threading(超线程), 4GB 内存,两个 160GB 的 IDE 磁盘,以及一个 G 比特的以太网链路。这些机器被安排在一个两层树状的交换网络中,根节点的带宽大概在 100-200Gbps。因为所有机器都在同一个托管设备中,因此任意两台机器间的 RTT 少于 1ms。
其中 4GB 中的 1-1.5G 是为集群中运行的其他任务预留的。程序在一个周末的下午运行,此时 CPU,磁盘,网络基本都处于空闲状态。
Grep
grep 程序需要扫描 10 的十次方条 100-byte 的记录,搜索一个相对罕见的三字符模式(出现了 92337 次)。输入被分成大概 64MB 份(M = 15000),所有的输出文件都存放在一个文件中(R = 1)。
Figure 2 显示了 Computation 随着时间的变化过程。Y 轴代表了输入数据的扫描速度。随着机器逐渐加入 MapReduce 的计算当中,速度越来越快,当有 1764 个 worker 加入时,达到峰值 30GB/s。随着 Map task 的结束,速度开始下降并且在 80s 的时候到达 0,。整个 Computation 从开始到结束总共花费了大概 150s。这其中还包括了 1 分钟的启动开销。开销主要来源于将程序分发到 worker machine 中,和 GFS 交互并打开 1000 个输入文件,以及获取局部性优化所需的信息的延时。
Sort
排序程序用于对 10 的十次方条记录(大概 1T 的数据)进行排序。程序以 TeraSort benchmark 为模型。
排序程序由不超过 50 行用户代码组成,一个三行的 Map function 从 text 的一行中提取一个 10-byte 的排序 key,与原始的 text line 组合成一个中间 key/value pair。我们使用内置的 Identity 函数作为 Reduce 的运算符。这一函数将中间键值对传递出作为输出对。最终的排序输出是一个二路复制的 GFS 文件。
输入被分成 64MB 份(M = 15000),而将输出分为 4000 份(R = 4000)。分割成许根据初始的 key 将其分割到 R 份中的一个。
图 3(a)展示了排序程序的正常执行,左上方的图表示读入的速率,在达到峰值 13GB/s 后迅速滑落因为所有的 map tasks 在 200 秒内就已经完成。值得注意的是,输入的速率慢于 grep 操作(对于相同的 M 划分),这是因为对于 sort 操作,花费了一半的事件以及 I/O 带宽用于将中间键值对结果写入本地磁盘,而 grep 操作对应的输出则可以小到忽略不计。
中间左边的图表示经历 map tasks 后通过网络向 reduce tasks 传输数据的速率。这一混排在首个完成的 map task 后启动。第一个突起表示所有 reduce tasks 运行的第一个批次(R = 1700 nearly all),开始计算后 300 秒左右,第一批次的部分 reduce tasks 完成,我们开始向完成的机器进一步递送剩余 reduce tasks 的数据。
左下的图表示排序好的数据向最终文件写出的速率。从第一批次 reduce tasks 完成到开始写数据有一段时间间隔,这是因为机器忙于对中间数据进行排序。
关于速度的比较,输入数据高于 shuffle 速率和输出速率,这是因为输入是基于本地存储,而又因为网络带宽的限制,以及输出要求两份 replica 的要求,shuffle 速率高于输出速率。我们写成两个副本,因为这是我们的底层文件系统提供的可靠性和可用性机制要求。 如果底层文件系统使用擦除编码(erasure coding)而不是复制(replication),则可以减少写入数据的网络带宽要求。
(GFS 介绍的论文里应该会解释为什么需要两份 replica)。
Effect of Backup Tasks
在图 3(b)中,我们展示了禁止 backup tasks 情况下执行排序操作的结果。流程与图 3(a)很相似,但存在一个相当长的且看不出有明显活动的尾部。960 秒后,除了剩余的 5 个,其余 reduce tasks 均已完成,然而剩余的 stragglers 直到 300 秒后才完成任务,着导致整体耗时 1283 秒,比具备 backup tasks(最终备份处理任务)情况下多耗时 44%。
Machine Failures
在图 3(c)中,我们展示了将 1746 台工作机器中的 200 台机器故意宕机几分钟以模拟机器故障情况下排序操作的执行结果,底层的集群立刻重启新的工作进程(因为仅仅是 kill 进程,实际上机器功能良好)。
worker 的 deaths 通过图表中负值输入速率来表示,因为先前一些已完成的 map work 丢失而需要被重新执行(根据先前分析,由于 map task 得到的中间结果存储在本地,宕机后无法正确访问,使得之前的任务需要被重新执行 re-execute)。重执行开始得十分迅速,整体耗时仅仅比正常情况多耗时 5%。
Experience
我们得 MapReduce 库首个版本于 2003 年 2 月写成,并在 2003 年 8 月进行了重要加强,包括引入局部优化,worker 执行任务间动态负载均衡等等。从那时起,我们非常欣喜得看到 MapReduce 在解决各类问题上的广泛应用。现在,它已 Google 用于以下广泛领域的研究。
- 大规模机器学习问题
- Google News 和 Froogle products(Google 购物)的聚类问题
- 提取用于生成热门查询报告的数据(如 Google Zeitgeiest)
- 提取网页上进行的试验或产品性能
- 大规模图形计算
Large-Scale Indexing
目前为止,我们最重要的 MapReduce 应用之一是重写一个产生谷歌搜索引擎需要的数据结构的复杂系统。索引系统以被抓取系统检索到的文件(GFS 文件形式储存)为输入,raw content 大小约 20T,索引进程进行约 10 次 MapReduce 组成的序列操作。相较于先前 ad-hoc 分布式索引系统,现在应用 MapReduce 后,系统具备以下优点:
- 因为与容错、分布式、并行化相关内容隐藏在库重,索引代码更加简单、精巧、易于理解。比如,计算的一个阶段从原有 3800 行 C++代码削减至 700 行。
- 概念上可与计算分开,从而使改动变得简单。
- 内部对一些机器故障的解决使得整个过程更容易成功执行。进一步的,也更容易向系统中加入新的机器。
Related Work
Conclusion
现在 MapReduce 已成功被 Google 应用于各种目的,我们将这种成功归功于以下原因。
- 甚至对于并行和分布式系统缺乏相关经验的编程人员,由于相关细节隐藏在库中,模型仍具备易用性。
- 大量问题易于以 MapReduce 地方式解决。
- 我们将其实现在大规模集群上,因而适于很多大型问题。
在这项工作中我们学习到很多,
- 重新定义编程范式使得并行/分布式运算易于实现,也获得了相当的容错性能。
- 网络带宽作为稀缺资源,使得我们的很多优化都意在减少通过网络传输的数据。
- 冗余的任务执行(backup tasks)可以用于减少缓慢机器的影响,以及解决机器故障和数据丢失。
本博客所有文章除特别声明外,均采用 CC BY-SA 4.0 协议 ,转载请注明出处!