Learn MapReduce Programming in One Article
“The MapReduce programming model has a bit of a learning curve for beginners. That's fine: this article will teach you how to use MapReduce for distributed processing.”
01
—
The MapReduce Programming Model
MapReduce processes data as key-value pairs of the form <key, value>: it treats the input of a job as a set of <key, value> pairs and produces another set of <key, value> pairs as the result.
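Taking the word-count job from the second half of this article as an example, the key-value flow through the model looks roughly like this (the offsets and grouping shown here are illustrative):

(0,  "hello hadoop")  -> map    -> ("hello", 1), ("hadoop", 1)
(13, "hello hdfs")    -> map    -> ("hello", 1), ("hdfs", 1)
...
shuffle groups by key           -> ("hello", [1, 1, 1, 1]), ("hadoop", [1]), ...
reduce sums each group          -> ("hello", 4), ("hadoop", 1), ...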
Writing a MapReduce program mainly means implementing a Mapper and a Reducer:

/**
 * KEYIN:    data type of the key in the input key-value pairs
 * VALUEIN:  data type of the value in the input key-value pairs
 * KEYOUT:   data type of the key in the output key-value pairs
 * VALUEOUT: data type of the value in the output key-value pairs
 * All of these must be Writable types
 */
public static class MyMapper extends Mapper<KEYIN, VALUEIN, KEYOUT, VALUEOUT> {
    // Context is the MapReduce context; in map it is typically used to emit the processed results
    public void map(KEYIN key, VALUEIN value, Context context)
            throws IOException, InterruptedException {
        // Map logic goes here
    }
}

public static class MyReducer extends Reducer<KEYIN, VALUEIN, KEYOUT, VALUEOUT> {
    // The values passed to reduce are an Iterable, because the Reduce phase groups together
    // all records that share the same key
    public void reduce(KEYIN key, Iterable<VALUEIN> values, Context context)
            throws IOException, InterruptedException {
        // Reduce logic goes here
    }
}
The job also needs a driver, a main method that configures and submits it:

public static void main(String[] args) throws Exception {
    // Configuration object
    Configuration conf = new Configuration();
    // Create the MapReduce Job instance
    Job job = Job.getInstance(conf, "Job Name");
    // Required job configuration:
    // set the entry class that contains the main method
    job.setJarByClass(WordCount.class);
    // Set the Map and Reduce implementation classes, and specify the Combiner
    job.setMapperClass(MyMapper.class);
    job.setCombinerClass(MyReducer.class);
    job.setReducerClass(MyReducer.class);
    // Set the key and value classes of the output data
    job.setOutputKeyClass(Text.class);
    job.setOutputValueClass(IntWritable.class);
    // Set the input and output paths of the job data
    FileInputFormat.addInputPath(job, new Path(args[0]));
    FileOutputFormat.setOutputPath(job, new Path(args[1]));
    // Run the job and exit with a code that reflects whether it succeeded
    System.exit(job.waitForCompletion(true) ? 0 : 1);
}
In addition, the number of Reduce tasks used by the job can be set explicitly:

job.setNumReduceTasks(int)
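As a minimal sketch, assuming the driver shown above, the call is placed after the Job instance is created (the value 4 here is purely illustrative):

// Inside main(), after Job.getInstance(...)
job.setNumReduceTasks(4);   // run this job with 4 Reduce tasks (illustrative value)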
02
—
Implementing WordCount
First, prepare the data and upload it to HDFS:
# Switch the current Hadoop user to hdfs to obtain access permissions
export HADOOP_USER_NAME=hdfs
# Create the job input directory in HDFS
hadoop fs -mkdir -p /tmp/mr/data/wc_input
# Create the word-count input file locally
vim wordcount.txt
# with the following content:
hello hadoop
hello hdfs
hello yarn
hello mapreduce
# Upload wordcount.txt to the job input directory
hadoop fs -put wordcount.txt /tmp/mr/data/wc_input
The complete code of WordCount.java is as follows:

import java.io.IOException;
import java.util.StringTokenizer;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

public class WordCount {

    /**
     * The Mapper implementation; the map logic runs once for every line of the input file.
     * Because the input is a file, the offset of each line is used as the key, so it should be
     * LongWritable (Object also works); the value type is Text: the content of the line.
     * The Mapper splits each line into words and emits <word, 1>,
     * so the output types are Text and IntWritable.
     */
    public static class TokenizerMapper
            extends Mapper<Object, Text, Text, IntWritable> {

        // Predefined value object, an IntWritable fixed at 1
        private final static IntWritable one = new IntWritable(1);
        // Predefined Text object used to hold each extracted word
        private Text word = new Text();

        public void map(Object key, Text value, Context context)
                throws IOException, InterruptedException {
            // Split the line into words on whitespace
            StringTokenizer itr = new StringTokenizer(value.toString());
            // For each word, emit the key-value pair <word, 1> (written to the buffer via the context)
            while (itr.hasMoreTokens()) {
                word.set(itr.nextToken());
                context.write(word, one);
            }
        }
    }

    /**
     * The Reducer implementation.
     * It receives the Mapper output, so the key type is Text and the value type is IntWritable.
     * The Reducer sums the values of all records that share the same word (key),
     * so the output types are Text and IntWritable, where the IntWritable is now the final
     * count rather than 1.
     */
    public static class IntSumReducer
            extends Reducer<Text, IntWritable, Text, IntWritable> {

        // Predefined IntWritable object used to hold the word-count result
        private IntWritable result = new IntWritable();

        public void reduce(Text key, Iterable<IntWritable> values, Context context)
                throws IOException, InterruptedException {
            int sum = 0;
            // Accumulate the values of all records with the same word
            for (IntWritable val : values) {
                sum += val.get();
            }
            result.set(sum);
            // Emit the result
            context.write(key, result);
        }
    }

    // The main method
    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        Job job = Job.getInstance(conf, "word count");
        job.setJarByClass(WordCount.class);
        job.setMapperClass(TokenizerMapper.class);
        job.setCombinerClass(IntSumReducer.class);
        job.setReducerClass(IntSumReducer.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(IntWritable.class);
        FileInputFormat.addInputPath(job, new Path(args[0]));
        FileOutputFormat.setOutputPath(job, new Path(args[1]));
        System.exit(job.waitForCompletion(true) ? 0 : 1);
    }
}
Compile WordCount.java and package the compiled classes into a jar:

export HADOOP_CLASSPATH=${JAVA_HOME}/lib/tools.jar
hadoop com.sun.tools.javac.Main WordCount.java
jar cf wc.jar WordCount*.class
Submit the job, passing the input and output directories as arguments:

hadoop jar wc.jar WordCount /tmp/mr/data/wc_input /tmp/mr/data/wc_output
After the job completes, view the result files:

hadoop fs -cat /tmp/mr/data/wc_output/part-r-*
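For the sample file above, the output should look something like this (the default output format separates key and value with a tab, and keys come out sorted):

hadoop	1
hdfs	1
hello	4
mapreduce	1
yarn	1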
数舟, until next time.
