Generating HFiles with Spark and Importing Data into HBase via BulkLoad
-
Common ways of writing data into HBase with Spark include batch-writing through HBase's put API, and generating HFile files and then loading them into HBase via BulkLoad.
HBase ultimately stores its data on HDFS as HFiles. If we can generate the data directly as HFile files and then load those files into the corresponding HBase table, we avoid many of the problems mentioned above and get noticeably better efficiency.
1. Generating HFiles and loading via BulkLoad

1.1 Sample data
{"id":"1","name":"jack","age":"18"}{"id":"2","name":"mike","age":"19"}{"id":"3","name":"kilos","age":"20"}{"id":"4","name":"tom","age":"21"}...
```scala
import org.apache.hadoop.fs.{FileSystem, Path}
import org.apache.hadoop.hbase.client.{Connection, ConnectionFactory}
import org.apache.hadoop.hbase.io.ImmutableBytesWritable
import org.apache.hadoop.hbase.mapreduce.{HFileOutputFormat2, LoadIncrementalHFiles, TableOutputFormat}
import org.apache.hadoop.hbase.util.Bytes
import org.apache.hadoop.hbase.{HBaseConfiguration, HColumnDescriptor, HTableDescriptor, KeyValue, TableName}
import org.apache.hadoop.mapreduce.Job
import org.apache.spark.sql.SparkSession

/**
 * @Author bigdatalearnshare
 */
object App {

  def main(args: Array[String]): Unit = {
    System.setProperty("HADOOP_USER_NAME", "root")

    val sparkSession = SparkSession.builder()
      .config("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
      .master("local[*]")
      .getOrCreate()

    val rowKeyField = "id"

    val df = sparkSession.read.format("json").load("/people.json")

    // Column qualifiers must be written in lexicographic order within each row, hence the sort
    val fields = df.columns.filterNot(_ == "id").sorted

    val data = df.rdd.map { row =>
      val rowKey = Bytes.toBytes(row.getAs(rowKeyField).toString)

      val kvs = fields.map { field =>
        new KeyValue(rowKey, Bytes.toBytes("hfile-fy"), Bytes.toBytes(field),
          Bytes.toBytes(row.getAs(field).toString))
      }

      (new ImmutableBytesWritable(rowKey), kvs)
    }.flatMapValues(x => x)
      .sortByKey() // rows must also be sorted by row key

    val hbaseConf = HBaseConfiguration.create(sparkSession.sessionState.newHadoopConf())
    hbaseConf.set("hbase.zookeeper.quorum", "linux-1:2181,linux-2:2181,linux-3:2181")
    hbaseConf.set(TableOutputFormat.OUTPUT_TABLE, "hfile")

    val connection = ConnectionFactory.createConnection(hbaseConf)
    val tableName = TableName.valueOf("hfile")

    // Create the HBase table if it does not exist yet
    creteHTable(tableName, connection)

    val table = connection.getTable(tableName)

    try {
      val regionLocator = connection.getRegionLocator(tableName)

      val job = Job.getInstance(hbaseConf)
      job.setMapOutputKeyClass(classOf[ImmutableBytesWritable])
      job.setMapOutputValueClass(classOf[KeyValue])

      HFileOutputFormat2.configureIncrementalLoad(job, table, regionLocator)

      // HDFS directory the HFiles are written to before being bulk-loaded
      val savePath = "hdfs://linux-1:9000/hfile_save"
      delHdfsPath(savePath, sparkSession)

      job.getConfiguration.set("mapred.output.dir", savePath)

      data.saveAsNewAPIHadoopDataset(job.getConfiguration)

      val bulkLoader = new LoadIncrementalHFiles(hbaseConf)
      bulkLoader.doBulkLoad(new Path(savePath), connection.getAdmin, table, regionLocator)
    } finally {
      // WARN LoadIncrementalHFiles: Skipping non-directory hdfs://linux-1:9000/hfile_save/_SUCCESS
      // This warning is harmless; the HFiles have already been moved into the table's HDFS directory.
      table.close()
      connection.close()
    }

    sparkSession.stop()
  }

  def creteHTable(tableName: TableName, connection: Connection): Unit = {
    val admin = connection.getAdmin
    if (!admin.tableExists(tableName)) {
      val tableDescriptor = new HTableDescriptor(tableName)
      tableDescriptor.addFamily(new HColumnDescriptor(Bytes.toBytes("hfile-fy")))
      admin.createTable(tableDescriptor)
    }
  }

  def delHdfsPath(path: String, sparkSession: SparkSession): Unit = {
    val hdfs = FileSystem.get(sparkSession.sessionState.newHadoopConf())
    val hdfsPath = new Path(path)

    if (hdfs.exists(hdfsPath)) {
      // val filePermission = new FsPermission(FsAction.ALL, FsAction.ALL, FsAction.READ)
      hdfs.delete(hdfsPath, true)
    }
  }
}
```
Note that both the row keys and the column qualifiers within each row must be written in ascending (lexicographic) order, which is why the code sorts `fields` and calls `sortByKey()`. If cells arrive out of order, HFile generation fails with:

```
Caused by: java.io.IOException: Added a key not lexically larger than previous. Current cell = 1/hfile-fy:age/1588230543677/Put/vlen=2/seqid=0, lastCell = 1/hfile-fy:name/1588230543677/Put/vlen=4/seqid=0
```
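If the input columns cannot simply be pre-sorted, the ordering can also be enforced explicitly by sorting on a composite (row key, qualifier) key before writing. A minimal sketch, assuming a single column family and ASCII row keys/qualifiers (so that `String` ordering matches HBase's byte-wise ordering); `toSortedKeyValues` is an illustrative helper, not part of the code above:

```scala
import org.apache.hadoop.hbase.KeyValue
import org.apache.hadoop.hbase.io.ImmutableBytesWritable
import org.apache.hadoop.hbase.util.Bytes
import org.apache.spark.rdd.RDD

// Turns (rowKey -> (qualifier, value) pairs) into KeyValues sorted by (rowKey, qualifier),
// the order in which HFiles must be written.
def toSortedKeyValues(rows: RDD[(String, Seq[(String, String)])],
                      family: String): RDD[(ImmutableBytesWritable, KeyValue)] = {
  rows
    .flatMap { case (rowKey, columns) =>
      columns.map { case (qualifier, value) =>
        ((rowKey, qualifier),
          new KeyValue(Bytes.toBytes(rowKey), Bytes.toBytes(family),
            Bytes.toBytes(qualifier), Bytes.toBytes(value)))
      }
    }
    .sortByKey() // lexicographic on the (rowKey, qualifier) tuple
    .map { case ((rowKey, _), kv) => (new ImmutableBytesWritable(Bytes.toBytes(rowKey)), kv) }
}
```

The result can then be handed to `saveAsNewAPIHadoopDataset` exactly like `data` in the example above.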
2. Batch put
2.1 Sample code
val rowKeyField = "id"val df = sparkSession.read.format("json").load("/stats.json")val fields = df.columns.filterNot(_ == "id")df.rdd.foreachPartition { partition =>val hbaseConf = HBaseConfiguration.create()hbaseConf.set("hbase.zookeeper.quorum", "linux-1:2181,linux-2:2181,linux-3:2181")hbaseConf.set(TableOutputFormat.OUTPUT_TABLE, "batch_put")val conn = ConnectionFactory.createConnection(hbaseConf)val table = conn.getTable(TableName.valueOf("batch_put"))val res = partition.map { row =>val rowKey = Bytes.toBytes(row.getAs(rowKeyField).toString)val put = new Put(rowKey)val family = Bytes.toBytes("hfile-fy")fields.foreach { field =>put.addColumn(family, Bytes.toBytes(field), Bytes.toBytes(row.getAs(field).toString))}put}.toListTry(table.put(res)).getOrElse(table.close())table.close()conn.close()}
3. Writing directly with saveAsNewAPIHadoopDataset

Data can also be written to HBase without generating HFiles, by handing an RDD of `Put`s to `TableOutputFormat` through `saveAsNewAPIHadoopDataset`:

```scala
val hbaseConf = sparkSession.sessionState.newHadoopConf()
hbaseConf.set("hbase.zookeeper.quorum", "linux-1:2181,linux-2:2181,linux-3:2181")
hbaseConf.set(TableOutputFormat.OUTPUT_TABLE, "direct")

val job = Job.getInstance(hbaseConf)
job.setMapOutputKeyClass(classOf[ImmutableBytesWritable])
job.setMapOutputValueClass(classOf[Result])
job.setOutputFormatClass(classOf[TableOutputFormat[ImmutableBytesWritable]])

val rowKeyField = "id"

val df = sparkSession.read.format("json").load("/stats.json")

val fields = df.columns.filterNot(_ == "id")

df.rdd.map { row =>
  val put = new Put(Bytes.toBytes(row.getAs(rowKeyField).toString))
  val family = Bytes.toBytes("hfile-fy")

  fields.foreach { field =>
    put.addColumn(family, Bytes.toBytes(field), Bytes.toBytes(row.getAs(field).toString))
  }

  // TableOutputFormat ignores the key; the Put carries the row data
  (new ImmutableBytesWritable(), put)
}.saveAsNewAPIHadoopDataset(job.getConfiguration)
```
The above covered three ways of importing data into HBase with Spark: generating HFiles and loading them via BulkLoad, batch writes with the put API, and writing directly through TableOutputFormat. Of these, generating HFile files and importing them via BulkLoad is the better fit for large data volumes.
