vlambda博客
学习文章列表

深入Hbase原理(超详细)

目录

前言

Hbase概述

Hbase中的核心概念

原理加强之Region拆分

HMaster的作用   主节点

Region的拆分

负载均衡

1.1 自动负载均衡流程

1.2 强制执行负载均衡

1.3 人为的移动

大小合并

大合并之Region合并

小合并之Hfile的合并

Hbase的数据导入

shell端脚本方式

使用步骤

Java程序方式

MR程序读取数据处理 并输出到Hbase中

前言

Hbase概述

我们都知道,Hbase是一个高可靠的、易扩展的、面向列式存储的分布式数据库系统,那既然是一款数据库管理系统,当然,它的数据借助于HDFS存储。具体路径为/hbase/data/default/table_name/region_name/cf_name/hfile

Hbase中的核心概念

region

1)表的行数据范围,将一张大的表划分成多个region,将region分配给不同的regionserver机器管理-------->构成分布式数据库

2)region中有

store:一个列族对应一个store

memorystore  :读写数据的内存  写数据(内存)---->对整个file中的数据进行排序

WALG:记录用户的操作行为   在进行写入数据时,若机器意外宕机时恢复用户数据的一种方式

storefile:内存对象flush到hdfs中形成hfile文件;storefile就是hfile的抽象对象

blockCache:提升查询的效率

其他名词概念

namespace:名称空间,类似于mysql中的数据库名

表   namespace:table_name

列族  columanFamily简称cf:对列的分类管理  创建表时要注意   1)列族不要太多(太多意味着更多的memorystore内存对象,占用更多的内存)  2)命令不要太长(储存的时候数据过长 冗余)

行键 rowkey:1)行的唯一标识   2)索引  3)一维排序  4)布隆过滤器

属性 qualifier  :具有稀疏性

值 value:存储的字节数据

原理加强之Region拆分

HMaster的作用   主节点

1)DDL有关的操作,比如建表需要master主节点  ;而读写数据却不需要master,从hbase读写数据的流程来看,确实没有接触到master

2)region具体分配到哪个regionserver上需要matser

Region的拆分

为什么要拆分region?

HBase是以表的形式存储数据的。一张表被划分为多个regions,regions分布在多个Region Server上,单个region只能分布在一个Region Server节点上,不能跨Region Server存放。

Region的两个重要属性:StartKey和EndKey,分别表示这个region所维护的rowkey范围。当做读/写请求时,寻址到该数据的rowkey落在某个start-end key范围内,那么就会定位到该范围内的region所在的Region Server上进行数据读/写。

随着数据的增多,region所要管理的数据也会越来越多,查询时,出现高并发热点问题的概率大大增加。单个机器需要来处理很多数据,并且当这个regionserver的负载过重。

这个时候就需要对region进行拆分,均衡到不同的节点上进行管理!

热点问题

在创建表的时候建议 使用预分region表 (指定了切割点  ,对数据有分组规划)

1. 观察数据特点   注意重点字段(业务)

2 了解具体的表对应的业务,重点字段

3 根据业务 判断数据的读多(查询)  写多(插入)

4 读数据 写数据 (特点)  维度

5 避免热点问题  [插入热点 , 查询热点]

  • 138...大量的数据插入到同一个region 一个regionserver在服务  压力过大

  • 并发的查询138...数据  一个regionserver在服务 接收所有的并发查询  

怎么拆分?

1)自动拆分

1.1默认按照大小

计算公式为:
Min{

1^3*2*128M    256M

2^3*2*128M    2G

3^3*2*128M    6.75G

 10G          10G

当hbase表在regionserver上的region,如果region的大小到达一个阈值,这个region将会分为两个。

如果默认值情况下,一个表在一个regionserver上split的阈值是:
256MB(第一次split),2GB(第二次),6.75GB(第三次),10GB(第四次),10GB... 10GB

1.2keyPrefixRegionSplitPolicy(自定义) key的前缀

这种拆分是在原来的拆分基础上 ,增加了拆分点(splitPoint,拆分点就是Region被拆分时候的rowkey)的定义,保证有相同前缀的rowkey不会被拆分到不同的Region上

参数是 keyPrefixRegionSplitPolicy.prefix_length   rowkey:前缀长度

问题是 : 可能相同主题的数据会被分到不同的regionserver中 ,查询数据来源于不同的regionserver

1.3DelimitedKeyPrefixRegionSplitPolicy key的分隔符

和上一种查分策略一致 , 上一种是按照key的固定长度拆分的 , 这种按照的是分割符

DelimitedKeyPrefixRegionSplitPolicy.delimiter参数分割符

指定一个分隔符 能保证相同的主题数据在一个regionserver中

2)手动拆分(很少用,不需要)

Examples: split 'tableName' split 'namespace:tableName' split 'regionName' # format: 'tableName,startKey,id' split 'tableName', 'splitKey' split 'regionName', 'splitKey'

3)预分region

shell端:

 hbase> create 'ns1:t1', 'f1', SPLITS => ['10', '20', '30', '40'] hbase> create 't1', 'f1', SPLITS => ['10', '20', '30', '40']

java客户端:

 byte[][] keys = new byte[][]{"e".getBytes(), "m".getBytes(), "t".getBytes()}; // 建表 指定预分region的key(数组) admin.createTable(tableDescriptor, keys);

一般在实际业务中,首先会根据数据量和集群的规模还有业务特点,会进行预分区region,之后按照默认的拆分规则自动拆分即可

可以设置拆分的策略  1)大小拆分  2)按照前缀拆分 3)按照分隔符拆分  4)手动拆分(几乎很少用)

region在底层是文件夹。

拆分之后

region拆分之后,面临的将是负载均衡或(region的移动) 

region的移动会非常非常占用资源;因为region的移动会牵扯到文件、文件夹的创建、hfile的移动。

会消耗大量的IO、网络、CPU资源被占用,最好不要手动频繁的去拆分。

拆分带来的好处:将一个大的表拆分成多个region区域,交由不同的regionserver管理,形成分布式数据库,减少了某一个机器的的负载压力。

负载均衡

由Master的LoadBalancer线程周期性的在各个RegionServer间移动region维护负载均衡。

1 经常被并发查询的数据不要存储在同一个RegionServer中 , 避免热点读取问题 .

2 当一个机器上经过大量的插入或者删除数据以后 ,region合并或者分裂 ,那么机器上的region的数量会相差很大 .

3 当新增了节点以后 , 应该去分配一些其他机器上的region数据

4 当某个RegionServer宕机以后 , 这台机器上数据的分配

region的执行

Hbase中负载均衡的开关balanceSwitch默认是开启的 .

hbase.balancer.period是设定负载均衡的参数,默认为每5分钟(300000毫秒)运行一次,参数配置通过hbase-site.xml文件。具体的设置格式如下:


   
     
     
   
  1. <property>

  2. <name>hbase.balancer.period</name>

  3. <value>300000</value>

  4. </property>

1.1 自动负载均衡流程

1)两个有效参数:MIN = floor(average)(表示下限)和MAX=ceil(average)(表示上限)。

2)循环过载机器,将Region卸载到MAX数量,在小于等于MAX时停止排序Region(按时间新旧)。

3)遍历最轻负载机器,分配Region直到Server达到MIN值。在大于等于MIN时停止。这些Region都是之前卸载的。可能没有足够地卸载Region让轻负载的机器达到MIN值,如果这样,在Region数等于neededRegions(轻负载机器的数量)时停止。可能我们分配了卸载的Region到轻负载机器,但是仍然有Region没有分配出去,这种情况下,本步骤完成,在下面步骤中再做处理。

4)如果neededRegions是非零的值,遍历负载最重的机器,从每台机器上卸载一个Region,使得它们的值从MAX到MIN。

5)现在有很多Region等待分配,遍历最轻的负载机器(多台),分配Region到MIN。

6)如果仍然有Region没有分配,遍历最轻的负载机器(多台),这次分配Region到MAX。

7)所有Server的Region数量时MAX或者MIN,另外,所有大于等于MAX的Server保证在均衡完成后都是MAX个Region。从而保证Region移动的最小数量。

其中,轻负载指的是Region数量小于等于AVG,过载指大于等于AVG,所有的RegionServer都按照负载从大到小排序,存放在TreeMap中(保证先遍历过载Server)。

1.2 强制执行负载均衡

1)kill 掉不是Hmaster节点的 regionserver(slave2)

2) hbase 16010 查看regionserver在slave2节点

3) 在master节点启动 regionserver(slave2)

    master $>hbase-daemons.sh start regionserver

4) hbase 16010 查看regionserver负载不均衡 (多刷新  0  6)


  
    
    
  
  1.     balance负载均衡指令,需使用开关模式,原因负载均衡比较耗费资源:

  2.     hbase(main):005:0> balance_switch true    //开启负载均衡

  3.     hbase(main):005:0> balancer                      //执行负载均衡

  4.     hbase(main):005:0> balance_switch false   //关闭负载均衡

  5.     hbase(main):005:0> balancer "force"           //强制负载均衡

1.3 人为的移动

当发生经常性的查询热点问题时, 您的请求总是请求到一个服务节点上 ,我们可以把这个机器中的数据理解成热数据 ,可以人为的拆分以后手动的移动到不同的机器上 实现查询热点的负载均衡 

使用命令


  
    
    
  
  1. 1 切分

  2. Examples:

  3. split 'tableName'

  4. split 'namespace:tableName'

  5. split 'regionName' # format: 'tableName,startKey,id'

  6. split 'tableName', 'splitKey'

  7. split 'regionName', 'splitKey'

  8. 页面查看region情况 ,或者使用list_regions "tb_name"

  9. 2 移动 将某个region移动到某个机器上

  10. move  "a84afc7eb10797188c0af1534e143eb9" , "linux03,16020,1591844369905"

 

大小合并

大合并之Region合并

为什么需要合并region呢?

当一个Region被不断的写数据,达到Region的Split的阀值时(由属性hbase.hregion.max.filesize来决定,默认是10GB),该Region就会被Split成2个新的Region。随着业务数据量的不断增加,Region不断的执行Split,那么Region的个数也会越来越多

一个表的Region过大,势必整个集群的Region个数也会增加,负载均衡后,每个RegionServer承担的Region个数也会增加

当表中的数据被大量删除以后,表的行数急剧减少,region的个数没有变。region的个数没有变,每个region管理的数据行数少!合并region

当然合并region也是占用大量的IO、网络、cpu资源的。

小合并之Hfile的合并

hbase是分布式列式存储数据库 , 其具有的基本功能就是对数据的增删改查 , 那么Hbase的数据是存储在HDFS上的,我们知道在HDFS中的数据是不允许随机修改和删除的!这与HDFS的功能相违背!

hbase数据底层存储下HDFS系统中--------Hfile的形式存储的!在HDFS中Hfile的具体位置 /hbase/data/default/table_name/region_name/cf_name/hfile_name   如下图所示

hfile合并

数据加载到memstore,数据越来越多直到memstore占满,再写入硬盘storefile中,每次写入形成一个单独Hfile,当Hfile达到一定的数量后,就会开始把小Hfile合并成大Hfile,因为Hadoop不擅长处理小文件,文件越大性能越好。

hfile合并的时机

1)有大量的更新数据  (修改、删除 操作)

hfile在进行修改数据或者删除数据时,并不是真的删掉了这条数据,而是重新生成一个hfile,将数据放入其中,并打上标记。比如删除rk001 的数据,会

新生成file文件,里面存有rk001 并带有墓碑标记的数据   客户端在拿数据时,服务器找到了两条rk001 一看有墓碑标记,就不再将此条数据读出返回给客户端

2)有大量的小文件

设计的不好,rowkey多,大量的冗余数据,列族多,store多,memorystore多,占用内存过多,容易溢出,会使机器的内存占用达到阈值,就会flush掉所有的memorystore中的数据,从而产生大量的

storefile进而产生很多很多的hfile(列族多,内存小)

3)有TTL过期数据

数据大量删除------>hfile新生-------->hifle合并-------->region管理数据行数变少-------->region合并

region的合并不要在业务高峰进行

面试题:

hbase表中的数据存储在哪?

hbase中的数据物理存储在hdfs上。

hdfs中的数据不是不允许随机修改和删除吗,你刚刚说hbase上的数据存储在hdfs上的,hbase既然是个数据库管理系统,那肯定有crud的功能吧,那这怎么解释呢?

hdfs上的数据的确是不允许进行随机修改和删除的,而habse中的数据修改和删除也不是直接就修改和删掉了的,是通过数据的标记和文件的合并来实现修改和删除的

比如,hbase数据的删除,对数据的删除,实际上生成了一个新的hfile,并给数据打上墓碑标记

查询数据时,看到墓碑标记,就不会读取这条数据并返回

等到一定条件,hfile合并之后,数据才会消失。

深入Hbase原理(超详细)

Hbase的数据导入

1)put  不好     直接将数据导入到Hbase中 , 底层使用RPC请求 , 每次put都会RPC请求,效率并不高

一行插入一次  进行一次RPC请求

 * ------------------>hbase
 * ------------------>hbase
 * ------------------>hbase
 * ------------------>hbase

用rpc请求,一条一条地通过rpc提交给regionserver去插入,效率极其低下!

Table tb_dml = conn.getTable(TableName.valueOf("tb_dml")); Put rk0011 = new Put(Bytes.toBytes("rk0011"));rk0011.addColumn("cf1".getBytes(), "name".getBytes(), "guanyu".getBytes());rk0011.addColumn("cf1".getBytes(), "gender".getBytes(), "M".getBytes()); Put rk0010 = new Put(Bytes.toBytes("rk0010"));rk0010.addColumn("cf1".getBytes(), "age".getBytes(), Bytes.toBytes(23));rk0010.addColumn("cf1".getBytes(), "name".getBytes(), "guanyu".getBytes());// 插入多行数据ArrayList<Put> puts = new ArrayList<>();puts.add(rk0011); puts.add(rk0010);// 插入数据tb_dml.put(puts);

2)BufferedMutator

使用缓存批次的形式发送RPC请求 , 减少RPC的请求次数 , 插入数据的效率会比put方式效率高, 但是对于大量的静态数据也并不适合!

private static void mutationInsert() throws IOException { Connection conn = HbaseUtils.getHbaseConnection(); BufferedMutator mutator = conn.getBufferedMutator(TableName.valueOf("tb_user")); Put put = new Put(Bytes.toBytes("rk1005")); // 行键 put.addColumn(Bytes.toBytes("cf1"),// 列族 Bytes.toBytes("name"),//属性 Bytes.toBytes("OOO") //值 ); put.addColumn("cf1".getBytes(), "gender".getBytes(), "M".getBytes()); // 插入数据 mutator.mutate(put); // 将缓存在本地的数据 请求Hbase插入 mutator.flush();}

3) BulkLoad

数据插入到hbase中的本质:将数据以Hfile文件格式存储在HDFS的指定位置 

*** 将大量的静态数据转换成hfile文件  直接存储到指定的hdfs路径下  批量导入***

 

使用工具:bulkloader 一个用于批最快速导入数据到 hbase 的工具/方法

应用场景用于已经存在一批巨量的静态数据的情况!                   

简介:使用 Bulk Load方式由于利用了HBase 的数据信息是按照特定格式存储在HDFS里的这一特性,直接在 HDFS中生成持久化的 HFile数据格式文件,然后完成巨量数据快速入库的操作,配合MapReduce完成这样的操作,不占用 Region资源,不会产生巨量的写入IO,所以需要较少的CPU和网络资源。Bulk Load 的实现原理是通过一个MapReduce Job来实现的,通过Job直接生成一个HBase的 HFile格式文件,用来形成一个特殊的 HBase数据表,然后直接将数据文件加载到运行的集群中。与使用Hbase API相比,使用Bulkload导入数据占用更少的CPU和网络资源

3.1shell端脚本方式

使用shell脚本,可以完成。该方法操作方便,但不够灵活。

使用步骤

静态数据如下:

uid001,zss,23,F
uid002,lss,13,M
uid003,ww,22,M
uid004,zl,34,F
uid005,tq,43,M
uid006,wb,55,F
uid007,sj,98,M

1先在linux01上新建txt,并上传到hdfs上

深入Hbase原理(超详细)

深入Hbase原理(超详细)

2在hbase创建一张表   create  ‘test_data’,'cf1'

深入Hbase原理(超详细)

3执行命令   根据数据生成hbase文件


   
     
     
   
  1. #info:列族名 name:属性 imp:表名 /tsv/input:输入路径 /tsv/output:输出路径

  2. hbase org.apache.hadoop.hbase.mapreduce.ImportTsv -Dmporttsv.separator=, -Dimporttsv.columns='HBASE_ROW_KEY,cf1:name,cf1:age,cf1:gender' -Dimporttsv.bulk.output=/tsv/output imp /tsv/input

命令解释:  -Dimporttsv.separator=,     指定文件分割符号  为  ,

Dimporttsv.columns='HBASE_ROW_KEY,cf1:name,cf1:age,cf1:gender   分割的行键    列族:属性

/tsv/input:输入路径   /tsv/output:输出路径

 hbase  org.apache.hadoop.hbase.mapreduce.ImportTsv  -Dimporttsv.separator=, -Dimporttsv.columns='HBASE_ROW_KEY,cf1:name,cf1:age,cf1:gender' -Dimporttsv.bulk.output=/tsv/output  test_data  /data深入Hbase原理(超详细)

深入Hbase原理(超详细)

4查看页面上文件夹显示

深入Hbase原理(超详细)

此时是还没有hfile的

5将生成的hfile文件导入到hbase表中

hbase org.apache.hadoop.hbase.mapreduce.LoadIncrementalHFiles /tsv/output test_data

深入Hbase原理(超详细)

执行后,查看页面文件

深入Hbase原理(超详细)

6解析验证是否导入的静态数据文件

hbase hfile   -p -f     文件在hdfs中的路径     或者使用scan 'test_data'

3.2Java程序方式

使用Mr程序的自定义输入和输出文件类型 

  • 输入Hfile文件     输出普通的文件   表-->普通文件(数据导出)

  • 输入Hfile文件    输出Hfile文件   表-->表

  • 输入普通文件   输出Hflie  将普通数据导入到Hbase中

*** 将普通的数据 (大量)  ---->   hfile格式的数据 [MR]

[MR]:自定义输入和输出[hfile]  --> 指定路径 

编写java程序  mapper读数据  处理  reducer 写数据     灵活,但需要另写代码

MR程序读取数据处理 并输出到Hbase中MovieBean代码:package cn._51doit.movie;
import org.apache.hadoop.io.Writable;
import java.io.DataInput;import java.io.DataOutput;import java.io.IOException;
/***/public class MovieBean implements Writable {private String movie ;private double rate ;private String timeStamp ;private String uid ;
public String getMovie() { return movie;}
public void setMovie(String movie) { this.movie = movie;}
public double getRate() { return rate;}
public void setRate(double rate) { this.rate = rate;}
public String getTimeStamp() { return timeStamp;}
public void setTimeStamp(String timeStamp) { this.timeStamp = timeStamp;}
public String getUid() { return uid;}
public void setUid(String uid) { this.uid = uid;}
@Overridepublic void write(DataOutput dataOutput) throws IOException { dataOutput.writeUTF(movie); dataOutput.writeDouble(rate); dataOutput.writeUTF(timeStamp); dataOutput.writeUTF(uid);}
@Overridepublic void readFields(DataInput dataInput) throws IOException { this.movie = dataInput.readUTF(); this.rate = dataInput.readDouble() ; this.timeStamp = dataInput.readUTF(); this.uid = dataInput.readUTF();
}}

MR程序:

package cn._51doit.mr;
import cn._51doit.movie.MovieBean;import com.google.gson.Gson;import com.google.gson.JsonSyntaxException;import org.apache.commons.lang3.StringUtils;import org.apache.hadoop.conf.Configuration;import org.apache.hadoop.fs.Path;import org.apache.hadoop.hbase.HBaseConfiguration;import org.apache.hadoop.hbase.client.Put;import org.apache.hadoop.hbase.io.ImmutableBytesWritable;import org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil;import org.apache.hadoop.hbase.mapreduce.TableReducer;import org.apache.hadoop.hbase.util.Bytes;import org.apache.hadoop.io.LongWritable;import org.apache.hadoop.io.Text;import org.apache.hadoop.mapreduce.Job;import org.apache.hadoop.mapreduce.Mapper;import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import java.io.IOException;
/***/public class LoadData2HbaseTable {
/** * 处理每行数据 生成rowkey和movieBean */static class LoadData2HbaseTableMapper extends Mapper<LongWritable, Text, Text, MovieBean> { Gson gs = new Gson(); Text k = new Text(); @Override protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException { try { String line = value.toString(); MovieBean mb = gs.fromJson(line, MovieBean.class); String s = StringUtils.leftPad(mb.getMovie(), 5, '0'); // 设计rowkey主键 String rk = s + "_" + mb.getTimeStamp(); k.set(rk); context.write(k, mb); } catch (Exception e) { e.printStackTrace(); }
}
}
static class LoadData2HbaseTableReducer extends TableReducer<Text , MovieBean , ImmutableBytesWritable> {
@Overrideprotected void reduce(Text key, Iterable<MovieBean> values, Context context) throws IOException, InterruptedException { String rk = key.toString(); Put put = new Put(rk.getBytes()); // 行 MovieBean mb = values.iterator().next(); String movie = mb.getMovie(); double rate = mb.getRate(); String timeStamp = mb.getTimeStamp(); String uid = mb.getUid(); put.addColumn("cf".getBytes() , "movie".getBytes() , movie.getBytes()) ; put.addColumn("cf".getBytes() , "rate".getBytes() , Bytes.toBytes(rate)) ; put.addColumn("cf".getBytes() , "timeStamp".getBytes() , timeStamp.getBytes()) ; put.addColumn("cf".getBytes() , "uid".getBytes() , uid.getBytes()) ; context.write(null , put);}

}
public static void main(String[] args) throws Exception { Configuration conf = HBaseConfiguration.create(); conf.set("hbase.zookeeper.quorum","linux01:2181,linux02:2181,linux03:2181"); Job job = Job.getInstance(conf, "tohbase");
job.setMapperClass(LoadData2HbaseTableMapper.class);
job.setMapOutputKeyClass(Text.class);job.setMapOutputValueClass(MovieBean.class);
FileInputFormat.setInputPaths(job , new Path("D:\\data\\movie\\input"));TableMapReduceUtil.initTableReducerJob("movie2",LoadData2HbaseTableReducer.class,job);
job.waitForCompletion(true) ;

}
}

要点整理:

rowkey的设计

rowkey决定了你的查询效率

1)观察数据结构 数据特点  了解核心字段

2)了解业务需求 :知道数据的作用,经常查询   经常插入数据

3)经常查询   ---------》确定查询纬度    

          1    movie

           2    uid

           3   movie+uid

4)注意点:

1 将查询纬度放在rk上;

2 保证rowkey的唯一性;

3 rowkey可以长度相同,方便排序

4 划分属性含义  用特殊符号 _

5 rowkey不要太长,容易造成大量冗余数据

5)要考虑并发热点问题