实现一个MapReduce框架
1 目的
本文会带领大家使用GO语言,实现一个MapReduce框架。该框架能够实现MapReduce的基本功能,能运行基本的MapReduce应用,如WordCount等。
2 整体架构
3 细节
3.1 common.go
在这里定义了公用的结构和函数。
type Task struct{
FileName string //文件名称,在Map阶段是输入文件的路径。在Reduce阶段,是中间文件的路径
NReduce int //代表Reduce数量
NMap int //代表Map数量
Seq int //任务id
Phase TaskPhase //Map or reduce
Alive bool //任务执行阶段,共两种阶段:Map,Reduce
}
//根据map的序号和reduce序号生成中间文件名称
func reduceName(mapIdx,reduceIdx int)string{
return fmt.Sprintf("mr-%d-%d",mapIdx,reduceIdx)
}
//根据reduce的序号生成最终输出文件
func outputName(reduceIdx int)string{
return fmt.Sprintf("mr-out-%d",reduceIdx)
}
3.2 rpc.go
MapReduce框架中使用RPC来实现Coordinator(Master)和Worker的通信,rpc.go中定义了rpc通信传输的数据结构以及函数。
//worker通过rpc向coordinator进行注册的时候发送的请求参数,它是一个空的结构,因为注册时,worker不需要传递任何参数给coordinator。
type RegisterArgs struct {
}
//worker注册时的返回参数
type RegisterReply struct {
WorkerID int //workerID 该worker的id
}
// worker向coordinator请求任务时的请求参数
type TaskArgs struct{
WorkerID int //workerID 该worker的id
}
// worker向coordinator请求任务时的返回参数
type TaskReply struct{
Task *Task //Task 请求的task的Task结构
}
//worker向coordinator报告任务执行情况时的请求参数结构
type ReportTaskArgs struct{
Done bool //表示任务是否执行完成
Seq int // 任务的id
Phase TaskPhase // 任务的状态 Map还是Reduce
WorkerID int// worker的id
}
//worker向coordinator报告任务执行情况时的返回参数结构,为空
type ReportTaskReply struct {}
//生成rpc名称
func coordinatorSock() string {
s := "/var/tmp/824-mr-"
s += strconv.Itoa(os.Getuid())
return s
}
3.3 coordinator.go
3.4 worker.go
3.5 mrcoordinator.go
负责调用mrcoordinator.go中的MakeCoordinator函数,来开启coordinator程序。
3.6 mrworker.go
负责调用worker.go中的Worker函数,开启worker程序。
4 运行
1 编译wc.go
先将mapreduce应用编译成功
go build -race -buildmode=plugin ../mrapps/wc.go
2 清除已有的中间文件,以及结果文件
rm mr-out*
3 运行mrcoordinator.go
go run -race mrcoordinator.go pg-*.txt
4 运行mrworker.go
go run -race mrworker.go wc.so
结果如图所示
注:需要先安装go环境