HBase batch write optimization
HBase is widely used nowadays, but I still see people leaving write performance on the table, so this post walks through how to optimize batch writes to HBase!
Optimization points
Pre-split regions when creating the table
Disable auto-flush on the connection
Configure a client-side write buffer on the connection
Disable the WAL for writes
HBase has gone through several major releases in recent years, so this post covers two cases: versions below 2.0 and versions 2.0 and above.
Versions below 2.0
Maven
<dependency>
<groupId>org.apache.hbase</groupId>
<artifactId>hbase-client</artifactId>
<version>1.2.6</version>
</dependency>
HBase connection
package cn.com.liu.hbase.conn;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.client.HTable;
import java.io.IOException;
/**
* @Author liu
*/
public class HbaseUtils {
private static final Log logger = LogFactory.getLog(HbaseUtils.class);
private static HTable ht = null;
public static HTable getInstance(String zk, String port, String parent, String table){
if(null == ht){
try {
ht = Init(zk,port,parent,table);
} catch (IOException e) {
logger.warn("hbase init failed", e);
}
}
while(true){
try{
ht.getName();
break;
}catch (Exception e){
try {
Thread.sleep(1000 * 3);
ht = Init(zk,port,parent,table);
} catch (Exception e1) {
logger.warn("hbase init failed", e1);
}
}
}
return ht;
}
private static HTable Init(String zk, String port, String parent, String table) throws IOException {
Configuration HBASE_CONFIG = new Configuration();
HBASE_CONFIG.set("hbase.zookeeper.quorum", zk);
HBASE_CONFIG.set("hbase.zookeeper.property.clientPort", port);
HBASE_CONFIG.set("zookeeper.znode.parent", parent);
HBASE_CONFIG.setLong("zookeeper.session.timeout", 900000); // 15-minute ZooKeeper session timeout
HBASE_CONFIG.setInt("hbase.client.retries.number", 5); // cap client-side retries
HBASE_CONFIG.setInt("hbase.meta.scanner.caching", 5000); // larger caching for meta scans
HBASE_CONFIG.setInt("hbase.client.prefetch.limit", 100); // prefetch more region locations
HBASE_CONFIG.setInt("hbase.rpc.timeout", 600000); // 10-minute RPC timeout
Configuration configuration = HBaseConfiguration.create(HBASE_CONFIG);
HTable ht = new HTable(configuration, table);
ht.setWriteBufferSize(8 * 1024 * 1024); // 8 MB client-side write buffer
ht.setAutoFlush(false, false); // disable auto-flush; Puts stay buffered until flushCommits()
return ht;
}
}
Data insertion code
package cn.com.liu.hbase.put;
import cn.com.liu.hbase.conn.HbaseUtils;
import org.apache.hadoop.hbase.client.Durability;
import org.apache.hadoop.hbase.client.HTable;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.util.Bytes;
import java.io.*;
import java.util.ArrayList;
import java.util.List;
import java.util.zip.GZIPInputStream;
/**
* @Author liu
*/
public class HbasePutData {
private final byte[] FAMILYNAME = Bytes.toBytes("f");
private final byte[] QUALIFIER = Bytes.toBytes("q");
private void readfile(String file_name, String zk, String port, String parent, String table) throws IOException {
File file = new File(file_name);
FileInputStream fis = new FileInputStream(file);
GZIPInputStream gzis = new GZIPInputStream(fis, 8 * 1024);
InputStreamReader isr = new InputStreamReader(gzis);
BufferedReader br = new BufferedReader(isr);
HTable ht = HbaseUtils.getInstance(zk,port,parent,table);
List<Put> puts = new ArrayList<Put>();
String line = null;
while ((line = br.readLine()) != null) {
String[] part = line.split(",", -1);
String udid = part[0];
String permanent_id = part[1];
byte[] key_hash = Bytes.toBytes((short) (permanent_id.hashCode() & 0x7fff));
byte[] key = Bytes.add(key_hash, Bytes.toBytes(permanent_id));
Put put = new Put(key);
put.addColumn(FAMILYNAME, QUALIFIER, Bytes.toBytes(udid));
put.setDurability(Durability.SKIP_WAL); // skip the WAL for this Put
puts.add(put);
if (puts.size() >= 2000) {
ht.put(puts);
ht.flushCommits();
puts.clear();
}
}
if(!puts.isEmpty()){
ht.put(puts);
ht.flushCommits();
puts.clear();
}
if (null != ht) ht.close();
if (null != br) br.close(); // closing the outermost reader also closes isr, gzis and fis
}
}
Versions 2.0 and above
Note that HTable.setAutoFlush(false) from the old API is gone in 2.x; BufferedMutator is the replacement.
Maven
<dependency>
<groupId>org.apache.hbase</groupId>
<artifactId>hbase-client</artifactId>
<version>2.2.4</version>
</dependency>
Creating a BufferedMutator
The HBase client API provides a client-side write buffer: Puts are first held in the buffer and are only sent to the HBase servers once the buffer reaches its configured capacity or the user forces a flush. In the old API this buffer was enabled by calling HTable.setAutoFlush(false); the new API replaces it with BufferedMutator, and mutations submitted through a BufferedMutator are automatically held in its buffer.
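In code, the migration looks roughly like this. This is only a minimal sketch, not this post's utility class (which follows right after); the Connection, the Put list, and the table name "t" are placeholders.
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.*;
import java.io.IOException;
import java.util.List;
public class AutoFlushMigrationSketch {
    // 1.x style (no longer available in 2.x):
    //   HTable table = new HTable(conf, "t");
    //   table.setAutoFlush(false, false);
    //   table.put(puts);
    //   table.flushCommits();
    // 2.x style: the client-side write buffer lives inside BufferedMutator.
    static void writeBatch(Connection conn, List<Put> puts) throws IOException {
        try (BufferedMutator mutator = conn.getBufferedMutator(TableName.valueOf("t"))) {
            mutator.mutate(puts); // queued in the client-side buffer
            mutator.flush();      // push the buffered Puts to the region servers
        }                         // close() flushes anything still buffered
    }
}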
package com.liu.hbase;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.BufferedMutator;
import org.apache.hadoop.hbase.client.BufferedMutatorParams;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.ConnectionFactory;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.IOException;
/**
* @Author liu
*/
public class HbaseBufferedMutatorSink {
private static Logger logger = LoggerFactory.getLogger(HbaseBufferedMutatorSink.class);
private static BufferedMutator ht = null;
public static BufferedMutator getInstance(String zk,String port, String parent, String tablename){
if(null == ht){
try {
ht = intiHbase(zk,port,parent,tablename);
} catch (IOException e) {
logger.warn("hbase init failed", e);
}
}
while(true){
try{
ht.getName();
break;
}catch (Exception e){
try {
Thread.sleep(1000 * 3);
ht = intiHbase(zk,port,parent,tablename);
} catch (Exception e1) {
logger.warn("hbase init failed", e1);
}
}
}
return ht;
}
public static BufferedMutator intiHbase(String zk,String port, String parent, String tablename) throws IOException {
Configuration HBASE_CONFIG = new Configuration();
HBASE_CONFIG.set("hbase.zookeeper.quorum",zk);
HBASE_CONFIG.set("hbase.zookeeper.property.clientPort",port);
HBASE_CONFIG.set("zookeeper.znode.parent",parent);
HBASE_CONFIG.setLong("zookeeper.session.timeout", 900000);
HBASE_CONFIG.setInt("hbase.client.retries.number", 5);
HBASE_CONFIG.setInt("hbase.meta.scanner.caching", 5000);
HBASE_CONFIG.setInt("hbase.client.prefetch.limit", 100);
HBASE_CONFIG.setInt("hbase.rpc.timeout", 600000);
HBASE_CONFIG.set("hbase.client.keyvalue.maxsize","524288000"); // raise the max cell size to 500 MB
Configuration configuration = HBaseConfiguration.create(HBASE_CONFIG);
Connection conn = ConnectionFactory.createConnection(configuration);
BufferedMutatorParams params = new BufferedMutatorParams(TableName.valueOf(tablename));
params.writeBufferSize(4 * 1024 * 1024); // 4 MB client-side write buffer
BufferedMutator mutator = conn.getBufferedMutator(params);
return mutator;
}
}
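One thing the utility class above does not set up is an exception listener. Because mutations sit in the client-side buffer and are sent asynchronously, a failed batch only surfaces at flush time, and a listener lets you log or re-queue the failed rows. Here is a minimal sketch following the same BufferedMutatorParams pattern as above; the class and method names are made up for illustration.
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.BufferedMutator;
import org.apache.hadoop.hbase.client.BufferedMutatorParams;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.RetriesExhaustedWithDetailsException;
import java.io.IOException;
public class MutatorWithListenerSketch {
    static BufferedMutator create(Connection conn, String tablename) throws IOException {
        BufferedMutator.ExceptionListener listener = new BufferedMutator.ExceptionListener() {
            @Override
            public void onException(RetriesExhaustedWithDetailsException e, BufferedMutator mutator) {
                // called when buffered mutations fail after all retries; log or re-queue them here
                for (int i = 0; i < e.getNumExceptions(); i++) {
                    System.err.println("failed to write row: " + e.getRow(i));
                }
            }
        };
        BufferedMutatorParams params = new BufferedMutatorParams(TableName.valueOf(tablename))
                .writeBufferSize(4 * 1024 * 1024) // same 4 MB buffer as above
                .listener(listener);
        return conn.getBufferedMutator(params);
    }
}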
Data write method
package com.liu.hbase;
import org.apache.hadoop.hbase.client.BufferedMutator;
import org.apache.hadoop.hbase.client.Durability;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.util.Bytes;
import java.io.*;
import java.util.ArrayList;
import java.util.List;
import java.util.zip.GZIPInputStream;
/**
* @Author liu
*/
public class HbasePutData {
private final byte[] FAMILYNAME = Bytes.toBytes("f");
private final byte[] QUALIFIER = Bytes.toBytes("q");
private void readfile(String file_name, String zk, String port, String parent, String table) throws IOException {
File file = new File(file_name);
FileInputStream fis = new FileInputStream(file);
GZIPInputStream gzis = new GZIPInputStream(fis, 8 * 1024);
InputStreamReader isr = new InputStreamReader(gzis);
BufferedReader br = new BufferedReader(isr);
BufferedMutator ht = HbaseBufferedMutatorSink.getInstance(zk, port, parent, table);
List<Put> puts = new ArrayList<Put>();
String line = null;
while ((line = br.readLine()) != null) {
String[] part = line.split(",", -1);
String udid = part[0];
String permanent_id = part[1];
byte[] key_hash = Bytes.toBytes((short) (permanent_id.hashCode() & 0x7fff));
byte[] key = Bytes.add(key_hash, Bytes.toBytes(permanent_id));
Put put = new Put(key);
put.addColumn(FAMILYNAME, QUALIFIER, Bytes.toBytes(udid));
put.setDurability(Durability.SKIP_WAL); // skip the WAL for this Put
puts.add(put);
if (puts.size() >= 2000) {
ht.mutate(puts);
ht.flush();
puts.clear();
}
}
if(!puts.isEmpty()){
ht.mutate(puts);
ht.flush();
puts.clear();
}
if (null != ht) ht.close(); // close() flushes anything still buffered
if (null != br) br.close(); // closing the outermost reader also closes isr, gzis and fis
}
}
HBase table creation (pre-split regions)
package com.liu.hbase;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.*;
import org.apache.hadoop.hbase.client.Admin;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.ConnectionFactory;
import org.apache.hadoop.hbase.io.compress.Compression.Algorithm;
import org.apache.hadoop.hbase.regionserver.BloomType;
import org.apache.hadoop.hbase.util.Bytes;
import java.io.IOException;
/**
* @Author liu
*/
public class CreateTable {
private static Configuration CONF = HBaseConfiguration.create();
private static final byte[] FAMILYNAME = Bytes.toBytes("f");
private static final HColumnDescriptor hd = new HColumnDescriptor(FAMILYNAME);
private static Admin admin = null;
private static HTableDescriptor desc = null;
public static void main(String[] args){
String ZooKeeper = "ip1,ip2,ip3";
String Parent = "/hbase";
String RegionNum = "10";
String DateStr = "";
String TableName= "hbase_table";
hd.setMaxVersions(1);
hd.setBlocksize(256 * 1024);
// hd.setTimeToLive(Integer.MAX_VALUE);
hd.setTimeToLive(2 * 24 * 60 * 60); // two-day TTL (in seconds)
hd.setBloomFilterType(BloomType.NONE);
hd.setCompressionType(Algorithm.LZ4);
long start = System.currentTimeMillis();
CONF.set("hbase.zookeeper.quorum", ZooKeeper);
// hd.setCompressionType(Algorithm.NONE);
for (String zkparent : Parent.split(",", -1)) {
CONF.set("zookeeper.znode.parent", zkparent);
try {
Connection conn = ConnectionFactory.createConnection(CONF);
admin = conn.getAdmin();
} catch (MasterNotRunningException e) {
e.printStackTrace();
} catch (ZooKeeperConnectionException e) {
e.printStackTrace();
} catch (IOException e) {
e.printStackTrace();
}
//create table
createRegions(Integer.parseInt(RegionNum),DateStr,TableName);
try {
admin.close();
} catch (IOException e) {
e.printStackTrace();
}
}
System.out.println("Total time consumed : "+ (System.currentTimeMillis() - start) / 1000F + "s");
}
private static void createRegions(int regionNum, String date, String name){
desc = new HTableDescriptor(TableName.valueOf(name + date));
desc.addFamily(hd);
try {
// split the key space evenly across the regions
admin.createTable(desc, getSplitKey(regionNum));
} catch (IOException e) {
e.printStackTrace();
}
System.out.println("Create table " + desc.getTableName() + " on "
+ CONF.get("zookeeper.znode.parent"));
}
// compute split keys that divide the short range [0, 0x7FFF] evenly across the regions
private static byte[][] getSplitKey(int rnum) {
short d;
short j = 0;
short splitKey = 0;
byte[][] barr = new byte[rnum - 1][];
d = (short)(0x7FFF / rnum);
short remain =(short)( 0x7FFF - rnum * d);
int startadd = (rnum - remain) / 2;
for (short i = 0; i < rnum - 1; i++) {
if (i >= startadd && j++ <= remain) {
splitKey += d;
splitKey++;
barr[i] = Bytes.toBytes((short)splitKey);
} else {
splitKey += d;
barr[i] = Bytes.toBytes((short)splitKey);
}
}
return barr;
}
}
The code above pre-splits the table into 10 regions.
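If you want to sanity-check the boundaries, here is a small hypothetical helper that mirrors getSplitKey above and prints the nine split keys produced for 10 regions; each key is a short in [0, 0x7FFF], which matches the 2-byte hash prefix used as the row key in the writer code earlier.
// Hypothetical sanity check: prints the 9 split keys used to pre-split 10 regions.
public class PrintSplitKeys {
    public static void main(String[] args) {
        int rnum = 10;
        short d = (short) (0x7FFF / rnum);          // base step between split keys
        short remain = (short) (0x7FFF - rnum * d); // leftover spread over the middle regions
        int startadd = (rnum - remain) / 2;
        short splitKey = 0;
        short j = 0;
        for (short i = 0; i < rnum - 1; i++) {
            splitKey += d;
            if (i >= startadd && j++ <= remain) {
                splitKey++; // absorb one unit of the remainder
            }
            System.out.println(splitKey);
        }
    }
}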
That's about it. If you haven't tried these yet, give them a go: with auto-flush disabled, a client-side write buffer configured, and the WAL turned off, batch write throughput improves dramatically, and the gain grows with the data volume. Just keep in mind that skipping the WAL trades durability for speed: data still in the memstore can be lost if a region server crashes, so reserve it for data you can re-import.
None of this is really new; it's just that after the version upgrade many people could no longer find the corresponding methods in the new API.