vlambda博客
学习文章列表

基于hbase-spark实现hive到hbase的数据传输中间件

 作为一个真正的程序员,首先应该尊重编程,热爱你所写下的程序,他是你的伙伴,而不是工具。


基于hbase-spark实现hive到hbase的数据传输中间件

一、hbase的数据写入方式

hbase的数据写入方式有很多,如java的hbase-client API和java/python的thrift API。数据量小时,这些方式应对起来已经绰绰有余。但是,当需要往hbase一次性导入大量数据,如:历史数据。此时,API的方式便显得力不从心,不仅过程漫长,而且还会由于客户端频繁的写入,引起RegionServer产生大量的flush、compact或split的操作,对线上正常的读写业务会有不可预估的延时影响。

面对这样的使用场景,通常的做法往往是,先生成hfile,然后用hbase自带的bulkload命令,快速把数据导入到hbase中。有关bulkload的使用细节,这里不会做过多赘述。

关于hfile的生成方式,成熟的方案有很多种,如:ImportTsv转csv为hfile,MR程序生成,Spark程序生成,还有CDH官方提供的组件,hbase-spark,也是这里重点要介绍的hfile的生成方式。

二、hbase-spark的介绍

Hbase-spark这种方式其实是利用Cloudera-labs开源的一个HbaseContext的工具类来支持spark用RDD的方式批量读写hbase。这种方式的优势在哪,官方的解释如下:

  • 无缝使用hbase connection

  • 和Kerberos无缝集成

  • 通过get或scan直接生成RDD

  • 利用RDD支持HBASE的任何组合操作

  • 为通用操作提供简单的方法,同时通过API允许不受限制的未知高级操作

  • 支持java和scala

  • 为spark和 spark streaming提供相似的API

三、开始使用

由于HbaseContext是一个只依赖Hadoop、Hbase、spark的jar包工具类,因此可以直接引入依赖使用。

1. 引入maven依赖

<dependency>
    <groupId>org.apache.hbase</groupId>
    <artifactId>hbase-spark</artifactId>
  <version>1.2.0-cdh5.13.1</version>
</dependency>

2. 我的环境

  • spark2.1.0

  • Hbase的版本为1.2.0-cdh5.13.1

  • jdk1.8

  • maven3.6.1

  • Mac OS

3. 注意事项

1) Could not access type Logging in package org.apache.spark

Spark2使用SparkOnHBase开发访问HBase时,代码编译时会报“Could not access type Logging in package org.apache.spark“。具体的异常栈信息如下:

Error:scalac: missing or invalid dependency detected while loading class file 'HBaseContext.class'.

Could not access type Logging in package org.apache.spark,

because it (or its dependencies) are missing. Check your build definition for
missing or conflicting dependencies. (Re-run with `-Ylog-classpath` to see the problematic classpath.)

full rebuild may help if 'HBaseContext.class' was compiled against an incompatible version of org.apache.spark.

具体原因可以参考:https://issues.apache.org/jira/browse/HBASE-16428

JIRA中找到在代码中引用了HBaseContext时,使用Spark2编译Spark应用程序将会失败,因为HBaseContext模块引用了org.apache.spark.Logging。在Spark2中,由于Logging被移动到一个私有的包下导致。同样也有Spark2的一个JIRA说明该问题:

https://issues.apache.org/jira/browse/SPARK-13928

问题解决:

  1. 在自己的工程下创建一个org.apache.spark的包。

    image-20200120102802674
  2. 在org.apache.spark包下创建一个Trait类型的Logging.scala类型,该类的内容通过Spark2源码找到。

    基于hbase-spark实现hive到hbase的数据传输中间件
    image-20200120103040113
  3. 将spark-core工程下org.apache.spark.internal.Logging类内容拷贝至我们工程下创建的org.apache.spark.Logging类中。

  4. Logging类的内容要确保与对应Spark2版本代码一致,避免造成一些莫名其妙的问题,改造完之后的org.apache.spark.Logging类。

    package org.apache.spark

    import org.apache.log4j.{Level, LogManager, PropertyConfigurator}
    import org.apache.spark.util.Utils
    import org.slf4j.{Logger, LoggerFactory}
    import org.slf4j.impl.StaticLoggerBinder

    trait Logging {
     // Make the log field transient so that objects with Logging can
     // be serialized and used on another machine
     @transient private var log_ : Logger = null

     // Method to get the logger name for this object
     protected def logName = {
       // Ignore trailing 
       this.getClass.getName.stripSuffix("<span class="katex-html" aria-hidden="true" style="word-wrap: inherit !important;word-break: inherit !important;"><span class="strut" style="height:1.001892em;vertical-align:-0.25em;" style="word-wrap: inherit !important;word-break: inherit !important;"><span class="vlist" style="height:0.751892em;" style="word-wrap: inherit !important;word-break: inherit !important;"><span style="top:-3.063em;margin-right:0.05em;" style="word-wrap: inherit !important;word-break: inherit !important;"><span class="pstrut" style="height:2.7em;" style="word-wrap: inherit !important;word-break: inherit !important;"><span class="sizing reset-size6 size3 mtight" style="word-wrap: inherit !important;word-break: inherit !important;"><span class="mord mtight" style="word-wrap: inherit !important;word-break: inherit !important;"><span class="mord mtight" style="word-wrap: inherit !important;word-break: inherit !important;">′<span class="mord mathit" style="word-wrap: inherit !important;word-break: inherit !important;">s<span class="mord mathit" style="word-wrap: inherit !important;word-break: inherit !important;">i<span class="mord mathit" style="word-wrap: inherit !important;word-break: inherit !important;">n<span class="mord mathit" style="word-wrap: inherit !important;word-break: inherit !important;">t<span class="mord mathit" style="word-wrap: inherit !important;word-break: inherit !important;">h<span class="mord mathit" style="word-wrap: inherit !important;word-break: inherit !important;">e<span class="mord mathit" style="word-wrap: inherit !important;word-break: inherit !important;">c<span class="mord mathit" style="margin-right:0.01968em;" style="word-wrap: inherit !important;word-break: inherit !important;">l<span class="mord mathit" style="word-wrap: inherit !important;word-break: inherit !important;">a<span class="mord mathit" style="word-wrap: inherit !important;word-break: inherit !important;">s<span class="mord mathit" style="word-wrap: inherit !important;word-break: inherit !important;">s<span class="mord mathit" style="word-wrap: inherit !important;word-break: inherit !important;">n<span class="mord mathit" style="word-wrap: inherit !important;word-break: inherit !important;">a<span class="mord mathit" style="word-wrap: inherit !important;word-break: inherit !important;">m<span class="mord mathit" style="word-wrap: inherit !important;word-break: inherit !important;">e<span class="mord mathit" style="word-wrap: inherit !important;word-break: inherit !important;">s<span class="mord mathit" style="margin-right:0.10764em;" style="word-wrap: inherit !important;word-break: inherit !important;">f<span class="mord mathit" style="word-wrap: inherit !important;word-break: inherit !important;">o<span class="mord mathit" style="margin-right:0.02778em;" style="word-wrap: inherit !important;word-break: inherit !important;">r<span class="mord mathit" style="margin-right:0.05764em;" style="word-wrap: inherit !important;word-break: inherit !important;">S<span class="mord mathit" style="word-wrap: inherit !important;word-break: inherit !important;">c<span class="mord mathit" style="word-wrap: inherit !important;word-break: inherit !important;">a<span class="mord mathit" style="margin-right:0.01968em;" style="word-wrap: inherit !important;word-break: inherit !important;">l<span class="mord mathit" style="word-wrap: inherit !important;word-break: inherit !important;">a<span class="mord mathit" style="word-wrap: inherit !important;word-break: inherit !important;">o<span class="mord mathit" style="word-wrap: inherit !important;word-break: inherit !important;">b<span class="mord mathit" style="margin-right:0.05724em;" style="word-wrap: inherit !important;word-break: inherit !important;">j<span class="mord mathit" style="word-wrap: inherit !important;word-break: inherit !important;">e<span class="mord mathit" style="word-wrap: inherit !important;word-break: inherit !important;">c<span class="mord mathit" style="word-wrap: inherit !important;word-break: inherit !important;">t<span class="mord mathit" style="word-wrap: inherit !important;word-break: inherit !important;">s<span class="mord mathit" style="word-wrap: inherit !important;word-break: inherit !important;">t<span class="mord mathit" style="word-wrap: inherit !important;word-break: inherit !important;">h<span class="mord mathit" style="word-wrap: inherit !important;word-break: inherit !important;">i<span class="mord mathit" style="word-wrap: inherit !important;word-break: inherit !important;">s<span class="mord mathit" style="margin-right:0.03588em;" style="word-wrap: inherit !important;word-break: inherit !important;">g<span class="mord mathit" style="word-wrap: inherit !important;word-break: inherit !important;">e<span class="mord mathit" style="word-wrap: inherit !important;word-break: inherit !important;">t<span class="mord mathit" style="margin-right:0.07153em;" style="word-wrap: inherit !important;word-break: inherit !important;">C<span class="mord mathit" style="margin-right:0.01968em;" style="word-wrap: inherit !important;word-break: inherit !important;">l<span class="mord mathit" style="word-wrap: inherit !important;word-break: inherit !important;">a<span class="mord mathit" style="word-wrap: inherit !important;word-break: inherit !important;">s<span class="mord mathit" style="word-wrap: inherit !important;word-break: inherit !important;">s<span class="mord mathit" style="margin-right:0.03588em;" style="word-wrap: inherit !important;word-break: inherit !important;">g<span class="mord mathit" style="word-wrap: inherit !important;word-break: inherit !important;">e<span class="mord mathit" style="word-wrap: inherit !important;word-break: inherit !important;">t<span class="mord mathit" style="margin-right:0.10903em;" style="word-wrap: inherit !important;word-break: inherit !important;">N<span class="mord mathit" style="word-wrap: inherit !important;word-break: inherit !important;">a<span class="mord mathit" style="word-wrap: inherit !important;word-break: inherit !important;">m<span class="mord mathit" style="word-wrap: inherit !important;word-break: inherit !important;">e<span class="mord mathit" style="word-wrap: inherit !important;word-break: inherit !important;">s<span class="mord mathit" style="word-wrap: inherit !important;word-break: inherit !important;">t<span class="mord mathit" style="margin-right:0.02778em;" style="word-wrap: inherit !important;word-break: inherit !important;">r<span class="mord mathit" style="word-wrap: inherit !important;word-break: inherit !important;">i<span class="mord mathit" style="word-wrap: inherit !important;word-break: inherit !important;">p<span class="mord mathit" style="margin-right:0.05764em;" style="word-wrap: inherit !important;word-break: inherit !important;">S<span class="mord mathit" style="word-wrap: inherit !important;word-break: inherit !important;">u<span class="mord mathit" style="margin-right:0.10764em;" style="word-wrap: inherit !important;word-break: inherit !important;">f<span class="mord mathit" style="margin-right:0.10764em;" style="word-wrap: inherit !important;word-break: inherit !important;">f<span class="mord mathit" style="word-wrap: inherit !important;word-break: inherit !important;">i<span class="mord mathit" style="word-wrap: inherit !important;word-break: inherit !important;">x
     }

     // Method to get or create the logger for this object
     protected def log: Logger = {
       if (log_ == null) {
         initializeLogIfNecessary(false)
         log_ = LoggerFactory.getLogger(logName)
       }
       log_
     }

     // Log methods that take only a String
     protected def logInfo(msg: => String) {
       if (log.isInfoEnabled) log.info(msg)
     }

     protected def logDebug(msg: => String) {
       if (log.isDebugEnabled) log.debug(msg)
     }

     protected def logTrace(msg: => String) {
       if (log.isTraceEnabled) log.trace(msg)
     }

     protected def logWarning(msg: => String) {
       if (log.isWarnEnabled) log.warn(msg)
     }

     protected def logError(msg: => String) {
       if (log.isErrorEnabled) log.error(msg)
     }

     // Log methods that take Throwables (Exceptions/Errors) too
     protected def logInfo(msg: => String, throwable: Throwable) {
       if (log.isInfoEnabled) log.info(msg, throwable)
     }

     protected def logDebug(msg: => String, throwable: Throwable) {
       if (log.isDebugEnabled) log.debug(msg, throwable)
     }

     protected def logTrace(msg: => String, throwable: Throwable) {
       if (log.isTraceEnabled) log.trace(msg, throwable)
     }

     protected def logWarning(msg: => String, throwable: Throwable) {
       if (log.isWarnEnabled) log.warn(msg, throwable)
     }

     protected def logError(msg: => String, throwable: Throwable) {
       if (log.isErrorEnabled) log.error(msg, throwable)
     }

     protected def isTraceEnabled(): Boolean = {
       log.isTraceEnabled
     }

     protected def initializeLogIfNecessary(isInterpreter: Boolean): Unit = {
       if (!Logging.initialized) {
         Logging.initLock.synchronized {
           if (!Logging.initialized) {
             initializeLogging(isInterpreter)
           }
         }
       }
     }

     private def initializeLogging(isInterpreter: Boolean): Unit = {
       // Don't use a logger in here, as this is itself occurring during initialization of a logger
       // If Log4j 1.2 is being used, but is not initialized, load a default properties file
       val binderClass = StaticLoggerBinder.getSingleton.getLoggerFactoryClassStr
       // This distinguishes the log4j 1.2 binding, currently
       // org.slf4j.impl.Log4jLoggerFactory, from the log4j 2.0 binding, currently
       // org.apache.logging.slf4j.Log4jLoggerFactory
       val usingLog4j12 = "org.slf4j.impl.Log4jLoggerFactory".equals(binderClass)
       if (usingLog4j12) {
         val log4j12Initialized = LogManager.getRootLogger.getAllAppenders.hasMoreElements
         // scalastyle:off println
         if (!log4j12Initialized) {
           val defaultLogProps = "org/apache/spark/log4j-defaults.properties"
           Option(Utils.getSparkClassLoader.getResource(defaultLogProps)) match {
             case Some(url) =>
               PropertyConfigurator.configure(url)
               System.err.println(s"Using Spark's default log4j profile: 
    不能识别此Latex公式:
    defaultLogProps")
             case None =>
               System.err.println(s"Spark was unable to load 
    defaultLogProps")
           }
         }
     if (isInterpreter) {
       // Use the repl's main class to define the default log level when running the shell,
       // overriding the root logger's config if they're different.
       val rootLogger = LogManager.getRootLogger()
       val replLogger = LogManager.getLogger(logName)
       val replLevel = Option(replLogger.getLevel()).getOrElse(Level.WARN)
       if (replLevel != rootLogger.getEffectiveLevel()) {
         System.err.printf("Setting default log level to \"%s\".\n", replLevel)
         System.err.println("To adjust logging level use sc.setLogLevel(newLevel). " +
           "For SparkR, use setLogLevel(newLevel).")
         rootLogger.setLevel(replLevel)
       }
     }
     // scalastyle:on println
       }
       Logging.initialized = true

       // Force a call into slf4j to initialize it. Avoids this happening from multiple threads
       // and triggering this: http://mailman.qos.ch/pipermail/slf4j-dev/2010-April/002956.html
       log
     }
    }

    object Logging {
     @volatile var initialized = false
     val initLock = new Object()
     try {
       // We use reflection here to handle the case where users remove the
       // slf4j-to-jul bridge order to route their logs to JUL.
       val bridgeClass = Utils.classForName("org.slf4j.bridge.SLF4JBridgeHandler")
       bridgeClass.getMethod("removeHandlersForRootLogger").invoke(null)
       val installed = bridgeClass.getMethod("isInstalled").invoke(null).asInstanceOf[Boolean]
       if (!installed) {
         bridgeClass.getMethod("install").invoke(null)
       }
     } catch {
       case e: ClassNotFoundException => // can't log anything yet so just fail silently
     }
    }
    </span class="mord mathit"></span class="mord mathit"></span class="mord mathit" style="margin-right:0.10764em;"></span class="mord mathit" style="margin-right:0.10764em;"></span class="mord mathit"></span class="mord mathit" style="margin-right:0.05764em;"></span class="mord mathit"></span class="mord mathit"></span class="mord mathit" style="margin-right:0.02778em;"></span class="mord mathit"></span class="mord mathit"></span class="mord mathit"></span class="mord mathit"></span class="mord mathit"></span class="mord mathit" style="margin-right:0.10903em;"></span class="mord mathit"></span class="mord mathit"></span class="mord mathit" style="margin-right:0.03588em;"></span class="mord mathit"></span class="mord mathit"></span class="mord mathit"></span class="mord mathit" style="margin-right:0.01968em;"></span class="mord mathit" style="margin-right:0.07153em;"></span class="mord mathit"></span class="mord mathit"></span class="mord mathit" style="margin-right:0.03588em;"></span class="mord mathit"></span class="mord mathit"></span class="mord mathit"></span class="mord mathit"></span class="mord mathit"></span class="mord mathit"></span class="mord mathit"></span class="mord mathit"></span class="mord mathit" style="margin-right:0.05724em;"></span class="mord mathit"></span class="mord mathit"></span class="mord mathit"></span class="mord mathit" style="margin-right:0.01968em;"></span class="mord mathit"></span class="mord mathit"></span class="mord mathit" style="margin-right:0.05764em;"></span class="mord mathit" style="margin-right:0.02778em;"></span class="mord mathit"></span class="mord mathit" style="margin-right:0.10764em;"></span class="mord mathit"></span class="mord mathit"></span class="mord mathit"></span class="mord mathit"></span class="mord mathit"></span class="mord mathit"></span class="mord mathit"></span class="mord mathit"></span class="mord mathit" style="margin-right:0.01968em;"></span class="mord mathit"></span class="mord mathit"></span class="mord mathit"></span class="mord mathit"></span class="mord mathit"></span class="mord mathit"></span class="mord mathit"></span class="mord mtight"></span class="mord mtight"></span class="sizing reset-size6 size3 mtight"></span class="pstrut" style="height:2.7em;"></span style="top:-3.063em;margin-right:0.05em;"></span class="vlist" style="height:0.751892em;"></span class="strut" style="height:1.001892em;vertical-align:-0.25em;"></span class="katex-html" aria-hidden="true">

2)streaming相关的异常

如果编译项目的时候,pom文件中不加入spark-streaming的依赖,会报如下异常:

image-20200120103608216

解决方法是,pom文件中加入spark-streaming的依赖,即使你的代码里没有用到。

<dependency>
    <groupId>org.apache.spark</groupId>
    <artifactId>spark-streaming_2.11</artifactId>
    <version>2.1.0</version>
    <scope>provided</scope>
</dependency>

4. 生成hfile的核心代码

//从hive表读取数据
    val dataHiveDF = sparkSession.table(sourceTableName)
    //表结构字段
    var fields = dataHiveDF.columns

    //去掉rowKey字段
    fields = fields.dropWhile(_ == rowKeyField)

    val hBaseConf = HBaseConfiguration.create()
    hBaseConf.set("hbase.zookeeper.quorum", zkAddress)
    hBaseConf.set("hbase.zookeeper.property.clientPort", zkPort)

    val hbaseContext = new HBaseContext(sc, hBaseConf)
    var rddNew = dataHiveDF.toJSON.rdd.flatMap(row => {
      val json = JSON.parseObject(row)
      val rowKey = json.get(rowKeyField).toString
      fields.map(field => {
        val fieldValue = json.get(field).toString
        if (toCaps) {
          (Bytes.toBytes(rowKey), Array((Bytes.toBytes(familyName), Bytes.toBytes(field.toUpperCase), Bytes.toBytes(fieldValue))))
        } else {
          (Bytes.toBytes(rowKey), Array((Bytes.toBytes(familyName), Bytes.toBytes(field), Bytes.toBytes(fieldValue))))
        }
      })
    })
    if (partitionsNum > 0) {
      rddNew = rddNew.coalesce(partitionsNum)
    }
    //使用HBaseContext的bulkLoad生成HFile文件
    hbaseContext.bulkLoad[Put](rddNew.map(record => {
      val put = new Put(record._1)
      record._2.foreach(putValue => put.addColumn(putValue._1, putValue._2, putValue._3))
      put
    }), TableName.valueOf(hBaseTempTable), (t: Put) => putForLoad(t), storeHfilePath)

    sparkSession.stop()
  }

  /**
   * Prepare the Put object for bulk load function.
   *
   * @param put The put object.
   * @throws java.io.IOException            IOException
   * @throws java.lang.InterruptedException InterruptedException
   * @return Tuple of (KeyFamilyQualifier, bytes of cell value)*/
  @throws(classOf[IOException])
  @throws(classOf[InterruptedException])
  def putForLoad(put: Put): Iterator[(KeyFamilyQualifier, Array[Byte])] = {
    val ret: mutable.MutableList[(KeyFamilyQualifier, Array[Byte])] = mutable.MutableList()
    import scala.collection.JavaConversions._
    for (cells <- put.getFamilyCellMap.entrySet().iterator()) {
      val family = cells.getKey
      for (value <- cells.getValue) {
        val kfq = new KeyFamilyQualifier(CellUtil.cloneRow(value), family, CellUtil.cloneQualifier(value))
        ret.+=((kfq, CellUtil.cloneValue(value)))
      }
    }
    ret.iterator
  }

  def getValue(argMap: Map[String, String], key: String, defaultValue: String = null): String = {
    if (defaultValue != null) {
      argMap.getOrElse(key, defaultValue)
    } else {
      if (argMap.contains(key)) {
        argMap(key)
      } else {
        printArgUsage()
        throw new Exception("参数不正确!")
      }
    }
  }

5. 完整的项目路径

https://gitee.com/leojie/bigdata/tree/master/bigdata-hbase-spark2.1

更详细的使用说明参考:

https://gitee.com/leojie/bigdata/blob/master/bigdata-hbase-spark2.1/README.md

四、总结

以上便是集成Cloudera的hbase-spark模块,实现hfile快速生成的核心步骤,基于此,大家可以实现自己的hbase数据大量导入的中间件。

五、参考链接

  • https://blog.csdn.net/weixin_44455388/article/details/102586488

  • https://cloud.tencent.com/developer/article/1399924

  • https://cloud.tencent.com/developer/article/1399926