【MIT 6.824】MapReduce 需求分析及实现
【前言】
【论文梗概】
系统中存在很多部分在并发执行,你会遇到并发编程和各种复杂交互所带来的问题,以及时间依赖的问题(比如同步、异步)。
分布式系统有多个组成部分,加上计算机网络,你会遇到一些意想不到的故障,比如有些部分宕机有部分网络不通等局部故障。
人们设计分布式系统的根本原因是为了获得更高的性能,但是如何让系统达到预期性能也是个很棘手的问题,诸如网络带宽限制,消息队列性能瓶颈等。
Input files:海量数据被分割成单机能够接受的合理大小
Worker:Worker 是个抽象的概念,他不仅能执行 Map 任务,还能执行 Reduce 任务,取决于 Master 分配的任务类型。
Map phase:从某个地方读取指定文件,并运行 Map 函数生成中间文件
Intermediate files:Map 任务产生的中间文件,供 Reduce Worker 做输入
Reduce phase:从 Map Worker 所在实例的磁盘中读取中间文件,并执行 Reduce 函数输出结果
Master:负责分配任务至空闲 worker,并做错误恢复,任务编排等操作,其在整个 MapReduce 模型中有着至关重要的作用。
下面着重分析 Master 所需拥有的能力
Master就像一个数据管道,中间文件存储区域的位置信息通过这个管道从Map传递到Reduce。因此,对于每个已经完成的Map任务,master存储了Map任务产生的R个中间文件存储区域的大小和位置。当Map任务完成时,Master接收到位置和大小的更新信息,这些信息被逐步递增地推送给那些正在工作的Reduce任务。
Master 还需要很好地处理机器故障,提高系统容错性(Fault Tolerance),包括但不限于 worker failure、master failure 及 semantics in the presence of failures(这里不知道怎么翻译比较准确,个人理解是保证整个Map到Reduce过程由于某种原因产生的错误结果不会改变原有语义)。
MapReduce的master在调度Map任务时会考虑输入文件的位置信息,尽量将一个Map任务调度在包含相关输入数据拷贝的机器上执行;如果上述努力失败了,master将尝试在保存有输入数据拷贝的机器附近的机器上执行Map任务(例如,分配到一个和包含输入数据的机器在一个switch里的worker机器上执行)。当在一个足够大的cluster集群上运行大型MapReduce操作的时候,大部分的输入数据都能从本地机器读取,因此消耗非常少的网络带宽。
Task Granularity.个人理解是将任务切割成合理粒度,并通过算法提高集群负载均衡及故障恢复能力。
Backup Tasks.并行运算中,影响总执行时间最通常的因素是“落伍者”,master 通过调度备用任务进程来执行剩下的处于处理中(in-progress)状态的任务。
【LAB 关键实现】
环境准备。最好是 linux/unix 系统,不然代码中有的地方要适配 windows。安装 git, goland,按照 go mod 要求的版本即可。下载 goland 安装包的时候注意下系统架构是 arm 还是其他,下载对应的包别错了(没错我就没仔细看装了好久)。下载完后可以去指定目录执行如下命令查看能否有正常输出:
# 6.824/src/main
go build -race -buildmode=plugin ../mrapps/wc.go
rm mr-out*
go run -race mrsequential.go wc.so pg*.txt
more mr-out-0
A 509
ABOUT 2
ACT 8
...
2. 一切准备就绪后,我们就可以开始造代码了。首先原始代码已经包含了整体架构,我们只需要实现其中未完成的主要功能即可。整个程序的主入口在main/mrcoordinator.go 和 main/mrworker.go 两个地方,然后需要我们实现 mr/coordinator.go, mr/worker.go 和 mr/rpc.go 三个地方。可以看到程序开始会创建一个 coordinator(即精简版Master),以系统标准输入作为入参,其实是指定了一串文件名,这里的文件名其实是已经分割好了直接给 Map Worker 使用的,不用再自行实现分割逻辑,这点是 lab 相对于论文的简化。
The pg*.txt arguments to mrcoordinator.go are the input files; each file corresponds to one "split", and is the input to one Map task.
https://pdos.csail.mit.edu/6.824/labs/lab-mr.html
需要存储待分配任务,goland 中 channel 是个不错的选择,线程安全。
分配任务时需要注意所有 Map 完成后,才可以开始分配 Reduce 任务,因为对于 Reduce Worker 而言,需要稳定的输入。那么 coordinator 就必须要有字段去记录当前 Map 任务的完成状态,以及需要一个信号去通知何时开始分发 Reduce Task.
针对每个 Worker 都需要一个唯一标识,这个 id 由 coordinator 统一发布,所以需要维护一个自增字段。
记录任务的完成情况,可根据此状态来决定是否退出主程序。
记录哪些任务是正在进行的,需要对其进行健康检查,如果异常,需要将任务重放至待分配任务队列。
需要分别建立 task,worker 的索引,方便根据 id 检索对象。
针对共享变量,为避免竞争需要一把锁。
The worker should put intermediate Map output in files in the current directory, where your worker can later read them as input to Reduce tasks.
https://pdos.csail.mit.edu/6.824/labs/lab-mr.html
【结语】
【参考资料】
https://pdos.csail.mit.edu/6.824/labs/lab-mr.html
https://zhuanlan.zhihu.com/p/141657364
https://zhuanlan.zhihu.com/p/260752052
https://zhuanlan.zhihu.com/p/337310144