Spark 核心数据结构:弹性分布式数据集 RDD
在面对出错情况(例如任意一台节点宕机)时,Spark 能通过 RDD 之间的依赖关系恢复任意出错的 RDD(如 B 和 D 可以算出最后的 RDD),RDD 就像一块海绵一样,无论怎么挤压,都像海绵一样完整;
在经过转换算子处理时,RDD 中的分区数以及分区所在的位置随时都有可能改变。
分区的集合;
用来基于分区进行计算的函数(算子);
依赖(与其他 RDD)的集合;
对于键-值型的 RDD 的散列分区器(可选);
// 表示RDD之间的依赖关系的成员变量private var deps: Seq[Dependency[_]]// 分区器成员变量val partitioner: Option[Partitioner] = None// 该RDD所引用的分区集合成员变量private var partitions_ : Array[Partition] = null// 得到该RDD与其他RDD之间的依赖关系protected def getDependencies: Seq[Dependency[_]] = deps// 得到该RDD所引用的分区protected def getPartitions: Array[Partition]// 得到每个分区地址protected def getPreferredLocations(split: Partition): Seq[String] = Nil// distinct算子def distinct(numPartitions: Int)(implicit ord: Ordering[T] = null): RDD[T] =withScope {map(x => (x, null)).reduceByKey((x, y) => x, numPartitions).map(_._1)}
@transient private var partitions_ : Array[Partition] = null
Spark编程是一件不难的工作,而事实也确实如此。现在我们可以通过已有的 SparkSession 直接创建 RDD。创建RDD的方式有以下几类:通过并行集合创建RDD;从HDFS中加载数据创建RDD;从linux本地文件系统加载数据创建RDD。
了解了RDD的创建方式,接下来,我们逐个进行演示介绍:
//val spark: SparkSession = .......val rdd = spark.sparkcontext.parallelize(Seq(1, 2, 3))
//val spark: SparkSession = .......val rdd = spark.sparkcontext.textFile("hdfs://namenode:8020/user/me/wiki.txt")
//val spark: SparkSession = .......val lowerBound = 1val upperBound = 1000val numPartition = 10val rdd = new JdbcRDD(spark.sparkcontext,() => {Class.forName("com.mysql.jdbc.Driver").newInstance()DriverManager.getConnection("jdbc:mysql://localhost:3306/db", "root", "123456")},"SELECT content FROM mysqltable WHERE ID >= ? AND ID <= ?",lowerBound,upperBound,numPartition,r => r.getString(1))
//val spark: SparkSession = .......val sc = spark.sparkcontextval tablename = "your_hbasetable"val conf = HBaseConfiguration.create()conf.set("hbase.zookeeper.quorum", "zk1,zk2,zk3")conf.set("hbase.zookeeper.property.clientPort", "2181")conf.set(TableInputFormat.INPUT_TABLE, tablename)val rdd= sc.newAPIHadoopRDD(conf, classOf[TableInputFormat],classOf[org.apache.hadoop.hbase.io.ImmutableBytesWritable],classOf[org.apache.hadoop.hbase.client.Result])// 利用HBase API解析出行键与列值rdd_three.foreach{case (_,result) => {val rowkey = Bytes.toString(result.getRow)val value1 = Bytes.toString(result.getValue("cf".getBytes,"c1".getBytes))}
