大数据技术之SparkCore(一)
IO流与RDD?
io流方式读取数据:字节流方式读取数据->字节缓冲流进行包装->一次读取一行的缓存流的方式读取数据
流的读取,都是在调用read。称之为惰性加载。
rdd方式:创建rdd->各种算子进行包装产生新的rdd->最后调用行动算子才会执行
rdd同样为懒加载,两者都使用了装饰者设计模式。
什么是RDD?
-RDD叫做弹性分布式数据集,是Spark中最基本的数据抽象,代码中是一个抽象类,他代表一个弹性的、不可变的、可分区、里面的元素可计算的集合。
-数据集:封装了计算业务逻辑,并没有存放数据
-弹性:存储:内存与磁盘自动切换;容错:数据丢失可自动恢复;计算:计算出错重试机制;分片:可根据需要分片。
-分布式:数据存储在大数据集群不同节点上。
-数据抽象:RDD是一个抽象类,需要子类具体实现。
-不可变:RDD封装了计算逻辑,是不可以改变的,想要改变,只能产生新的RDD。
-可分区:并行计算。
RDD五大特性?
1)A list of partitions (一组分区)
2)A function for computing each split(计算每个分区的函数)
Spark 中RDD 的计算是以分片为单位的,每个 RDD 都会实现compute函数以达到这个目的
3)A list of dependencies on other RDDs(与其他RDD的依赖关系)
RDD 的每次转换都会生成一个新的 RDD, 所以 RDD 之间会形成类似于流水线一样的前后依赖关系。在部分分区数据丢失时,Spark 可以通过这个依赖关系重新计算丢失的分区数据, 而不是对 RDD 的所有分区进行重新计算
4)一个分区器(partitioner),并不是所有rdd都有,只针对kv类型RDD
只有对于 key-value的 RDD,才会有 Partitioner, 非key-value的 RDD 的Partitioner 的值是 None;Partitiner 不但决定了 RDD 的本区数量, 也决定了parent RDD Shuffle 输出时的分区数量
5)存储数据位置的列表,移动数据不如移动计算
比如对于一个 HDFS 文件来说, 这个列表保存的就是每个 Partition 所在文件块的位置. 按照“移动数据不如移动计算”的理念, Spark 在进行任务调度的时候, 会尽可能地将计算任务分配到其所要处理数据块的存储位置.
RDD编程?
在spark中,RDD被表示为对象,通过对象上的方法调用来对RDD进行转换,通过调用行动算子触发RDD计算,在spark中。只有遇到行动算子,RDD才会执行(延时计算)。
创建RDD的三种方式?
1)从集合中创建RDD
-parallelize
-makeRDD(底层调用parallelize)
2)从外部存储创建RDD
-textFile
3)从其他RDD转换
IDEA环境准备?
创建工程,添加scala支持,在pom文件中添加如下:
<dependencies>
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-core_2.11</artifactId>
<version>2.1.1</version>
</dependency>
</dependencies>
<build>
<finalName>SparkCoreTest</finalName>
<plugins>
<plugin>
<groupId>net.alchim31.maven</groupId>
<artifactId>scala-maven-plugin</artifactId>
<version>3.4.6</version>
<executions>
<execution>
<goals>
<goal>compile</goal>
<goal>testCompile</goal>
</goals>
</execution>
</executions>
</plugin>
</plugins>
</build>
从集合中创建RDD?
bject CreateRDD {
def main(args: Array[String]): Unit = {
//创建sparkconf
val conf: SparkConf = new SparkConf().setAppName("createrdd").setMaster("local[*]")
//创建sparkcontext,是提交spark app的入口
val sc:SparkContext= new SparkContext(conf)
//创建集合对象
val list = List(1,2,3,4)
//根据集合创建RDD
// val rdd: RDD[Int] = sc.parallelize(list)
val rdd: RDD[Int] = sc.makeRDD(list)
rdd.collect().foreach(println)
sc.stop()
}
}
从外部存储创建RDD?
object CreateRDDTextFile {
def main(args: Array[String]): Unit = {
//创建sparkconf
val conf = new SparkConf().setAppName("txt").setMaster("local")
val sc = new SparkContext(conf)
val rdd: RDD[String] = sc.textFile("D:\\com.atguigu\\MyWork\\Programe\\IDEA\\workspaceidea\\bigData-1122\\spark1122\\input")
rdd.foreach(println)
}
}
注:input文件要提前创建好。
从其他RDD创建RDD?
val rdd: RDD[Int] = sc.makeRDD(list)
val rdd2: RDD[(Int, Int)] = rdd.map((_,1))
分区规则?
从集合中创建RDD:如果设置分区按照设置的个数。如果没设置分区,分区数取决去cpu核数。
从外部文件创建RDD:取决于分配给应用的cpu核数和2取最小值。