0%

MapReduce 总结

本文是博主学习 MIT6.824 课程的学习笔记,其中会总结论文知识点并加入自己的理解,内容可能与论文原文有出入,想要了解细节的读者可以阅读论文原文或者学习 MIT6.824课程。

MapReduce: Simplified Data Processing on Large Clusters

Introduction And MapReduce

简介

MapReduce 是一种编程模型,也是一个处理和生成超大数据集算法模型的相关实现。使用 MapReduce 架构的程序能够在大量的普通配置的计算机上实现并行化处理。

MapReduce 模型

MapReduce 编程模型的处理过程为:输入一个 key/value pair 集合,经过处理后,输出一个 key/value pair 集合作为结果。
MapReduce 允许用户使用两个函数 MapReduce 来表达上述计算。

  • Map 函数接受一个输入的 key/value pair 值,然后产生一个中间 key/value pair 值的集合。MapReduce 把所有 keyI 的中间值 value集合在一起后传递给 reduce 函数。
  • Reduce 函数接受一个中间 key 的值 I 和其value值的集合,由于 value 值可能由于太大无法放入内存中,故通常我们把 value 的迭代器传递给 Reduce 函数。

上述过程也可以抽象为下面的表达式

$$ map(k1, v1) -> list(k2, v2) $$

$$ reduce(k2, list(v2)) -> list(v2) $$

一个例子

下面的例子可以简要说明 MapReduce 模型的计算过程,当我们要计算一个很大的文档集合中每个单词出现的次数时,可以用下面的方式:

1
2
3
4
5
6
7
8
9
10
11
12
13
map(String key, String value):
// key: document name
// value: document contents
for each word w in value:
EmitIntermediate(w, "1")

reduce(String key, Iterator values):
// key: a word
// values: a list of counts
int result = 0;
for eash v in values:
result += ParseInt(v)
Emit(AsString(result))

Map 函数输出文档中的每个词、以及这个词的出现次数(在这个简单的例子里就是 1)。Reduce 函数把 Map 函数产生的每一个特定的词出现的次数累加起来。

实现

执行过程

执行过程概览

图 1 展示了 MapReduce 的计算流程。当用户调用 MapReduce 函数时,将发生下面的一 系列动作(下面的序号和图 1 中的序号一一对应):

  1. 用户程序首先调用 MapReduce 库将输入文件分成 M 个数据片度,每个数据片段的大小一般从 16MB 到 64MB(可以通过可选的参数来控制每个数据片段的大小)。然后用户程序在机群中创建大量的程序副本。

  2. 这些程序副本中的有一个特殊的程序–master。副本中其它的程序都是 worker 程序,由 master 分配 任务。有 M 个 Map 任务R 个 Reduce 任务将被分配,master 将一个 Map 任务或 Reduce 任务分配给一个空闲的 worker

  3. 被分配了 Map 任务的 worker 程序读取相关的输入数据片段,从输入的数据片段中解析出 key/value pair,然后把 key/value pair 传递给用户自定义的 Map 函数,由 Map 函数生成并输出的中间 key/value pair,并缓存在内存中。

  4. 缓存中的 key/value pair 通过分区函数分成 R 个区域,之后周期性的写入到本地磁盘上。缓存的 key/value pair 在本地磁盘上的存储位置将被回传给 master,由 master 负责把这些存储位置再传送给 Reduce worker

  5. Reduce worker 程序接收到 master 程序发来的数据存储位置信息后,使用 RPCMap worker 所在主机的磁盘上读取这些缓存数据。当 Reduce worker 读取了所有的中间数据后,通过对 key 进行排序,使得具有相同 key 值的数据聚合在一起。由于许多不同的 key 值会映射到相同的 Reduce 任务上, 因此必须进行排序。如果中间数据太大无法在内存中完成排序,那么就要在外部进行排序。

  6. Reduce worker 程序遍历排序后的中间数据,对于每一个唯一的中间 key 值,Reduce worker 程序将这 个 key 值和它相关的中间 value 值的集合传递给用户自定义的 Reduce 函数。Reduce 函数的输出被追加到所属分区的输出文件。

  7. 当所有的 MapReduce 任务都完成之后,master 唤醒用户程序。在这个时候,在用户程序里的对 MapReduce 调用才返回。

在成功完成任务之后MapReduce 的输出存放在 R 个输出文件中(对应每个 Reduce 任务产生一个输出文件,文件名由用户指定)。一般情况下,用户不需要将这 R 个输出文件合并成一个文件–他们经常把这些文 件作为另外一个 MapReduce 的输入,或者在另外一个可以处理多个分割文件的分布式应用中使用。

在上述执行过程中,master 会记录每一个 MapReduce 任务的当前完成状态,以及所分配的 worker。除此之外,Mmster 还负责将 Map 产生的中间结果文件的位置和大小转发给 Reduce

容错机制

worker 失效

master 会周期性的 ping worker,如果 worker 没有及时返回信息,则 master会将其标记为失效。对于在该 worker上执行的任务,分为以下几种情况:

  1. 已经完成的 Map 任务:由于 Map 任务的输出已经无法被访问,故该任务会被重置为空闲状态,后续将被安排给其他的 worker重新执行,并且重新执行的动作会通知给所有执行 Reduceworker。任何还没有从失效 worker上读取数据的 Reduce 任务后续将从新的 worker 上读取。

  2. 已经完成的 Reduce 任务:由于Reduce 的输出结果会存储在全局文件系统上,故无需再次执行。

    该设定是基于 Google 的使用场景,Google 会将 Reduce 任务的输出结果放在 GFS 上,故无需担心数据丢失问题,如果将结果放在本地文件,则仍需要重新执行 Reduce 任务。

  3. 正在运行的 MapReduce 任务:将会被重置为空闲状态,等待重新调度。

master 失效

master 会周期性的将其维护的数据结构(点击跳转) 写入磁盘(即检查点 checkpoint),当前 master 失效时,将会重新启动一个 master 并从检查点记录的状态继续执行。

在 Google 内部的实现中,如果 master 失效,就中止 MapReduce 运算,由人工干预恢复 master 状态。

对于上述机制的讨论

当用户定义的 Map-Reduce 函数都是确定性函数的时候,相同的输入具有相同的输出,所以重复执行具有相同的结果,上述容错机制最终都可以得到正确的结果。

但当用户定义的 Map-Reduce 函数非确定性函数时,两个 Reduce 可能会输入来自两个不同 Map(但是 Map 函数的输入是相同的) 的结果,此时 Reduce 函数的输出可能会不同。

优化技巧

存储与计算尽量在同一个节点

为了减少数据拉取带来的网络开销,在 Google 的使用场景下,会结合 GFS 的存储,尽可能的将 Map 任务调度到文件存储所在的,或者相邻的服务器上。

增加任务数以平衡负载

前面提到,在计算过程将 Map 拆成 M 个,Reduce 拆成 R 个来执行,理想情况下 MR 应当远远大于 worker 的数量,这样可以让每个 worker 都可以执行大量不同的任务来实现动态的负载均衡,也可以在某个 worker 生效,任务转移到其他 worker 时不会造成热点。

但实际上需要考虑以下因素来限制 MR 的值:

  1. master 节点需要执行 $O(M+R)$ 次调度,并且需要在内存中保存 $O(M*R)$ 个状态
  2. R 值需要用户指定,因为它是最终结果的文件数目
  3. M 值的大小决定了每个 Map 输入数据的数据量,通常我们向让每个输入数据在 16MB~64MB 之间

基于以上的原因,在 Google 的场景下,他们的 M Rworker 数量的比例一般为:$M=200000, R=5000, worker=2000$

处理落后任务

Map-Reduce 这样的分布式计算框架,最终受到木桶效应 的影响会很大,如果有个 worker由于 CPU 内存 磁盘 等因素的影响,导致执行速度缓慢,就会影响整个计算任务的进度。

为了解决上述问题,在 Map-Reduce 操作接近完成的时候,master会调度备用任务(backup)进程来执行剩下的,处于处理中(in-progress)的任务,当备用任务处理完成或者是原始任务处理完成,都会将这个任务标记为完成。

自定义分区函数

Map 任务执行完成后,Map-Reduce 框架会对Map 产生的 key 使用分区函数进行分区,以保证相同的 key 能够被分配到相同的 Reduce 任务,通常来说分区函数使用 hash 方法,比如 $hash(key) mod R$。但是对于某些场景,可能会有其他的分区需求,比如我们对 URL 进行统计时,如果我们想把相同域名的 URL 分到同一个 Reduce 任务,这时需要自定义分区函数。

Combiner 函数

Map 任务结束后,可能存在大量的重复 keykey/value pair,比如上述统计单词个数的任务,Map 结束后可能会出现大量 the <-> 1 这种 key/value pair。所有这些 key/value pair 都得通过网络发送给 Reduce 并由 Reduce 任务作累加。

为了减小网络传输和 Reduce 任务的工作量, Map-Reduce 框架支持用户指定一个 Combiner 函数在 Map 任务执行结束后,对 Map 任务的结果先做一次预处理,以将相同的 key 值合并在一起。