Spark 核心数据结构:弹性分布式数据集 RDD
在面对出错情况(例如任意一台节点宕机)时,Spark 能通过 RDD 之间的依赖关系恢复任意出错的 RDD(如 B 和 D 可以算出最后的 RDD),RDD 就像一块海绵一样,无论怎么挤压,都像海绵一样完整;
在经过转换算子处理时,RDD 中的分区数以及分区所在的位置随时都有可能改变。
分区的集合;
用来基于分区进行计算的函数(算子);
依赖(与其他 RDD)的集合;
对于键-值型的 RDD 的散列分区器(可选);
// 表示RDD之间的依赖关系的成员变量
private var deps: Seq[Dependency[_]]
// 分区器成员变量
val partitioner: Option[Partitioner] = None
// 该RDD所引用的分区集合成员变量
private var partitions_ : Array[Partition] = null
// 得到该RDD与其他RDD之间的依赖关系
protected def getDependencies: Seq[Dependency[_]] = deps
// 得到该RDD所引用的分区
protected def getPartitions: Array[Partition]
// 得到每个分区地址
protected def getPreferredLocations(split: Partition): Seq[String] = Nil
// distinct算子
def distinct(numPartitions: Int)(implicit ord: Ordering[T] = null): RDD[T] =
withScope {
map(x => (x, null)).reduceByKey((x, y) => x, numPartitions).map(_._1)
}
@transient private var partitions_ : Array[Partition] = null
Spark编程是一件不难的工作,而事实也确实如此。现在我们可以通过已有的 SparkSession 直接创建 RDD。创建RDD的方式有以下几类:通过并行集合创建RDD;从HDFS中加载数据创建RDD;从linux本地文件系统加载数据创建RDD。
了解了RDD的创建方式,接下来,我们逐个进行演示介绍:
//val spark: SparkSession = .......
val rdd = spark.sparkcontext.parallelize(Seq(1, 2, 3))
//val spark: SparkSession = .......
val rdd = spark.sparkcontext.textFile("hdfs://namenode:8020/user/me/wiki.txt")
//val spark: SparkSession = .......
val lowerBound = 1
val upperBound = 1000
val numPartition = 10
val rdd = new JdbcRDD(spark.sparkcontext,() => {
Class.forName("com.mysql.jdbc.Driver").newInstance()
DriverManager.getConnection("jdbc:mysql://localhost:3306/db", "root", "123456")
},
"SELECT content FROM mysqltable WHERE ID >= ? AND ID <= ?",
lowerBound,
upperBound,
numPartition,
r => r.getString(1)
)
//val spark: SparkSession = .......
val sc = spark.sparkcontext
val tablename = "your_hbasetable"
val conf = HBaseConfiguration.create()
conf.set("hbase.zookeeper.quorum", "zk1,zk2,zk3")
conf.set("hbase.zookeeper.property.clientPort", "2181")
conf.set(TableInputFormat.INPUT_TABLE, tablename)
val rdd= sc.newAPIHadoopRDD(conf, classOf[TableInputFormat],
classOf[org.apache.hadoop.hbase.io.ImmutableBytesWritable],
classOf[org.apache.hadoop.hbase.client.Result])
// 利用HBase API解析出行键与列值
rdd_three.foreach{case (_,result) => {
val rowkey = Bytes.toString(result.getRow)
val value1 = Bytes.toString(result.getValue("cf".getBytes,"c1".getBytes))
}