vlambda博客
学习文章列表

分布式计算引擎MapReduce基础

MapReduce的定义

  • 源于Google的MapReduce论文:发表于2004年12月,Hadoop MapReduce是Google Mapreduce克隆版

  • MapReduce特点:易于编程、良好的扩展性、高容错性、适合PB级以上海量数据的离线处理

MapReduce应用场景

  • 简单数据统计,比如网站pv、uv统计

  • 搜索引擎建索引

  • 海量数据查找(Hive中的sql会转换为MapReduce进行查找)

  • 复杂数据分析算法实现

    • 聚类算法

    • 分类算法

    • 推荐算法

    • 图算法

MapReduce的特色--不擅长处理的场景

  • 实时计算:像MySQL一样,在毫秒或秒级内返回结果

  • 流式计算:MapReduce的输入数据集是静态的,不能动态变化;MapReduce的自身设计特点决定了数据源必须是静态的

  • DAG计算(有向无环图):多个应用程序存在依赖关系,后一个应用程序的输入为前一个的输出

MapReduce的实例--WordCount

  • 场景:大量文件,里面存储了单词,且一个单词占一行

  • 任务:如何统计每个单词出现的次数?

  • 类似应用场景:

    • 搜索引擎中,统计最流行的K个搜索词

    • 统计搜索词频率,帮助优化搜索词提示

  • 随着文件的大小不断增大

    • Case 1:整个文件可以加载到内存中:sort datafile|uniq -c

    • Case 2:文件太大不能加载到内存中:<word,count>映射可以放到内存中

    • Case 3:文件太大不能加载到内存中:<word,count>映射也不行

  • 将问题泛化为:有一批文件(规模为TB或PB级),如何统计这些文件中所有单词出现的次数

    • 方案1:用一个单独的程序去读取和统计文件中所有单词出现次数

    • 方案2:启动多个程序分别读取不同文件中的单词出现的次数,然后将这些结果合并

MapReduce Wordcount数据流向

  • Input:一系列key-value对

  • 用户提供两个函数实现

    • Shuffling 混淆(按key进行hash%Reduce个数,分配到每个Reduce中)

    • Reduceing 合并统计&排序

    • Splitting 分片

    • Mapping 遍历

    • map(k,v) -> k1,v1(是中间key/value结果对)

    • reduce(k1, list(v1)) -> k2,v2

  • Output:一系列(k2,v2)对

分布式计算引擎MapReduce基础

map(key, value):
//key: line offset; value: line
for each word w in value:
emit(w,1)
reduce(key, values):
//key: a word; values: an iterator over counts
result = 0
for each count v in values:
result += v
emit(key,result)

MapReduce将作业的整个运行过程分为两个阶段

  • Map阶段由一定数量的Map Task组成

    • 输入数据格式解析:InputFormat

    • 输入数据处理:Mapper

    • 数据分组:Partitioner

  • Reduce阶段由一定数量的Reduce Task组成

    • 数据远程拷贝

    • 数据按照key排序

    • 数据处理:Reduce

    • 数据输出格式:OutputFormat

分布式计算引擎MapReduce基础

MapReduce编程模型--InputFormat

  • 文件分片(InputSplit)方法

    • 处理跨行问题

  • 将分片数据解析成key/value对

    • 默认实现是TextInputFormat

  • TextInputFormat

    • key是行在文件中的偏移量,value是行内容

    • 若行被截断,则读取下一个block的前几个字符

MapReduce编程模型--Split与Block

  • Block

    • HDFS中最小的数据存储单元

    • 默认是128MB

  • Split

    • MapReduce中最小的计算单元

    • 默认与Block一一对应

  • Split与Block

    • Split与Block的关系是任意的,由用户控制

MapReduce编程模型--Combiner

  • Combiner可看做local reduce:合并相同的key对应的value;通常与Reduce逻辑一样

  • 好处:减少Map Task输出数据量(磁盘IO);减少Reduce-Map网络传输数据量(网络IO)

  • 如何正确使用:结果可叠加,Sum(YES!) Average(NO!)

分布式计算引擎MapReduce基础

MapReduce编程模型--Partitioner

  • Partitioner决定了Map Task输出的每条数据交个哪个Reduce Task处理

  • 默认实现:hash(key) mod R

    • R是Reduce Task数目

    • 允许用户自定义,很多情况需要自定义Partitioner:比如“hash(hostname(URL)) mod R” 确保相同域名的网页交给同一个Reduce Task处理

MapReduce编程模型--实战

  • 将下面代码打包放到hdfs中,执行 hadoop jar命令执行

hadoop jar /home/bigdata/mapreduce/wordcount-0.1.0-SNAPSHOT.jar com.demo.WordCount /user/bigdata/wordcount_input /user/bigdata/output
package com.demo;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.GenericWritable;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.util.GenericOptionsParser;

import java.io.FileInputStream;
import java.io.FileOutputStream;
import java.io.IOException;
import java.util.StringTokenizer;

public class WordCount {
public static class TokenizerMapper extends Mapper<Object, Text, Text, IntWritable> {
private final static IntWritable one = new IntWritable(1);
private Text word = new Text();
public void map(Object key, Text value, Context context) throws IOException, InterruptedException {
StringTokenizer itr = new StringTokenizer(value.toString());
while (itr.hasMoreTokens()) {
word.set(itr.nextToken());
context.write(word, one);
}
}
}

public static class IntSumReducer extends Reducer<Text, IntWritable, Text, IntWritable> {
private IntWritable result = new IntWritable();
public void reduce(Text key, Iterable<IntWritable> values, Context context) throws IOException, InterruptedException {
int sum = 0;
for (IntWritable val : values) {
sum += val.get();
}
result.set(sum);
context.write(key, result);
}
}

public static void main(String[] args) throws IOException, InterruptedException, ClassNotFoundException {
Configuration conf = new Configuration();
String[] otherArgs = new GenericOptionsParser(conf, args).getRemainingArgs();
if (otherArgs.length < 2) {
System.out.println("Uasge: wordcount <int> [<in>...] <out>");
System.exit(2);
}
Job job = new Job(conf, "word count");
job.setJarByClass(WordCount.class);
job.setMapperClass(TokenizerMapper.class);
job.setCombinerClass(IntSumReducer.class);
job.setReducerClass(IntSumReducer.class);
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(IntWritable.class);
for (int i = 0; i < otherArgs.length - 1; i++) {
FileInputFormat.addInputPath(job, new Path(otherArgs[i]));
}

FileOutputFormat.setOutputPath(job, new Path(otherArgs[otherArgs.length - 1]));
System.exit(job.waitForCompletion(true) ? 0 : 1);
}
}

MapReduce架构

分布式计算引擎MapReduce基础

  • Client

    • 与MapReduce1.0的Client类似,用户通过Client与YARN交互,提交MapReduce作业,查询作业运行状态,管理作业等。

  • MRAppMaster

    • 功能类似于1.0的JobTracker,但不负责资源管理

    • 功能包括:任务划分、资源申请并将之二次分配给Map Task和Reduce Task、任务状态监控和容错。

分布式计算引擎MapReduce基础

  • MRAppMaster容错性

    • 一旦运行失败,由AYRN的ResourceManager负责重新启动,最多重启次数可由用户设置,默认是2次,一旦超过最高重启次数,则作业失败。

  • Map Task/Reduce Task容错性

    • Task周期性向MRAppMaster汇报心跳,一旦Task挂掉,则MRAppMaster将为之重新申请资源并运行。最多重新运行次数可由用户设置,默认4次

MapReduce计算框架--数据本地性

分布式计算引擎MapReduce基础

  • 数据本地性(data locality)

    • 如果任务运行在它将处理的数据所在的节点,则称该任务具有“数据本地性”

    • 本地性可避免跨节点或机架数据传输,提高运行效率

  • 数据本地性分类

    • 同节点(node-local)

    • 同机架(rack-local)

    • 其他(off-swith)

MapReduce计算框架--推测执行机制

  • 作业完成时间取决于最慢的任务完成时间

    • 一个作业由若干各Map任务和Reduce任务构成

    • 因硬件老化、软件Bug等,某些任务可能运行非常慢

  • 推测执行机制

    • 发现拖后腿的任务,比如某个任务运行速度远慢于任务平均速度

    • 为拖后腿任务启动一个备份任务,同时运行

    • 谁先运行完,则采用谁的结果

  • 不能启用推测执行机制

    • 任务间存在严重的负载倾斜

    • 特殊任务,比如任务向数据库中写数据

  • mapreduce.map.speculative:是否打开map阶段的推测执行机制

  • mapreduce.reduce.speculative:是否打开map阶段的推测执行机制

MapReduce计算框架--任务并行执行

分布式计算引擎MapReduce基础

MapReduce编程接口

  • Hadoop提供了两种编程方式:

    • Java(最原始的方式)

    • Hadoop Streaming(支持多语言)

  • Java编程接口是所有编程方式的基础

  • 不同的编程接口只是暴露给用户的形式不同而已,内部执行引擎是一样的

  • 不同编程方式效率不同

  • Java编程接口组成

    • 旧API:所在java包:org.apache.hadoop.mapred

    • 新API:所在java包:org.apache.hadoop.mapreduce

  • 新API具有更好的扩展性

  • 两种编程接口只是暴露给用户的形式不同而已,内部执行引擎是一样的

WordCount问题

分布式计算引擎MapReduce基础


分布式计算引擎MapReduce基础

WordCount问题--mapper设计与实现

    public static class TokenizerMapper extends Mapper<Object, Text, Text, IntWritable> {
private final static IntWritable one = new IntWritable(1);
private Text word = new Text();
public void map(Object key, Text value, Context context) throws IOException, InterruptedException {
StringTokenizer itr = new StringTokenizer(value.toString());
while (itr.hasMoreTokens()) {
word.set(itr.nextToken());
context.write(word, one);
}
}
}

WordCount问题--reduce设计与实现

    public static class IntSumReducer extends Reducer<Text, IntWritable, Text, IntWritable> {
private IntWritable result = new IntWritable();
public void reduce(Text key, Iterable<IntWritable> values, Context context) throws IOException, InterruptedException {
int sum = 0;
for (IntWritable val : values) {
sum += val.get();
}
result.set(sum);
context.write(key, result);
}
}

WordCount问题--main函数设计与实现

    public static void main(String[] args) throws IOException, InterruptedException, ClassNotFoundException {
Configuration conf = new Configuration();
String[] otherArgs = new GenericOptionsParser(conf, args).getRemainingArgs();
if (otherArgs.length < 2) {
System.out.println("Uasge: wordcount <int> [<in>...] <out>");
System.exit(2);
}
Job job = new Job(conf, "word count");
job.setJarByClass(WordCount.class);
job.setMapperClass(TokenizerMapper.class);
job.setCombinerClass(IntSumReducer.class);
job.setReducerClass(IntSumReducer.class);
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(IntWritable.class);
for (int i = 0; i < otherArgs.length - 1; i++) {
FileInputFormat.addInputPath(job, new Path(otherArgs[i]));
}

FileOutputFormat.setOutputPath(job, new Path(otherArgs[otherArgs.length - 1]));
System.exit(job.waitForCompletion(true) ? 0 : 1);
}

WordCount问题--程序运行

hadoop jar /home/bigdata/mapreduce/wordcount-0.1.0-SNAPSHOT.jar com.demo.WordCount /user/bigdata/wordcount_input /user/bigdata/output

WordCount问题--输入数据格式解析

  • 使用默认的TextInputFormat

    • 每个Map Task处理一个split

    • 一个split大小等于一个block

    • 如果最后一行数据被截断,则读取后一个block的前半部分

    • 转换成key/value对,key是偏移量,value是行内容

    分布式计算引擎MapReduce基础

MapReduce应用案例:倒排索引

  • 倒排索引(Inverted index):也常被称为反向索引,是一种索引方法,被用来存储在全文搜索下某个单词在一个文档或者一组文档中存储位置的映射。它是文档检索系统中最常用的数据结构

词汇 文档1 文档2 文档3 文档4 文档5
词汇1 2 1 1 2 1
词汇2 1 2 2 1 1
词汇3 1 1 1 2 1
词汇4 2 1 1 1 2
词汇5 1 2 1 1 1
词汇6 1 1 2 1 1
  • 原文档

T0 = "it is what itis"
T1 = "what is it"
T2 = "it is a banana"
  • 倒排索引

"a":        {2}
"banana": {2}
"is": {0, 1, 2}
"it": {0, 1, 2}
"what": {0, 1}
  • 应用:检索的条件"what","is"和"it",将对应这个集合:{0,1}∩{0,1,2}∩{0,1,2}={0,1}

  • 输入:一些文件

  • Map:读入文件内容,转换为单词->文件的映射

    分布式计算引擎MapReduce基础

  • Reduce:读入为单词->文件列表的映射,转为:单词->文件和对应单词出现次数的列表

    分布式计算引擎MapReduce基础

package com.elian;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.FileSplit;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.util.GenericOptionsParser;

import java.io.IOException;
import java.util.HashMap;
import java.util.Map;
import java.util.StringTokenizer;

public class InvertIndex {
public static class IndexMapper extends Mapper<Object, Text, Text, Text> {
public void map(Object key, Text value,Context context) throws IOException, InterruptedException {
// Get the name of the file using context.getInputSplit() method
String fileName = ((FileSplit) context.getInputSplit()).getPath().getName();
// Split the line in words
StringTokenizer itr = new StringTokenizer(value.toString());
while (itr.hasMoreTokens()) {
// For each word emit word as key and file name as value
context.write(new Text(itr.nextToken()), new Text(fileName));
}
}
}

public static class IndexReducer extends Reducer<Text, Text, Text, Text> {
public void reduce(Text key, Iterable<Text> values, Context context) throws IOException, InterruptedException {
// Declare the Hash Map to store File name as key to compute
// and store number of times the filename is occurred for as value
Map<String, Integer> map = new HashMap<>();
for (Text fileText : values) {
String file = fileText.toString();
if (map.containsKey(file)) {
map.put(file, map.get(file) + 1);
} else {
map.put(file, 1);
}
}
context.write(key, new Text(map.toString()));
}
}

public static void main(String[] args) throws IOException, InterruptedException, ClassNotFoundException {
Configuration conf = new Configuration();
String[] otherArgs = new GenericOptionsParser(conf, args).getRemainingArgs();
if (otherArgs.length < 2) {
System.out.println("Usage: InvertIndex <int> [<in>...] <out>");
System.exit(2);
}
Job job = Job.getInstance(conf, "Invert Index");
job.setJarByClass(InvertIndex.class);
job.setMapperClass(InvertIndex.IndexMapper.class);
job.setCombinerClass(InvertIndex.IndexReducer.class);
job.setReducerClass(InvertIndex.IndexReducer.class);
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(IntWritable.class);
for (int i = 0; i < otherArgs.length - 1; i++) {
FileInputFormat.addInputPath(job, new Path(otherArgs[i]));
}

FileOutputFormat.setOutputPath(job, new Path(otherArgs[otherArgs.length - 1]));
System.exit(job.waitForCompletion(true) ? 0 : 1);
}
}

MapReduce内部机制

  • 每一个Map Task把输出写到内存的buffer中

  • 当内存buffer到达一定阈值时,后台线程会把数据写到磁盘

    • 根据Partitioner,把数据写入到不同的partition

    • 对每个partition的数据进行排序

    • 运行Combiner

  • Reduce Task启动后,从每个Map Task上远程拷贝属于它的partition文件

  • 把这些文件归并排序

MapReduce参数调优

  • YARN Container申请资源

    • Dmapreduce.map.memory.mb=5120 map在YARN中申请的资源

    • Dmapreduce.reduce.memory.mb=5120 reduce在YARN中申请的资源

    • Dmapreduce.map.java.opts="Xms4096m -Xmx4096m" map在启动时,使用的资源

    • Dmapreduce.reduce.java.opts="Xms4096m -Xmx4096m" reduce在启动时,使用的资源

  • Task重试次数

    • mapreduce.map.maxattempts: map task重试次数,默认是4

    • mapreduce.reduce.maxattempts: reduce task重试次数,默认是4

    • mapreduce.map.failures.maxpercent: 最大允许的map失败率,默认是0

    • mapreduce.reduce.failures.maxpercent: 最大允许的reduce失败率,默认是0

  • 推测执行

    • mapreduce.map.speculative: 是否打开map阶段的推测执行机制

    • mapreduce.reduce.speculative: 是否打开reduce阶段的推测执行机制