vlambda博客
学习文章列表

sparkSQL自定义数据源查询HBase数据

对于sparkSQL,我们并不陌生,可以用来做我们的数据分析等,而且sparkSQL也支持自定义数据源,总的来说SparkSQL支持的数据源还是挺丰富的,但业务上可能不拘束于这几种数据源,比如将HBase作为SparkSQL的数据源,REST数据源等。Spark 1.3 引入了第一版的数据源 API,我们可以使用它将常见的数据格式整合到 Spark SQL 中。但是,随着 Spark 的不断发展,这一 API 也体现出了其局限性,故而 Spark 团队不得不加入越来越多的专有代码来编写数据源,以获得更好的性能。Spark 2.3 中,新一版的数据源 API 初见雏形,它克服了上一版 API 的种种问题,原来的数据源代码也在逐步重写。本文将演示这两版 API 的使用方法,比较它们的不同之处,以及新版 API 的优势在哪里。

DataSourceV1是spark1.x当中老版本的API,主要有以下不足:

1.依赖上层 API

2.难以添加新的优化算子

3.难以传递分区信息

4.缺少事务性的写操作

5.缺少列存储和流式计算支持

在spark2.x版本当中,提供了新的API,基于DataSourceV2来进行自定义数据源,非常灵活,我们也主要使用DataSourceV2来实现自定义数据源,主要有以下优势:

1.裁剪字段

2.过滤条件

3.多分区支持

4.事务性的写操作

5.列存储与流式计算支持

总之没有无缘无故的爱,也没有毫无缘由的恨,你喜欢这个框架,哈哈哈,那么它就有一万个理由让你爱不释手,技术没有绝对的好坏之分,人才有

废话不多说,我们直接来上手操作一把,看看如何基于DataSourceV2来自定义我们的sparkSQL数据源去查询hbase的数据:


1、创建hbase数据源表

为了实现我们的sparkSQL自定义数据源获取Hbase当中的数据,我们可以开发测试用例,通过自定义数据源实现获取Hbase当中的数据,然后将查询的数据保存到Hbase里面去

node01执行以下命令创建Hbase

cd /kkb/install/hbase-1.2.0-cdh5.14.2/

bin/hbase shell

create 'spark_hbase_sql','cf'

put 'spark_hbase_sql','0001','cf:name','zhangsan'

put 'spark_hbase_sql','0001','cf:score','80'

put 'spark_hbase_sql','0002','cf:name','lisi'

put 'spark_hbase_sql','0002','cf:score','60'

 

2、创建Hbase的数据保存表

node01执行以下命令创建Hbase表,用于将分析之后的结果数据保存到Hbase当中来

cd /kkb/install/hbase-1.2.0-cdh5.14.2/

bin/hbase shell

create 'spark_hbase_write','cf'

 

3、自定义SparkSQL的数据源读取Hbase数据以及将分析结果

在我们的travel_spark这个项目模块下,创建package com.travel.programAppp,然后在这个package下面创建scala的object文件HbaseSourceAndSink.scala

 p

import java.util

import java.util.Optional

import com.travel.utils.HbaseTools

import org.apache.hadoop.hbase.TableName

import org.apache.hadoop.hbase.client._

import org.apache.hadoop.hbase.util.Bytes

import org.apache.spark.sql.sources.v2.reader._

import org.apache.spark.sql.sources.v2.writer.{DataSourceWriter, DataWriter, DataWriterFactory, WriterCommitMessage}

import org.apache.spark.sql.sources.v2.{DataSourceOptions, DataSourceV2, ReadSupport, WriteSupport}

import org.apache.spark.sql.types.StructType

import org.apache.spark.sql.{DataFrame, Row, SaveMode, SparkSession}

 

object HBaseSourceAndSink {

  def main(args: Array[String]): Unit = {

    val spark = SparkSession

      .builder()

      .master("local[2]")

      .getOrCreate()

 

    val df = spark.read

      .format("com.travel.programApp.HBaseSource")

      .option("hbase.table.name", "spark_hbase_sql")

      .option("schema", "`name` STRING,`score` STRING")

      .option("cf.cc","cf:name,cf:score")

      .load()

    df.explain(true)

 

    df.createOrReplaceTempView("sparkHBaseSQL")

 

    df.printSchema()

 

    val frame: DataFrame = spark.sql("select * from sparkHBaseSQL where score > 60")

 

    frame.write.format("com.travel.programApp.HBaseSource")

      .mode(SaveMode.Overwrite)

      .option("hbase.table.name","spark_hbase_write")

      .save()

 

  }

}

 

class HBaseSource extends DataSourceV2 with ReadSupport with WriteSupport{

  override def createReader(options: DataSourceOptions): DataSourceReader = {

    new HBaseDataSourceReader(options.get("hbase.table.name").get(),options.get("schema").get(),options.get("cf.cc").get())

  }

 

  override def createWriter(jobId: String, schema: StructType, mode: SaveMode, options: DataSourceOptions): Optional[DataSourceWriter] = {

    Optional.of(new HBaseDataSourceWrite)

 

  }

}


class HBaseDataSourceWrite extends DataSourceWriter{

  override def createWriterFactory(): DataWriterFactory[Row] = {

    new HBaseDataWriterFactory

  }

 

  override def commit(messages: Array[WriterCommitMessage]): Unit = {

 

  }

 

  override def abort(messages: Array[WriterCommitMessage]): Unit = {

 

  }

}

 

class HBaseDataWriterFactory extends DataWriterFactory[Row]{

  override def createDataWriter(partitionId: Int, attemptNumber: Int): DataWriter[Row] = {

    new HBaseDataWriter

  }

}

 

class HBaseDataWriter extends DataWriter[Row]{

 

  private val conn: Connection = HbaseTools.getHbaseConn

 

  private val table: Table = conn.getTable(TableName.valueOf("spark_hbase_write"))

 

  override def write(record: Row): Unit = {

    val name: String = record.getString(0)

    val score: String = record.getString(1)

 

    val put = new Put("0001".getBytes())

    put.addColumn("cf".getBytes(),"name".getBytes(),name.getBytes())

    put.addColumn("cf".getBytes(),"score".getBytes(),score.getBytes())

 

    table.put(put)

 

  }

 

  override def commit(): WriterCommitMessage = {

    table.close()

    conn.close()

    null

 

  }

 

  override def abort(): Unit = {

    null

 

  }

}


class HBaseDataSourceReader(tableName:String,schema:String,cfcc:String) extends DataSourceReader  {

  //定义HBaseschema

  private val structType: StructType = StructType.fromDDL(schema)

  override def readSchema(): StructType = {

    structType

  }

  //返回DataReaderFactory

  override def createDataReaderFactories(): util.List[DataReaderFactory[Row]] = {

    import collection.JavaConverters._

    Seq(

    new HBaseReaderFactory(tableName,cfcc).asInstanceOf[DataReaderFactory[Row]]

    ).asJava

  }

 

}


class HBaseReaderFactory(tableName:String,cfcc:String) extends  DataReaderFactory[Row] {

  override def createDataReader(): DataReader[Row] = {

    new HBaseReader(tableName,cfcc)

  }

}

 

class HBaseReader(tableName:String,cfcc:String) extends DataReader[Row] {

 

  private var hbaseConnection:Connection = null

  private var  resultScanner:ResultScanner = null

 

  private var nextResult:Result  = null

 

  // 获取HBase当中的数

  val data: Iterator[Seq[AnyRef]] = getIterator

 

  def getIterator: Iterator[Seq[AnyRef]] = {

    import scala.collection.JavaConverters._

    hbaseConnection = HbaseTools.getHbaseConn

    val table: Table = hbaseConnection.getTable(TableName.valueOf(tableName))

    resultScanner = table.getScanner(new Scan())

    val iterator: Iterator[Seq[AnyRef]] = resultScanner.iterator().asScala.map(eachResult => {

      val str: String = Bytes.toString(eachResult.getValue("cf".getBytes(), "name".getBytes()))

      val score: String = Bytes.toString(eachResult.getValue("cf".getBytes(), "score".getBytes()))

      Seq(str, score)

    })

    iterator

  }

  override def next(): Boolean = {

    data.hasNext

  }

  override def get(): Row = {

    val seq: Seq[Any] = data.next()

    Row.fromSeq(seq)

  }

  override def close(): Unit = {

    hbaseConnection.close()

  }

 

}