vlambda博客
学习文章列表

【MIT 6.824】MapReduce 需求分析及实现

【前言】

做这个系列的 lab 初衷是利用现阶段所掌握的工程知识(Crtl+CV)去解决一些业务无关的问题,顺便能够学习 Goland,提高 debug 能力,个人觉得 learn by doing 是最有效的方式。作为非科班,如果你也苦于没有好的项目能够练手,那么一起加入我们吧,切记,此系列 lab 并不是让大家一口吃个胖子妄想精通分布式理论,个人主要是用于提高 coding 能力,包括形成良好的代码风格,了解并使用语言特性,学习架构思想及拓展知识面,也希望大家可以一起交流但不限于 code review,分享心得,共勉。

【论文梗概】
在分析论文前,先简单谈谈分布式系统。单机能处理的事务数受限其物理规格,试想如果想在合理的时间内处理海量数据,那么只能横向/纵向扩展运算实例,那么就会引来一些相当复杂的分布式问题,下面简单罗列:
  1.  系统中存在很多部分在并发执行,你会遇到并发编程和各种复杂交互所带来的问题,以及时间依赖的问题(比如同步、异步)。

  2. 分布式系统有多个组成部分,加上计算机网络,你会遇到一些意想不到的故障,比如有些部分宕机有部分网络不通等局部故障。

  3. 人们设计分布式系统的根本原因是为了获得更高的性能,但是如何让系统达到预期性能也是个很棘手的问题,诸如网络带宽限制,消息队列性能瓶颈等。

那么如何解决这些问题,需要我们全面学习本系列课程,先打下基础,再深入分布式理论。
好了现在开始论文部分。 MapReduce 是一个编程模型,主要用于处理和生成超大数据集,由 Google 设计、开发和使用的一个系统。Google 当时面临的问题是,他们需要在 TB 级别的数据上进行大量计算,比如为所有的网页创建索引,分析整个互联网的链路并得出最重要或者最权威的网页(不得不说是需求推动技术发展阿),Google 希望能对大量数据的大量运算并行跑在几千台计算机上,这样才能快速完成计算。为了降低分布式开发的门槛,能让普通开发者也能像在单机上开发分布式应用,Google 需要一种框架,可以让它的工程师能够进行任意的数据分析,比如排序,网络索引器以及任何运算,工程师只需要实现应用程序的核心,就能将应用运行在数千台计算机上,而不用考虑如何将运算工作分发到数千台计算机,如何组织这些计算机,如何移动数据,如何处理故障等细节,这就是 MapReduce 出现的背景。
MapReduce 的思想是,应用程序设计人员和分布式运算的使用者,只需要写简单的 Map 函数和 Reduce 函数,而不需要知道任何有关分布式的事情, MapReduce 框架会处理剩下的事情。
可以看下 MapReduce 架构图:

  • Input files:海量数据被分割成单机能够接受的合理大小

  • Worker:Worker 是个抽象的概念,他不仅能执行 Map 任务,还能执行 Reduce 任务,取决于 Master 分配的任务类型。

  • Map phase:从某个地方读取指定文件,并运行 Map 函数生成中间文件

  • Intermediate files:Map 任务产生的中间文件,供 Reduce Worker 做输入

  • Reduce phase:从 Map Worker 所在实例的磁盘中读取中间文件,并执行 Reduce 函数输出结果

  • Master:负责分配任务至空闲 worker,并做错误恢复,任务编排等操作,其在整个 MapReduce 模型中有着至关重要的作用。


下面着重分析 Master 所需拥有的能力

  1. Master就像一个数据管道,中间文件存储区域的位置信息通过这个管道从Map传递到Reduce。因此,对于每个已经完成的Map任务,master存储了Map任务产生的R个中间文件存储区域的大小和位置。当Map任务完成时,Master接收到位置和大小的更新信息,这些信息被逐步递增地推送给那些正在工作的Reduce任务。

  2. Master 还需要很好处理机器故障,提高系统容错性(Fault Tolerance),包括但不限于 worker failure、master failure 及 semantics in the presence of failures(这里不知道怎么翻译比较准确,个人理解是保证整个Map到Reduce过程由于某种原因产生的错误结果不会改变原有语义)。

  3. MapReduce的master在调度Map任务时会考虑输入文件的位置信息,尽量将一个Map任务调度在包含相关输入数据拷贝的机器上执行;如果上述努力失败了,master将尝试在保存有输入数据拷贝的机器附近的机器上执行Map任务(例如,分配到一个和包含输入数据的机器在一个switch里的worker机器上执行)。当在一个足够大的cluster集群上运行大型MapReduce操作的时候,大部分的输入数据都能从本地机器读取,因此消耗非常少的网络带宽。

  4. Task Granularity.个人理解是将任务切割成合理粒度,并通过算法提高集群负载均衡及故障恢复能力。

  5. Backup Tasks.并行运算中,影响总执行时间最通常的因素是“落伍者”,master 通过调度备用任务进程来执行剩下的处于处理中(in-progress)状态的任务。

在此 lab 具体实现的时候,是以论文精简后的内容作为需求输入,所以会在数据结构上出现一点差异。论文内容很多,这里也不再赘述其中的细节,有需要的读者可以自行阅读原作,多读几遍其意自现。

【LAB 关键实现】
不涉及具体代码,只做分析。代码实现有很多种,这里只提几个需要注意的点,剩下由读者独立完成。
  1. 环境准备。最好是 linux/unix 系统,不然代码中有的地方要适配 windows。安装 git, goland,按照 go mod 要求的版本即可。下载 goland 安装包的时候注意下系统架构是 arm 还是其他,下载对应的包别错了(没错我就没仔细看装了好久)。下载完后可以去指定目录执行如下命令查看能否有正常输出:

# 6.824/src/maingo build -race -buildmode=plugin ../mrapps/wc.go$ rm mr-out*$ go run -race mrsequential.go wc.so pg*.txt$ more mr-out-0A 509ABOUT 2ACT 8...

2. 一切准备就绪后,我们就可以开始造代码了。首先原始代码已经包含了整体架构,我们只需要实现其中未完成的主要功能即可。整个程序的主入口在main/mrcoordinator.go 和 main/mrworker.go 两个地方,然后需要我们实现 mr/coordinator.go, mr/worker.go 和 mr/rpc.go 三个地方。可以看到程序开始会创建一个 coordinator(即精简版Master),以系统标准输入作为入参,其实是指定了一串文件名,这里的文件名其实是已经分割好了直接给 Map Worker 使用的,不用再自行实现分割逻辑,这点是 lab 相对于论文的简化。

The pg*.txt arguments to mrcoordinator.go are the input files; each file corresponds to one "split", and is the input to one Map task.

https://pdos.csail.mit.edu/6.824/labs/lab-mr.html
创建 coordinator 需要知道其需要哪些功能:
  1. 需要存储待分配任务,goland 中 channel 是个不错的选择,线程安全。

  2. 分配任务时需要注意所有 Map 完成后,才可以开始分配 Reduce 任务,因为对于 Reduce Worker 而言,需要稳定的输入。那么 coordinator 就必须要有字段去记录当前 Map 任务的完成状态,以及需要一个信号去通知何时开始分发 Reduce Task.

  3. 针对每个 Worker 都需要一个唯一标识,这个 id 由 coordinator 统一发布,所以需要维护一个自增字段。

  4. 记录任务的完成情况,可根据此状态来决定是否退出主程序。

  5. 记录哪些任务是正在进行的,需要对其进行健康检查,如果异常,需要将任务重放至待分配任务队列。

  6. 需要分别建立 task,worker 的索引,方便根据 id 检索对象。

  7. 针对共享变量,为避免竞争需要一把锁。

然后可以选择合适的数据结构去表示这些状态,因人而异,但要注重代码的封装及抽象(我的代码现在太烂了..
当 coordinator 创建出来后,可以考虑先从 Worker 向 coordinator 请求下发任务开始下手。作为 Worker,使用 RPC 与 coordinator 进行通信,这一步源码已经实现,现在需要我们实现的就是“申请-接收”这两步。首先需要明确 Worker 完成一个任务后还能继续向 coordinator 申请任务的,只有在故障后才会退出。那么我们就需要不断轮询向 coordinator 发送 RPC 请求,所以我们得带上自己的唯一性 id 去索要任务。作为 coordinator 会根据这个 id 去检查 Worker状态及待分配任务来决定是否下发任务,并且下发任务的时候得带上任务类型,然后 Worker 根据任务类型执行对应的操作。
然后就可以开始考虑拿到任务的 Worker 该如何操作了,这个时候就用到了我们的毕生所学(Ctrl+CV),借鉴 mr/sequential.go 中的实现,并稍加改造。

The worker should put intermediate Map output in files in the current directory, where your worker can later read them as input to Reduce tasks.

https://pdos.csail.mit.edu/6.824/labs/lab-mr.html
由课程规则可知,Map Worker 产生的文件可直接被 Reduce Worker 读取,这点与论文不同,论文是将文件存在 Map Worker 本地磁盘中,如果因为某种原因到 Worker 的网络不可达,那么该 Worker 完成的任务得全部重新分配,实验做了简化,假设存储的文件肯定能拿到。为了避免由于 Worker 宕机等故障,产生的残留数据影响 Reduce Input,可以采用将 Map Worker 产生的输出,写到临时文件中,然后同一由 coordinator 利用操作系统原子操作 os.rename 来保证任务提交的原子性。我采用了一种更为直接的方法,对于相同的输入,Worker 的输出文件名是固定的,所以我们可以直接覆盖式更新文件内容,只要 coordinator 觉得此次任务提交时合法的就行,其实这种做法强依赖于 Worker 内部逻辑没有bug,也就是整个过程是幂等的,如果说有两个 Worker 拿到了相同的任务,他们之间完成时间有先后之分,必然会出现后者覆盖前者的情况,不管后面的任务最后提交时是否合法,此处可以优化成临时文件的实现方式,也是论文推荐的工程上常用的技巧,如修改重要配置文件等操作为了保证原子性,可以先修改副本再改名。Worker 提交任务后,会接受一个状态,比如 coordinator 认为你已经发生故障了,该退出了或者当前没有待分配任务了需要再等会。
最后再说说 coordinator,在接收到 Worker 提交的任务后,检查一下 Worker 的状态是否合法,决定是否采用接收该提交(如果使用我刚才说的覆盖式更新,那么采不采用都无所谓),是否需要 Worker 变更状态等。剩下的就是起几个协程去检查各队列的状态了。
按照上述分析实现的代码,能够通过 test-mr.sh 的全部用例。哦对了,这个测试脚本要求 bash 的版本大于 4.3,才能支持 wait -n 参数,要么就把 -n 删了,这个坑了我好久。。

【结语】
以上编码用了一天,debug 用了一天,看论文和课程用了好几天,中间反反复复断断续续,还剩两个 challenge exercises 没有做,分别是实现分布式 Grep 和使用TCP/IP协议和共享文件系统在多台机器上实现 MapReduce,以后有空再写吧(优先级较低)。下一步计划开始看 Raft ,相对而言 MapReduce 应该是比较简单的 lab 了。



【参考资料】
https://pdos.csail.mit.edu/6.824/labs/lab-mr.html
https://zhuanlan.zhihu.com/p/141657364
https://zhuanlan.zhihu.com/p/260752052
https://zhuanlan.zhihu.com/p/337310144