vlambda博客
学习文章列表

0009 - 基于MapReduce的应用案例


01
  MapReduce词频统计案例



1.1 - 样本数据

这是一个经典的词频统计的案例:统计如下样本数据中每个单词出现的次数。

[root@hadoop-01 ~]# cat input.txt
Spark HBase Azkaban Flume
Hive Flink Storm Hadoop HBase Spark
Flink Presto Kudu Azkaban
HBase Storm Presto Kafka
HBase Hadoop Hive Flink Kudu
HBase Flink Hive Storm
Hive Flink Hadoop Flume
HBase Hive Kudu Zookeeper
Hadoop Spark HBase Storm
HBase Hadoop Hive Flink
HBase Flink Hive Storm Kudu
Hive Flink Hadoop Kafka
HBase Hive Presto Zookeeper
Presto Kudu Hadoop Kafka
Zookeeper Hadoop Flume
Azkaban Kudu Presto
Kafka Zookeeper
Kafka Flume

[root@hadoop-01 ~]# hdfs dfs -mkdir -p /tmp/wordcount/input
[root@hadoop-01 ~]# hdfs dfs -put -f input.txt /tmp/wordcount/input/input.txt
(可左右滑动)



WordCountDemo
https://github.com/Jerome-LJ/DreamWorks/tree/main/WordCountDemo


1.2 - 项目依赖

进行 MapReduce 编程,需要导入 hadoop-client 的依赖版本:

<dependency>
    <groupId>org.apache.hadoop</groupId>
    <artifactId>hadoop-client</artifactId>
    <version>${hadoop.version}</version>
</dependency>


1.3 - WordCountMapper

input.txt 文件的每行数据按照指定分隔符进行拆分。这里需要注意在 MapReduce 中必须使用 Hadoop 定义的类型,因为 Hadoop 预定义的类型都是可序列化,可比较的,所有类型均实现了 WritableComparable 接口。


在 Map 中将每行数据按照分隔符进行拆分:

/**
 * Object      : Mapping 输入文件的内容
 * Text        : Mapping 输入的每一行的数据
 * Text        : Mapping 输出 key 的类型
 * IntWritable : Mapping 输出 value 的类型
 */

public class WordCountMapper extends Mapper<ObjectTextTextIntWritable{

    @Override
    protected void map(Object key, Text value, Context context) throws IOException, InterruptedException {
        // 获取到一行文件的内容
        String line = value.toString();
        // 按照空格切分这一行的内容为一个单词数组
        String[] words = StringUtils.split(line, " ");
        // 遍历输出 <key, value> 键值对
        for (String word : words) {
            context.write(new Text(word), new IntWritable(1));
        }
    }
}
(可左右滑动)


WordCountMapper 对应下图的 Mapping 操作:

0009 - 基于MapReduce的应用案例


1.4 - WordCountReducer

在 Reduce 中进行单词出现次数的统计:

/**
 * Text         : Mapping 输入的 key 的类型
 * IntWritable  : Mapping 输入的 value 的类型
 * Text         : Reducing 输出的 key 的类型
 * IntWritable  : Reducing 输出的 value 的类型
 */


public class WordCountReducer extends Reducer<TextIntWritableTextIntWritable{

    @Override
    protected void reduce(Text key, Iterable<IntWritable> values, Context context) throws IOException, InterruptedException {
        // 统计数字
        int count = 0;
        // 累加求和
        for (IntWritable value : values) {
            count += value.get();
        }
        // 输出 <单词:count> 键值对
        context.write(key, new IntWritable(count));
    }
}
(可左右滑动)


WordCountReducer 对应下图的 Shuffling 和 Reducing 操作(Shuffling 的输出是 Reducing 的输入):

0009 - 基于MapReduce的应用案例


1.5 - WordCountApp

打包 jar 到集群使用 hadoop 命令提交作业示例:

public class WordCountApp {

    // 1、使用硬编码,显示参数,实际开发中可以通过外部传参
    private static final String HDFS_URL = "hdfs://172.16.1.2:8020";
    private static final String HADOOP_USER_NAME = "hdfs";

    public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException, URISyntaxException {

        // 文件的输入路径和输出路径由外部传参指定
        if (args.length < 2) {
            System.err.println("Usage: wordcount <in> [<in>...] <out>");
            System.exit(2);
        }

        // 指定 Hadoop 用户名,否则在 HDFS 上创建目录时可能会抛出权限不足的异常
        System.setProperty("HADOOP_USER_NAME", HADOOP_USER_NAME);

        Configuration configuration = new Configuration();
        // 指定 HDFS 的地址
        configuration.set("fs.defaultFS", HDFS_URL);

        // 2、获取 job 对象
        Job job = Job.getInstance(configuration);

        // 3、设置 jar 存储位置
        job.setJarByClass(WordCountApp.class);

        // 4、关联 Mapper 和 Reducer
        job.setMapperClass(WordCountMapper.class);
        job.setReducerClass(WordCountReducer.class);

        // 5、设置 Mapper 阶段输出数据的 key 和 value 类型
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(IntWritable.class);

        // 6、设置 Reducer 阶段输出数据的 key 和 value 类型
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(IntWritable.class);

        // 7、如果输出目录已经存在,则必须先删除,否则重复运行程序时会抛出异常
        FileSystem fileSystem = FileSystem.get(new URI(HDFS_URL), configuration, HADOOP_USER_NAME);
        Path outputPath = new Path(args[1]);
        if (fileSystem.exists(outputPath)) {
            fileSystem.delete(outputPath, true);
        }

        // 8、设置输入文件和输出文件的路径
        FileInputFormat.setInputPaths(job, new Path(args[0]));
        FileOutputFormat.setOutputPath(job, outputPath);

        // 9、提交 job 到群集并等待它完成,参数设置为 true 代表打印显示对应的进度
        boolean result = job.waitForCompletion(true);

        // 10、退出程序
        System.exit(result ? 0 : 1);
    }
}
(可左右滑动)


提示:如果不设置 Mapper 操作的输出类型,则默认和 Reducer 操作输出的类型相同。


1.6 - 提交到 MapReduce 集群运行

可以在本机配置 Hadoop 开发环境,直接在 IDE 中启动进行测试。本案例打包提交到服务器中运行。由于本项目没有使用除 Hadoop 外的第三方依赖,因此直接打包即可:


1、打包 jar 文件

0009 - 基于MapReduce的应用案例


或者使用 mvn 命令打包:

[root@hadoop-01 ~]# mvn package -DskipTests


2、提交作业

[root@hadoop-01 ~]# hadoop jar WordCount-1.0.jar com.jerome.wordcount.WordCountApp /tmp/wordcount/input /tmp/wordcount/output
(可左右滑动)


3、作业完成后,在 YARN 的 Web 界面查看 Applications 运行情况

0009 - 基于MapReduce的应用案例


4、查看 HDFS 上统计结果

# 查看生成目录
[root@hadoop-01 ~]# hdfs dfs -ls /tmp/wordcount/output

# 查看统计结果
[root@hadoop-01 ~]# hdfs dfs -cat /tmp/wordcount/output/part-r-00000
(可左右滑动)


0009 - 基于MapReduce的应用案例


提示:每个作业的 Reduce 任务的默认个数为 1(通过 mapreduce.job.reduces 设置)。


每个 Reduce 任务都会产生一个输出文件,如果设置 Reduce 个数为 3,那么 Map 输出数据会被分成 3 份,Reduce 输出的 part-r-00000 文件也会有 3 个。


02
  计词频统计案例之Combiner



2.1 - 代码实现

如果需要使用 Combiner 功能,只要在打包 jar 文件时,添加下面一行代码即可:

// 5、设置Combiner
job.setCombinerClass(WordCountReducer.class);


2.2 - 执行结果
[root@hadoop-01 ~]# hadoop jar WordCount-1.0.jar com.jerome.wordcount.WordCountCombinerApp /tmp/wordcount/input /tmp/wordcount/output
(可左右滑动)


加入 Combiner 功能后,统计结果不会发生变化,但是可以从打印的日志看出 Combiner 的效果:


没有加入 Combiner 功能的打印日志如下:

0009 - 基于MapReduce的应用案例


加入 Combiner 功能后的打印日志如下:

0009 - 基于MapReduce的应用案例


本案例只有一个输入文件并且小于 128M,所以只有一个 Map 进行处理。可以看到经过 Combiner 功能后,Combine input records 值由 70 降低为 12样本中单词种类就只有 12 种),在本案例中 Combiner 能降低需要传输的数据量。


03
  词频统计案例之Partitioner



3.1 - 默认的 Partitioner

假设有个需求:将不同单词的统计结果输出到不同文件。这种需求实际上比较常见,比如统计产品的销量时,需要将结果按照产品种类进行拆分。要实现这个功能,就需要用到自定义 Partitioner


MapReduce 默认的分类规则:在构建 job 的时候,如果不指定,则默认使用的是 HashPartitioner:对 key 值进行哈希散列,并对 numReduceTasks 取余。其实现如下:

public class HashPartitioner<KVextends Partitioner<KV{

    public int getPartition(K key, V value, int numReduceTasks) {
        return (key.hashCode() & Integer.MAX_VALUE) % numReduceTasks;
    }
}
(可左右滑动)


3.2 - 自定义 Partitioner

使用 Partitioner 自定义分类规则,按照单词进行分类:

public class CustomPartitioner extends Partitioner<TextIntWritable{

    @Override
    public int getPartition(Text key, IntWritable value, int numPartitions) {
        if (key.toString().equals("Azkaban")) {
            return 0;
        }else if (key.toString().equals("Flink")) {
            return 1;
        }else if (key.toString().equals("Flume")) {
            return 2;
        }else if (key.toString().equals("HBase")) {
            return 3;
        }else if (key.toString().equals("Hadoop")) {
            return 4;
        }else if (key.toString().equals("Hive")) {
            return 5;
        }else if (key.toString().equals("Kafka")) {
            return 6;
        }else if (key.toString().equals("Kudu")) {
            return 7;
        }else if (key.toString().equals("Presto")) {
            return 8;
        }else if (key.toString().equals("Spark")) {
            return 9;
        }else if (key.toString().equals("Storm")) {
            return 10;
        }else {
            return 11;
        }
    }
}
(可左右滑动)


在构建 job 时,指定使用自定义的分类规则,并设置 reduce 的个数:

// 6、设置自定义 Partitioner 规则
job.setPartitionerClass(CustomPartitioner.class);

// 7、设置 Reduce 个数
job.setNumReduceTasks(12);


3.3 - 执行结果
[root@hadoop-01 ~]# hadoop jar WordCount-1.0.jar com.jerome.wordcount.WordCountCombinerPartitionerApp /tmp/wordcount/input /tmp/wordcount/output
(可左右滑动)


执行结果如下,分别生成 12 个文件,每个文件中为对应单词的统计结果:

[root@hadoop-01 ~]# hdfs dfs -ls /tmp/wordcount/output/
Found 13 items
-rw-r--r--   3 hdfs supergroup          0 2021-09-03 15:30 /tmp/wordcount/output/_SUCCESS
-rw-r--r--   3 hdfs supergroup         10 2021-09-03 15:30 /tmp/wordcount/output/part-r-00000
-rw-r--r--   3 hdfs supergroup          8 2021-09-03 15:30 /tmp/wordcount/output/part-r-00001
-rw-r--r--   3 hdfs supergroup          8 2021-09-03 15:30 /tmp/wordcount/output/part-r-00002
-rw-r--r--   3 hdfs supergroup          9 2021-09-03 15:30 /tmp/wordcount/output/part-r-00003
-rw-r--r--   3 hdfs supergroup          9 2021-09-03 15:30 /tmp/wordcount/output/part-r-00004
-rw-r--r--   3 hdfs supergroup          7 2021-09-03 15:30 /tmp/wordcount/output/part-r-00005
-rw-r--r--   3 hdfs supergroup          8 2021-09-03 15:30 /tmp/wordcount/output/part-r-00006
-rw-r--r--   3 hdfs supergroup          7 2021-09-03 15:30 /tmp/wordcount/output/part-r-00007
-rw-r--r--   3 hdfs supergroup          9 2021-09-03 15:30 /tmp/wordcount/output/part-r-00008
-rw-r--r--   3 hdfs supergroup          8 2021-09-03 15:30 /tmp/wordcount/output/part-r-00009
-rw-r--r--   3 hdfs supergroup          8 2021-09-03 15:30 /tmp/wordcount/output/part-r-00010
-rw-r--r--   3 hdfs supergroup         12 2021-09-03 15:30 /tmp/wordcount/output/part-r-00011

[root@hadoop-01 ~]# hdfs dfs -cat /tmp/wordcount/output/part-r-00000
Azkaban    3
[root@hadoop-01 ~]# hdfs dfs -cat /tmp/wordcount/output/part-r-00001
Flink    8
[root@hadoop-01 ~]# hdfs dfs -cat /tmp/wordcount/output/part-r-00002
Flume    4
(可左右滑动)


0009 - 基于MapReduce的应用案例




扫一扫,我们的故事就开始了。


0009 - 基于MapReduce的应用案例

扫描二维码获取

更多精彩

大数据梦工厂