Querying HBase Data with a Custom Spark SQL Data Source
Spark SQL needs little introduction: we use it for data analysis, and it also lets us plug in custom data sources. The built-in sources are fairly rich, but real workloads are not limited to them; you may want to treat HBase as a Spark SQL data source, or a REST endpoint, and so on. Spark 1.3 introduced the first version of the Data Source API, which made it possible to integrate common data formats into Spark SQL. As Spark evolved, however, the limitations of that API became apparent, and the Spark team had to write more and more special-case code in its data sources to get good performance. Spark 2.3 shipped the first cut of a new Data Source API that addresses those problems, and the existing data sources are gradually being rewritten on top of it. This post briefly compares the two API versions and then walks through building a custom Spark SQL data source for HBase on the new API.
DataSourceV1 is the legacy API from Spark 1.x. Its main shortcomings are listed below (a minimal sketch of the V1 shape follows the list):
1. Tightly coupled to upper-layer APIs (SQLContext, RDD[Row])
2. Hard to extend with new optimization operators
3. Hard to pass partitioning information
4. No transactional write support
5. No support for columnar storage or streaming
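To make the coupling concrete, here is a minimal, illustrative V1 sketch; the class names, schema and sample rows are made up for this example and are not part of the project. Everything hangs off SQLContext and a single RDD[Row], which is exactly the dependence on upper-layer APIs listed above, and there is no place to express partitioning, transactional writes, columnar formats or streaming:

import org.apache.spark.rdd.RDD
import org.apache.spark.sql.{Row, SQLContext}
import org.apache.spark.sql.sources.{BaseRelation, RelationProvider, TableScan}
import org.apache.spark.sql.types.{StringType, StructField, StructType}

// Hypothetical V1 source: provider, schema and rows are illustrative only
class SimpleV1Provider extends RelationProvider {
  override def createRelation(sqlContext: SQLContext,
                              parameters: Map[String, String]): BaseRelation =
    new SimpleV1Relation(sqlContext)
}

class SimpleV1Relation(val sqlContext: SQLContext) extends BaseRelation with TableScan {
  override def schema: StructType =
    StructType(Seq(StructField("name", StringType), StructField("score", StringType)))

  // The whole read path is one RDD[Row]; nothing finer-grained can be expressed
  override def buildScan(): RDD[Row] =
    sqlContext.sparkContext.parallelize(Seq(Row("zhangsan", "80"), Row("lisi", "60")))
}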
Spark 2.x provides a new API, DataSourceV2, for building custom data sources. It is far more flexible, and it is what we use in this post. Its main advantages are (see the sketch after this list):
1. Column pruning
2. Filter pushdown
3. Multiple-partition support
4. Transactional writes
5. Columnar storage and streaming support
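As a sketch of how points 1 and 2 are expressed in the new API (assuming the Spark 2.3 interfaces; the class name is hypothetical and separate from the HBase source built below), a V2 reader opts in to column pruning and filter pushdown by mixing in the corresponding interfaces:

import java.util

import org.apache.spark.sql.Row
import org.apache.spark.sql.sources.Filter
import org.apache.spark.sql.sources.v2.reader.{DataReaderFactory, DataSourceReader, SupportsPushDownFilters, SupportsPushDownRequiredColumns}
import org.apache.spark.sql.types.StructType

// Hypothetical reader showing the Spark 2.3 pruning/pushdown hooks
class PruningReader(fullSchema: StructType) extends DataSourceReader
  with SupportsPushDownRequiredColumns with SupportsPushDownFilters {

  private var prunedSchema: StructType = fullSchema
  private var pushed: Array[Filter] = Array.empty

  // Spark hands us only the columns the query actually needs
  override def pruneColumns(requiredSchema: StructType): Unit = {
    prunedSchema = requiredSchema
  }

  // Spark offers the query filters; the return value is the subset we could NOT handle,
  // which Spark re-evaluates after the scan. Returning empty claims we handle them all.
  override def pushFilters(filters: Array[Filter]): Array[Filter] = {
    pushed = filters
    Array.empty
  }

  override def pushedFilters(): Array[Filter] = pushed

  override def readSchema(): StructType = prunedSchema

  // A real implementation would scan only the pruned columns and translate
  // the pushed filters into source-side predicates (e.g. HBase filters)
  override def createDataReaderFactories(): util.List[DataReaderFactory[Row]] =
    util.Collections.emptyList[DataReaderFactory[Row]]()
}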
In short, there is no love without reason and no hate without cause: if you like this framework, it will give you ten thousand reasons to keep liking it. Technology is never absolutely good or bad in itself; only the people using it are.
Enough talk; let's get hands-on and see how to build a custom Spark SQL data source on DataSourceV2 that queries HBase:
1. Create the HBase source table
To exercise the custom Spark SQL data source, we build a test case that reads data from HBase through the custom source and then writes the query result back into HBase.
Run the following commands on node01 to create the HBase source table:
cd /kkb/install/hbase-1.2.0-cdh5.14.2/
bin/hbase shell
create 'spark_hbase_sql','cf'
put 'spark_hbase_sql','0001','cf:name','zhangsan'
put 'spark_hbase_sql','0001','cf:score','80'
put 'spark_hbase_sql','0002','cf:name','lisi'
put 'spark_hbase_sql','0002','cf:score','60'
2. Create the HBase table for the results
Run the following commands on node01 to create the HBase table that will hold the analysis results:
cd /kkb/install/hbase-1.2.0-cdh5.14.2/
bin/hbase shell
create 'spark_hbase_write','cf'
3. Implement the custom Spark SQL data source to read HBase data and write the results back
In the travel_spark module of the project, create the package com.travel.programApp, and under it create a Scala object file HBaseSourceAndSink.scala:
package com.travel.programApp

import java.util
import java.util.Optional
import com.travel.utils.HbaseTools
import org.apache.hadoop.hbase.TableName
import org.apache.hadoop.hbase.client._
import org.apache.hadoop.hbase.util.Bytes
import org.apache.spark.sql.sources.v2.reader._
import org.apache.spark.sql.sources.v2.writer.{DataSourceWriter, DataWriter, DataWriterFactory, WriterCommitMessage}
import org.apache.spark.sql.sources.v2.{DataSourceOptions, DataSourceV2, ReadSupport, WriteSupport}
import org.apache.spark.sql.types.StructType
import org.apache.spark.sql.{DataFrame, Row, SaveMode, SparkSession}
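// Test driver: read spark_hbase_sql through the custom source, query it with SQL,
// and write the result back to spark_hbase_write through the same source class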
object HBaseSourceAndSink {
def main(args: Array[String]): Unit = {
val spark = SparkSession
.builder()
.appName("HBaseSourceAndSink")
.master("local[2]")
.getOrCreate()
val df = spark.read
.format("com.travel.programApp.HBaseSource")
.option("hbase.table.name", "spark_hbase_sql")
.option("schema", "`name` STRING,`score` STRING")
.option("cf.cc","cf:name,cf:score")
.load()
df.explain(true)
df.createOrReplaceTempView("sparkHBaseSQL")
df.printSchema()
val frame: DataFrame = spark.sql("select * from sparkHBaseSQL where score > 60")
frame.write.format("com.travel.programApp.HBaseSource")
.mode(SaveMode.Overwrite)
.option("hbase.table.name","spark_hbase_write")
.save()
}
}
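// Entry point of the custom data source: ReadSupport supplies the reader, WriteSupport the writer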
class HBaseSource extends DataSourceV2 with ReadSupport with WriteSupport{
override def createReader(options: DataSourceOptions): DataSourceReader = {
new HBaseDataSourceReader(options.get("hbase.table.name").get(),options.get("schema").get(),options.get("cf.cc").get())
}
override def createWriter(jobId: String, schema: StructType, mode: SaveMode, options: DataSourceOptions): Optional[DataSourceWriter] = {
Optional.of(new HBaseDataSourceWrite)
}
}
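// Driver-side writer: creates the per-partition writer factories and receives commit/abort messages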
class HBaseDataSourceWrite extends DataSourceWriter{
override def createWriterFactory(): DataWriterFactory[Row] = {
new HBaseDataWriterFactory
}
override def commit(messages: Array[WriterCommitMessage]): Unit = {
}
override def abort(messages: Array[WriterCommitMessage]): Unit = {
}
}
class HBaseDataWriterFactory extends DataWriterFactory[Row]{
override def createDataWriter(partitionId: Int, attemptNumber: Int): DataWriter[Row] = {
new HBaseDataWriter
}
}
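// Executor-side writer: one instance per partition/attempt, writes each row into spark_hbase_write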
class HBaseDataWriter extends DataWriter[Row]{
private val conn: Connection = HbaseTools.getHbaseConn
private val table: Table = conn.getTable(TableName.valueOf("spark_hbase_write"))
override def write(record: Row): Unit = {
val name: String = record.getString(0)
val score: String = record.getString(1)
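// NOTE: the fixed rowkey "0001" makes every record overwrite the same HBase row;
// a real job would derive the rowkey from the record (for example from the name column)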
val put = new Put("0001".getBytes())
put.addColumn("cf".getBytes(),"name".getBytes(),name.getBytes())
put.addColumn("cf".getBytes(),"score".getBytes(),score.getBytes())
table.put(put)
}
override def commit(): WriterCommitMessage = {
table.close()
conn.close()
null
}
override def abort(): Unit = {
}
}
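// Driver-side reader: exposes the schema from the options and creates the reader factories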
class HBaseDataSourceReader(tableName:String,schema:String,cfcc:String) extends DataSourceReader {
// Schema of the HBase table, parsed from the DDL string passed via the schema option
private val structType: StructType = StructType.fromDDL(schema)
override def readSchema(): StructType = {
structType
}
// Return the DataReaderFactory list; a single factory here means the scan runs as a single partition
override def createDataReaderFactories(): util.List[DataReaderFactory[Row]] = {
import collection.JavaConverters._
Seq(
new HBaseReaderFactory(tableName,cfcc).asInstanceOf[DataReaderFactory[Row]]
).asJava
}
}
class HBaseReaderFactory(tableName:String,cfcc:String) extends DataReaderFactory[Row] {
override def createDataReader(): DataReader[Row] = {
new HBaseReader(tableName,cfcc)
}
}
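// Executor-side reader: runs a full-table Scan and pulls cf:name and cf:score from each Result
// (the cf.cc option is passed through but not used to restrict the scan here)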
class HBaseReader(tableName:String,cfcc:String) extends DataReader[Row] {
private var hbaseConnection:Connection = null
private var resultScanner:ResultScanner = null
private var nextResult:Result = null
// Fetch the data from HBase as an iterator over rows
val data: Iterator[Seq[AnyRef]] = getIterator
def getIterator: Iterator[Seq[AnyRef]] = {
import scala.collection.JavaConverters._
hbaseConnection = HbaseTools.getHbaseConn
val table: Table = hbaseConnection.getTable(TableName.valueOf(tableName))
resultScanner = table.getScanner(new Scan())
val iterator: Iterator[Seq[AnyRef]] = resultScanner.iterator().asScala.map(eachResult => {
val name: String = Bytes.toString(eachResult.getValue("cf".getBytes(), "name".getBytes()))
val score: String = Bytes.toString(eachResult.getValue("cf".getBytes(), "score".getBytes()))
Seq(name, score)
})
iterator
}
override def next(): Boolean = {
data.hasNext
}
override def get(): Row = {
val seq: Seq[Any] = data.next()
Row.fromSeq(seq)
}
override def close(): Unit = {
resultScanner.close()
hbaseConnection.close()
}
}
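The reader and writer above rely on a project utility, com.travel.utils.HbaseTools.getHbaseConn, which is not shown in this post. Below is a minimal sketch of what such a helper could look like; the ZooKeeper quorum and client port are assumptions to adapt to your cluster, and the real project class may differ:

import org.apache.hadoop.hbase.HBaseConfiguration
import org.apache.hadoop.hbase.client.{Connection, ConnectionFactory}

// Hypothetical stand-in for com.travel.utils.HbaseTools; the real helper may differ
object HbaseTools {
  def getHbaseConn: Connection = {
    val conf = HBaseConfiguration.create()
    // Assumed ZooKeeper settings; change them to match your cluster
    conf.set("hbase.zookeeper.quorum", "node01,node02,node03")
    conf.set("hbase.zookeeper.property.clientPort", "2181")
    ConnectionFactory.createConnection(conf)
  }
}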