import java.io.IOException;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;
public class WordCount extends Configured implements Tool{
/*
 * WordCount over plain text files. The input format is TextInputFormat, which
 * implements createRecordReader() and returns a LineRecordReader; that reader
 * fixes the input key and value types.
 *
 * Text file
 * KEYIN: byte offset of the line within the file
 * VALUEIN: the content of one line
 *
 * The mapper's output types are chosen by the business logic, not by the
 * framework; since the requirement is to count occurrences per word:
 *
 * key: the word, as Text (the Writable wrapper for String)
 * value: the count, as LongWritable (the Writable wrapper for Long)
 *
 */
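/*
 * For example, given an input file whose content is "hello world\nhi hadoop",
 * LineRecordReader produces two records: (0, "hello world") and (12, "hi hadoop"),
 * where each key is the byte offset at which that line starts.
 */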
public static class WordCountMapper extends Mapper<LongWritable, Text, Text, LongWritable>{
// Reuse one Text/LongWritable pair across map() calls to avoid allocating
// new objects for every record; each word occurrence counts as 1
Text keyOut = new Text();
LongWritable valueOut = new LongWritable(1);
// map() is invoked once per line of input
@Override
protected void map(LongWritable key, Text value, Context context)
throws IOException, InterruptedException {
String line = value.toString();
System.out.println("map(): keyIn:" + key.get() + "; valueIn:" + value.toString());
String[] splits = line.split(" "); // simple split on a single space; messier input may need "\\s+"
for(String word : splits){
keyOut.set(word);
//emit a (word, 1) pair with context.write()
context.write(keyOut, valueOut);
System.out.println("map(): keyOut:" + keyOut.toString() + "; valueOut:" + valueOut.get());
}
}
}
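/*
 * Between map and reduce the framework shuffles: map output is partitioned,
 * sorted, and grouped by key. If the maps emitted (hello,1), (world,1), (hello,1),
 * the reducer is called once with (hello, [1, 1]) and once with (world, [1]).
 */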
/*
 * KEYIN, VALUEIN: must match the types the map phase outputs
 * KEYOUT, VALUEOUT: chosen by the business logic
 * KEYOUT is the word, as Text (the Writable wrapper for String)
 * VALUEOUT is the total count, as LongWritable (the Writable wrapper for Long)
 *
 */
public static class WordCountReducer extends Reducer<Text, LongWritable, Text, LongWritable>{
LongWritable valueOut = new LongWritable(); // reused across calls; context.write() serializes it immediately
//reduce() is invoked once per distinct key
@Override
protected void reduce(Text key, Iterable<LongWritable> values, Context context) throws IOException, InterruptedException {
StringBuilder sb = new StringBuilder();
sb.append("reduce(): keyIn:" + key.toString() + "; valueIn:[");
long sum = 0;
for(LongWritable w : values){
//get() unwraps the primitive value held by the LongWritable
long num = w.get();
sum += num;
sb.append(num).append(",");
}
sb.deleteCharAt(sb.length() - 1);
sb.append("]");
System.out.println(sb.toString());
valueOut.set(sum);
context.write(key, valueOut);
}
}
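/*
 * With the example above, the final output (written by TextOutputFormat, the
 * default, which separates key and value with a tab) would contain lines like:
 *   hello	2
 *   world	1
 */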
@Override
public int run(String[] args) throws Exception {
// create and configure the job, then submit it
Configuration conf = getConf();
// create the Job object
Job job = Job.getInstance(conf, "wordcount");
//the class whose containing jar is shipped to the cluster
job.setJarByClass(WordCount.class);
//the job's map class
job.setMapperClass(WordCountMapper.class);
//the job's reduce class
job.setReducerClass(WordCountReducer.class);
//key type output by the map phase
job.setMapOutputKeyClass(Text.class);
//value type output by the map phase
job.setMapOutputValueClass(LongWritable.class);
//key type output by the reduce (final) phase
job.setOutputKeyClass(Text.class);
//value type output by the reduce (final) phase
job.setOutputValueClass(LongWritable.class);
//set the number of reduce tasks
job.setNumReduceTasks(2);
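//With two reducers, the default HashPartitioner routes each key by
//hash(key) % numReduceTasks, so the job writes two output files,
//part-r-00000 and part-r-00001, each sorted by key.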
//the job's input directory
FileInputFormat.addInputPath(job, new Path(args[0]));
Path outputPath = new Path(args[1]);
//the job's output directory
FileOutputFormat.setOutputPath(job, outputPath);
//remove a pre-existing output directory, since the job fails at submit time if it exists
FileSystem fs = FileSystem.get(conf);
//check whether the directory exists on the file system; delete it if so
if(fs.exists(outputPath)){
//recursive delete
fs.delete(outputPath, true);
System.out.println("output dir: " + outputPath.toString() + " deleted SUCCESS!");
}
//submit the job and wait for it to finish
//waitForCompletion(false): false means job progress and counters are not printed to the client
boolean status = job.waitForCompletion(false);
return status ? 0 : 1;
}
public static void main(String[] args) throws Exception {
//At run time, pass the input and output directories as program arguments; main() receives them in args,
//e.g. /tmp/input /tmp/output
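//A typical launch (the jar name here is illustrative):
//  hadoop jar wordcount.jar WordCount /tmp/input /tmp/output
//Because the job goes through ToolRunner, generic options such as
//-D mapreduce.job.reduces=4 are parsed out of args before run() sees them.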
System.exit(ToolRunner.run(new WordCount(), args));
}
}