vlambda博客
学习文章列表

手把手玩转大数据--spark的RDD以及代码实操


  在开始学习Spark工作原理之前, 先来介绍一下Spark中两个最为重要的概念-- 弹性分布式数据集(Resilient Distributed Datasets, RDD) 和算子(Operation).


RDD背景

  Spark的核心是建立在RDD之上, 使Spark中的各个组件可以无缝进行集成, 从而在一个应用程序中完成大数据计算. 这也是为什么说在SparkCore中一切得计算都是基于RDD来完成的. RDD的设计理念源自AMP实验室发表的论文–Resilient Distributed Datasets: A Fault-Tolerant Abstraction for In-Memory Cluster Computing.


  MapReduce计算框架在实际应用中, 许多迭代式算法和交互式数据挖掘过程中的计算结果会写到磁盘, 然后再重复使用, 这就带来了大量的磁盘IO和序列化开销. 为解决中间过程数据落地花费大量时间的需求, 出现了一种抽象的数据结构, 让我们不必再考虑数据的分布式特性, 只需保存具体的逻辑转换表达式即可, 这种数据结构就是RDD.


  RDD之间的转换操作使父子RDD之间具有依赖关系, 满足条件的RDD之间形成管道(Pipeline), 从而避免中间结果落地, 极大的降低了磁盘IO和序列化消耗的时间.


RDD介绍

  RDD(弹性分布式数据集), 虽然叫做数据集, 但RDD并不像集合一样存储真实的数据, 而是存储这些数据转换的逻辑, 可以将RDD理解为一个大的数据集合以分布式的形式保存在集群服务器的内存中. 每个RDD可以分成多个分区, 每个分区就是一个数据集片段, 并且一个RDD的不同分区可以被保存到集群中不同的节点上(但是同一个分区不能被拆分保存), 从而可以在集群中的不同节点上进行并行计算.


  RDD提供了一种高度受限的共享内存模型, 即RDD是只读的记录分区的集合, 不能直接修改, 只能基于稳定的物理存储中的数据集来创建RDD, 或者通过在其他RDD上执行转换操作(如map、join和groupBy) 创建得到新的RDD.


Operation介绍

  算子(Operation)是Spark中定义的函数, 用于对RDD中的数据结构进行操作和转换等. Spark中的算子可以分为4类:


创建类(creation)算子, 用于将内存中的集合或外部文件创建为RDD.

转换(transformation)算子, 用于将一种格式的RDD转换为其他自定义格式.

缓存(cache)算子, 用于将RDD缓存在内存(memory)或磁盘(disk)中, 一般后续计算会用到重复数据时才会使用.

行动(action)算子, 用于触发执行Spark作业, 并将计算结果保存为集合, 标量或保存到外部文件, 数据库中.

  典型的RDD执行过程如下:


读入外部数据源(或者内存中的集合) ,然后Create RDD;

RDD经过一系列Transformation, 每一次都会产生不同的RDD, 供给下一个Transformation 使用;

最后一个RDD经Action进行处理, 得到最后想要的值, 并进行后续输出操作.

  需注意: RDD采用惰性调用, 即在RDD的执行过程中, 如图所示, 真正的计算发生在RDD的Action操作, 对于Action之前的所有Transformation操作, Spark只是记录下Transformation操作应用的一些基础数据集以及RDD生成的轨迹, 即相互之间的依赖关系, 而不会触发真正的计算.


  RDD提供的转换接口都非常简单, 都是类似map, filter, groupBy, join等粗粒度的数据转换操作, 而不是针对某个数据项的细粒度修改. 因此, RDD比较适合对于数据集中元素执行相同操作的批处理式应用, 而不适合用于需要异步/细粒度状态的应用, 比如Web应用系统, 增量式的网页爬虫等.


  转换和行动两种类型的算子, 前者指定RDD之间的相互依赖关系, 后者用于执行计算并指定输出的形式. 两类操作的主要区别是, 转换操作接受RDD并返回RDD, 而行动操作(如count、collect等) 接受RDD但是返回非RDD(即输出一个值或结果).


RDD五大特性

RDD是由一系列的Partition(分区)组成;

每一个函数作用在每一个分区上;

RDD之间存在依赖关系;

[可选项]分区器作用在KV格式的RDD上;

[可选项]RDD会提供最佳计算位置.

  接下来, 结合Spark实现的WC案例, 来理解这五个特性以及其他注意点(图中绿色为block块, 蓝色为Partition分区):

手把手玩转大数据--spark的RDD以及代码实操


HDFS存储文件是以block块的形式, Spark应用在读取HDFS上的数据后, 会将同一个block块中的数据转换逻辑保存在同一个Partition中, 一个文件对应的所有Partition构成一个RDD. 即一个RDD中的Partition个数等于这个文件存储在HDFS中的block个数. 但有一个例外, 如果一个block块的最后存储了某个数据的大部分字节后达到block规定的大小, 仅有少量字节存储在另外一个block块中, 这时这多余的小部分数据会放在与大部分数据相同的Partition中, 即Partition数小于block块数.

Spark中没有读文件的方法, 但Spark依然能够读取文件内容依赖的是MapReduce中读文件的方法. MR读文件前, 会先将文件划分为一个个的split(切片), 一个split的大小 = 一个block的大小; 但这个文件的split个数 ≈ 存储这个文件的block个数(同上一个例外情况); 一个RDD中Partition的个数 = 这个文件切分的split个数.

每一个函数作用在每一个分区上, 即每个函数会在每一个Partition中各执行一次.

RDD之间存在依赖关系, 通过一个算子关联的两个RDD称为父子RDD, 父子RDD之间存在宽窄依赖(后续讲解), 子RDD知道它的父RDD是谁, 但父RDD不知道它的子RDD有谁. 这种依赖关系的优势在于当数据因某种情形丢失时, 可以通过算子和父RDD重写计算出子RDD, 从而提高了计算的容错性. (RDD的依赖关系也被称为RDD的血统–Lineage)

KV格式的RDD指RDD中的数据是二元组类型, 对于这类RDD可以使用分区器按照Key或者Value进行分组, 进而完成聚合计算. 在WC中, pairRDD和restRDD均为KV格式的RDD. 分区器用于决定数据被放到哪一个reduce task中处理.

每一个算子作用在每一个Partition上, Partition会分布式的存储在集群各个节点的内存中, 对一个Partition的连续处理可以看作是一个task任务, 每一个task计算任务都在数据所在节点上执行, 从而实现数据本地化, 减少网络IO. 简单来说, RDD会提供一个方法接口, 调用这个接口就能直接拿到这个RDD所有Partition的位置, 拿到位置之后就可以分发task了. 至于这个接口是什么不需要我们关心, Spark应用在执行时会自动寻找.




实际操作:


案例说明

  大数据分析处理万变不离其宗, 核心思想就是一个WorldCount–单词统计. 单词统计, 顾名思义就是将一个文件中出现的所有单词读一遍, 并对相同单词的个数进行统计. 如何处理这个文件? 如何得到每一个单词? 如何对相同的单词进行统计? 这三个问题是需要解决的核心问题, 接下来就一起来看看是如何对一个文件进行WordCount的.


  首先, 来看一下我们测试的数据, 在这匹数据中, 同一行中每个单词之间使用制表符’\t’ 来分隔, 接下来我们先对这批数据的计算思想进行解析, 然后再分别使用MapReduce和Spark技术的API编码实现.


手把手玩转大数据--spark的RDD以及代码实操


  通过对这两种技术编码的比较, 可以帮助大家更好的理解之前所说的Spark在表达能力上相较于Hadoop(MR)的优势 Spark优势链接. 除此之外, 更重要的一点是引入SparkCore中弹性分布式数据集(RDD) 的概念, 对RDD有一定认识之后, 将有利于学习RDD的具体原理以及如何使用等知识.


  在Spark中, 一切计算都是基于RDD实现的, RDD可以看作是一个集合, 类似于Scala中的List, Map, 它有着与这些普通集合相同的方法(map, flatmap, foreach…), 但是RDD是重新写的这些方法, 初次之外还有许多其他的方法, 这些方法在Spark中称为算子, 之后的博客中会对它们进行详细介绍.


计算分析

无论是MapReduce还是Spark, 在读取数据时都是一行一行读取的而且读取的数据都是字符串类型, 因此在处理时要把一行数据看成一条记录;

既然一行是一条记录, 那么我们在处理时只需要关注这一条记录即可, 其余记录格式与之相同, 不相同格式的数据一般为脏数据, 需要过滤掉. 相同格式的按照规律进行切分(split).

数据切分完成后, 就可以得到每一个单词, 然后将每一个单词当作key, 把它的value置为1, 得到一些列KV格式的数据, 这些数据中有的key相同, 有的key不同, 但value都是1.

对这一系列KV格式的数据进行统计, 先按照Key进行分组, 相同Key, 即同一个单词为一组, 这个Key对应多个Value, 构成一个有一个或多个元素1组成的集合. 然后再将同一个Key中所有的Value进行累加, 累加完成之后将累加值最为新的Value, Key还是原来的Key.

最新的KV格式的数据中, Key代表的是出现的每一个单词, Value则对应该单词出现的次数.

  图解:

手把手玩转大数据--spark的RDD以及代码实操

————————————————

代码实现

在SparkCore中一切得计算都是基于RDD(弹性分布式数据集), R(Resilient) D(Distributed ) D(Dataset). RDD 调用的方法称为算子,一般情况下RDD的算子返回的还是RDD. 先对RDD有个大概的了解, 之后再对其进行详细地介绍.


  准备环境:


Scala运行环境

导入jar包, 开发Spark应用程序时, 只需要导入一个整合包即可.

  用Spark写WC:


package com.hpe.spark.core

import org.apache.spark.SparkConfimport org.apache.spark.SparkContext

object WCSpark { def main(args: Array[String]): Unit = {

//创建配置对象 val conf = new SparkConf() //设置App的名称-->方便在监控页面找到 conf.setAppName("WCSpark") //设置Spark的运行模式-->local本地运行-->用于测试环境 conf.setMaster("local") //创建Spark上下文 他是通往集群的唯一通道 val sc = new SparkContext(conf)

// textFile()读取上述数据,读取时是一行行读取,可以是本地也可是HDFS的数据,返回RDD类型的数据 val lineRDD = sc.textFile("d:/wc.txt") // 基于lineRDD中的数据按照\t进行分词 val wordRDD = lineRDD.flatMap { _.split("\t") } // 将wordRDD中的每一条数据封装成一个二元组,每一个单词计数为1 pairRDD[(K:word V:1)] val pairRDD = wordRDD.map { (_,1) } // 将pairRDD中相同的单词分为一组,对组内的数据进行累加 val restRDD = pairRDD.reduceByKey((v1,v2)=>v1+v2) //可简写为:val restRDD = pairRDD.reduceByKey(_+_) // 根据单词出现的次数来排序,sortBy():根据指定字段来排序,false:指定为降序; // foreach对RDD中排好序的数据进行遍历 restRDD .sortBy(x=>x._2, false) .foreach(println) //一直启动,为查看而写 while(true){} //释放资源 sc.stop() }}


  但从代码的编写上来看, 不难发现, Spark的表达能力着实比MR强, 上述代码中间处理部分其实还可以更加简洁:


val lineRDD = sc.textFile("d:/wc.txt") .flatMap { _.split("\t") } .map { (_,1) }  .reduceByKey(_+_) .sortBy(_._2, false) .foreach(println)



  MR中复杂的程序, 在Spark中了了几行就可以轻松解决, 既可以看出Scala语言的灵活性, 又表现了Spark超强的表达能力, 因此Spark在计算上逐渐取代MR.


  这里最后一句while(true){} ,让程序一直执行, 可以在WebUI的监控页面http://localhost:4040进行查看,