vlambda博客
学习文章列表

实现一个MapReduce框架

1 目的

本文会带领大家使用GO语言,实现一个MapReduce框架。该框架能够实现MapReduce的基本功能,能运行基本的MapReduce应用,如WordCount等。

2 整体架构

3 细节

3.1 common.go

在这里定义了公用的结构和函数。

type Task struct{
 FileName string //文件名称,在Map阶段是输入文件的路径。在Reduce阶段,是中间文件的路径
 NReduce int //代表Reduce数量
 NMap int //代表Map数量
 Seq int  //任务id
 Phase TaskPhase //Map or reduce
 Alive bool //任务执行阶段,共两种阶段:Map,Reduce
}
//根据map的序号和reduce序号生成中间文件名称
func reduceName(mapIdx,reduceIdx int)string{
 return fmt.Sprintf("mr-%d-%d",mapIdx,reduceIdx)
}
//根据reduce的序号生成最终输出文件
func outputName(reduceIdx int)string{
 return fmt.Sprintf("mr-out-%d",reduceIdx)
}


3.2 rpc.go

MapReduce框架中使用RPC来实现Coordinator(Master)和Worker的通信,rpc.go中定义了rpc通信传输的数据结构以及函数。

//worker通过rpc向coordinator进行注册的时候发送的请求参数,它是一个空的结构,因为注册时,worker不需要传递任何参数给coordinator。
type RegisterArgs struct {
}
//worker注册时的返回参数
type RegisterReply struct {
WorkerID int //workerID 该worker的id
}

// worker向coordinator请求任务时的请求参数
type TaskArgs struct{
WorkerID int //workerID 该worker的id
}
// worker向coordinator请求任务时的返回参数
type TaskReply struct{
Task *Task //Task 请求的task的Task结构
}

//worker向coordinator报告任务执行情况时的请求参数结构
type ReportTaskArgs struct{
   Done bool //表示任务是否执行完成
Seq int // 任务的id
Phase TaskPhase // 任务的状态 Map还是Reduce
WorkerID int// worker的id
}
//worker向coordinator报告任务执行情况时的返回参数结构,为空
type ReportTaskReply struct {}

//生成rpc名称
func coordinatorSock() string {
s := "/var/tmp/824-mr-"
s += strconv.Itoa(os.Getuid())
return s
}


3.3 coordinator.go

实现一个MapReduce框架


3.4 worker.go

3.5 mrcoordinator.go

负责调用mrcoordinator.go中的MakeCoordinator函数,来开启coordinator程序。

3.6 mrworker.go

负责调用worker.go中的Worker函数,开启worker程序。


4 运行

  • 1 编译wc.go

    先将mapreduce应用编译成功

    go build -race -buildmode=plugin ../mrapps/wc.go
  • 2 清除已有的中间文件,以及结果文件

    rm mr-out*
  • 3 运行mrcoordinator.go

    go run -race mrcoordinator.go pg-*.txt
  • 4 运行mrworker.go

    go run -race mrworker.go wc.so

    结果如图所示

注:需要先安装go环境

5 完整代码获取