vlambda博客
学习文章列表

「大数据」(七十四)Spark之应用案例编程

【导读:数据是二十一世纪的石油,蕴含巨大价值,这是·情报通·大数据技术系列第[74]篇文章,欢迎阅读和收藏】

1 搭建开发环境

1.1 安装 Scala IDE

搭建 Scala 语言开发环境很容易, Scala IDE 官网 下载合适的版本并解压就可以完成安装,下文示例中使用的版本是 4.1.0 。

1.2 安装 Scala 语言包

如果下载的 Scala IDE 自带的 Scala 语言包与 Spark 1.3.1 使用的 Scala 版本 (2.10.x) 不一致,那么就需要下载和本文所使用的 Spark 所匹配的版本,以确保实现的 Scala 程序不会因为版本问题而运行失败。请下载并安装 Scala 2.10.5 版本


1.3 安装 JDK

如果机器上没有安装 JDK ,请下载并安装 1.6 版本以上的 JDK 。

1.4 创建并配置 Spark 工程

打开 Scala IDE ,创建一个名称为 spark-exercise 的 Scala 工程。

(1) 创建 scala 工程,在工程目录下创建一个 lib 文件夹,并且把您的 Spark 安装包下的 spark-assembly jar 包拷贝到 lib 目录下。

(2) 添加 jar 包到 classpath

2 案例分析与编程实现

2.1 案例一

2.1.1 案例描述

提起 Word Count( 词频数统计 ) ,相信大家都不陌生,就是统计一个或者多个文件中单词出现的次数。本文将此作为一个入门级案例,由浅入深的开启使用 Scala 编写 Spark 大数据处理程序的大门。

2.1.2 案例分析

对于词频数统计,用 Spark 提供的算子来实现,我们首先需要将文本文件中的每一行转化成一个个的单词 , 其次是对每一个出现的单词进行记一次数,最后就是把所有相同单词的计数相加得到最终的结果。

对于第一步我们自然的想到使用 flatMap 算子把一行文本 split 成多个单词,然后对于第二步我们需要使用 map 算子把单个的单词转化成一个有计数的 Key-Value 对,即 word -> (word,1). 对于最后一步统计相同单词的出现次数,我们需要使用 reduceByKey 算子把相同单词的计数相加得到最终结果。

2.1.3 编程实现

SparkWordCount 类源码:

import org.apache.spark.SparkConf
import org.apache.spark.SparkContext
import org.apache.spark.SparkContext._
object SparkWordCount {
 def FILE_NAME:String = "word_count_results_";
 def main(args:Array[String]) {
 if (args.length < 1) {
 println("Usage:SparkWordCount FileName");
 System.exit(1);
 }
 val conf = new SparkConf().setAppName("Spark Exercise: Spark Version Word Count Program");
 val sc = new SparkContext(conf);
 val textFile = sc.textFile(args(0));
 val wordCounts = textFile.flatMap(line => line.split(" ")).map(
                                        word => (word, 1)).reduceByKey((a, b) => a + b)
 //print the results,for debug use.
 //println("Word Count program running results:");
 //wordCounts.collect().foreach(e => {
 //val (k,v) = e
 //println(k+"="+v)
 //});
 wordCounts.saveAsTextFile(FILE_NAME+System.currentTimeMillis());
 println("Word Count program running results are successfully saved.");
 }
}

2.1.4 提交执行

本实例中 , 我们将统计 HDFS 文件系统中 /user/fams 目录下所有 txt 文件中词频数。其中 spark-exercise.jar 是 Spark 工程打包后的 jar 包,这个 jar 包执行时会被上传到目标服务器的 /home/fams 目录下。运行此实例的具体命令如下:

./spark-submit \
--class com.ibm.spark.exercise.basic.SparkWordCount \
--master spark://hadoop036166:7077 \
--num-executors 3 \
--driver-memory 6g --executor-memory 2g \
--executor-cores 2 \
/home/fams/sparkexercise.jar \
hdfs://hadoop036166:9000/user/fams/*.txt

2.1.5 执行结果

该实例把最终的结果存储在了 HDFS 上,那么如果程序运行正常我们可以在 HDFS 上找到生成的文件信息

2.2 案例二

2.2.1 案例描述

该案例中,我们将假设我们需要统计一个 1000 万人口的所有人的平均年龄,当然如果您想测试 Spark 对于大数据的处理能力,您可以把人口数放的更大,比如 1 亿人口,当然这个取决于测试所用集群的存储容量。假设这些年龄信息都存储在一个文件里,并且该文件的格式为,第一列是 ID ,第二列是年龄。

现在我们需要用 Scala 写一个生成 1000 万人口年龄数据的文件,源程序如下:

import java.io.FileWriter
 import java.io.File
 import scala.util.Random
 object SampleDataFileGenerator {
  def main(args:Array[String]) {
 val writer = new FileWriter(new File("C: \\sample_age_data.txt"),false)
 val rand = new Random()
 for ( i <- 1 to 10000000) {
 writer.write( i + " " + rand.nextInt(100))
 writer.write(System.getProperty("line.separator"))
 }
 writer.flush()
 writer.close()
 }
 }

2.2.2 案例分析

要计算平均年龄,那么首先需要对源文件对应的 RDD 进行处理,也就是将它转化成一个只包含年龄信息的 RDD ,其次是计算元素个数即为总人数,然后是把所有年龄数加起来,最后平均年龄 = 总年龄 / 人数。

对于第一步我们需要使用 map 算子把源文件对应的 RDD 映射成一个新的只包含年龄数据的 RDD ,很显然需要对在 map 算子的传入函数中使用 split 方法,得到数组后只取第二个元素即为年龄信息;第二步计算数据元素总数需要对于第一步映射的结果 RDD 使用 count 算子;第三步则是使用 reduce 算子对只包含年龄信息的 RDD 的所有元素用加法求和;最后使用除法计算平均年龄即可。本例输出结果很简单,所以只打印在控制台即可。

2.2.3 编程实现

AvgAgeCalculator 类源码:

import org.apache.spark.SparkConf
import org.apache.spark.SparkContext
object AvgAgeCalculator {
 def main(args:Array[String]) {
 if (args.length < 1){
 println("Usage:AvgAgeCalculator datafile")
 System.exit(1)
 }
 val conf = new SparkConf().setAppName("Spark Exercise:Average Age Calculator")
 val sc = new SparkContext(conf)
 val dataFile = sc.textFile(args(0), 5);
 val count = dataFile.count()
 val ageData = dataFile.map(line => line.split(" ")(1))
 val totalAge = ageData.map(age => Integer.parseInt(
                                String.valueOf(age))).collect().reduce((a,b) => a+b)
 println("Total Age:" + totalAge + ";Number of People:" + count )
 val avgAge : Double = totalAge.toDouble / count.toDouble
 println("Average Age is " + avgAge)
 }
}

2.2.4 提交执行

要执行本实例的程序,需要将刚刚生成的年龄信息文件上传到 HDFS 上,假设您刚才已经在目标机器上执行生成年龄信息文件的 Scala 类,并且文件被生成到了 /home/fams 目录下。那么需要运行一下 HDFS 命令把文件拷贝到 HDFS 的 /user/fams 目录。

年龄信息文件拷贝到 HDFS 目录的命令:

hdfs dfs –copyFromLocal /home/fams /user/fams

AvgAgeCalculator 类的执行命令:


./spark-submit \
 --class com.ibm.spark.exercise.basic.AvgAgeCalculator \
 --master spark://hadoop036166:7077 \
 --num-executors 3 \
 --driver-memory 6g \
 --executor-memory 2g \
 --executor-cores 2 \
 /home/fams/sparkexercise.jar \
 hdfs://hadoop036166:9000/user/fams/inputfiles/sample_age_data.txt