vlambda博客
学习文章列表

大数据技术,mapreduce版的wordcount,对文本单词进行统计

mport java.io.IOException;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;

public class WordCount extends Configured implements Tool{
    /*
     * 对文本文件进行Wordcount, 文本文件的输入类型是 TextInputFormat ,它实现了createRecordReader,
     * 返回创建的LineRecordReader 实现类,这个类里就有对应的key和value的类型
     * 
     * 文本文件
     * KEYIN:行字节偏移量
     * VALUEIN:一行数据
     * 
     * mapper的输入类型是由业务需求来自行确定类型,跟框架没关系,因为我们的需求是按照单词统计数量
     * 
     * key:单词,String 类型的封装类 Text
     * value:数值,Long类型的封装类LongWritable
     * 
     */


    public static class WordCountMapper extends Mapper<LongWritableTextTextLongWritable>{

      ZZ
        // map(), 一行调用一次
        @Override
        protected void map(LongWritable key, Text value, Context context)
                throws IOException, InterruptedException 
{
            String line = value.toString();
            System.out.println("map(): keyIn:" + key.get() + "; valueIn:" + value.toString());

            String[] splits = line.split(" ");
            for(String word : splits){
                keyOut.set(word);
                //map()输出数据,用 context.write()
                context.write(keyOut, valueOut);
                System.out.println("map(): keyOut:" + keyOut.toString() + "; valueOut:" + valueOut.get());
            }

        }

    }

    /*
     * KEYIN, VALUEIN: 根据map输出的类型来确定
     * KEYOUT, VALUEOUT:根据业务需求确定
     * KEYOUT 是单词,String 类型的封装类 Text
     * VALUEOUT 数值,Long类型的封装类LongWritable
     * 
     */

    public static class WordCountReducer extends Reducer<TextLongWritableTextLongWritable>{
        LongWritable valueOut = new LongWritable();
        //一个key调用一次
        @Override
        protected void reduce(Text key, Iterable<LongWritable> values, Context context) throws IOException, InterruptedException {

            StringBuilder sb = new StringBuilder();
            sb.append("reduce(): keyIn:" + key.toString() + "; vlaueIn:[");
            long sum = 0;
            for(LongWritable w : values){
                //通过get(),获取LongWritable 对象的实际值
                long num = w.get();
                sum += num;
                sb.append(num).append(",");
            }
            sb.deleteCharAt(sb.length() - 1);
            sb.append("]");
            System.out.println(sb.toString());

            valueOut.set(sum);
            context.write(key, valueOut);


        }
    }


    @Override
    public int run(String[] args) throws Exception {
//        job 创建及配置, 提交任务
        Configuration conf = getConf();
//        创建job对象
        Job job = Job.getInstance(conf, "wordcount");

        //job 任务运行类
        job.setJarByClass(WordCount.class);

        //job 任务 map运行类
        job.setMapperClass(WordCountMapper.class);
        //job 任务reduce 运行类
        job.setReducerClass(WordCountReducer.class);
        //job 任务map阶段输出的key的类型
        job.setMapOutputKeyClass(Text.class);
        //job 任务map阶段输出的value类型
        job.setMapOutputValueClass(LongWritable.class);
        //job 任务reduce阶段(最后阶段)输出的key的类型
        job.setOutputKeyClass(Text.class);
        //job 任务reduce阶段(最后阶段)输出的value的类型
        job.setOutputValueClass(LongWritable.class);

        //设置reduce个数
        job.setNumReduceTasks(2);

        //job 任务的输入目录
        FileInputFormat.addInputPath(job, new Path(args[0]));

        Path outputPath = new Path(args[1]);
        //job 任务的输出目录
        FileOutputFormat.setOutputPath(job, outputPath);

        //解决自动删除输出目录
        FileSystem fs = FileSystem.get(conf);
        //判断文件系统下存不存在该目录,如果存在删除
        if(fs.exists(outputPath)){
            //递归删除
            fs.delete(outputPath, true);
            System.out.println("output dir: " + outputPath.toString() + " deleted SUCCESS!");
        }

        //提交任务
        //waitForCompletion(false); false:代表不输出counter
        boolean status = job.waitForCompletion(false);

        return status ? 0 : 1;

    }

    public static void main(String[] args) throws Exception {
        //运行时 将输入输出目录放到执行参数里,用main()的args 接收到
        //  /tmp/input /tmp/output
        System.exit(ToolRunner.run(new WordCount(), args));

    }

}