HBase batch write optimization
HBase is widely used these days, but I keep seeing it used in ways that leave performance on the table, so this post walks through how to optimize HBase batch writes.
Optimization points (each one is illustrated in a quick sketch right after this list):
Pre-split the table when creating it
Disable auto-flush on the connection
Give the connection a client-side write buffer
Skip the WAL when writing
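Before the version-specific details, here is a compact sketch of what the four points look like with the 2.x client. The quorum, table name, row and value are placeholders; the full, version-specific code follows below.

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.*;
import org.apache.hadoop.hbase.util.Bytes;

public class BatchWriteSketch {
    public static void main(String[] args) throws Exception {
        Configuration conf = HBaseConfiguration.create();
        conf.set("hbase.zookeeper.quorum", "zk1,zk2,zk3");   // placeholder quorum
        try (Connection conn = ConnectionFactory.createConnection(conf);
             // (2)(3) a BufferedMutator means no per-Put auto-flush plus a client-side write buffer
             BufferedMutator mutator = conn.getBufferedMutator(
                     new BufferedMutatorParams(TableName.valueOf("hbase_table")) // (1) table assumed pre-split
                             .writeBufferSize(4 * 1024 * 1024))) {
            Put put = new Put(Bytes.toBytes("row-1"));
            put.addColumn(Bytes.toBytes("f"), Bytes.toBytes("q"), Bytes.toBytes("v"));
            put.setDurability(Durability.SKIP_WAL);          // (4) skip the WAL
            mutator.mutate(put);
        } // close() flushes the buffer and releases the connection
    }
}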
HBase has gone through a lot of churn in recent years, so this post covers two cases: versions below 2.0 and versions 2.0 and above.
Versions below 2.0
maven
<dependency>
    <groupId>org.apache.hbase</groupId>
    <artifactId>hbase-client</artifactId>
    <version>1.2.6</version>
</dependency>
HBase connection
package cn.com.liu.hbase.conn;

import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.client.HTable;

import java.io.IOException;

/**
 * @Author liu
 */
public class HbaseUtils {

    private static final Log logger = LogFactory.getLog(HbaseUtils.class);
    private static HTable ht = null;

    public static HTable getInstance(String zk, String port, String parent, String table) {
        if (null == ht) {
            try {
                ht = init(zk, port, parent, table);
            } catch (IOException e) {
                logger.warn("hbase init failed", e);
            }
        }
        // Make sure the cached table is still usable; re-init after a short pause if not.
        while (true) {
            try {
                ht.getName();
                break;
            } catch (Exception e) {
                try {
                    Thread.sleep(1000 * 3);
                    ht = init(zk, port, parent, table);
                } catch (Exception e1) {
                    logger.warn("hbase init failed", e1);
                }
            }
        }
        return ht;
    }

    private static HTable init(String zk, String port, String parent, String table) throws IOException {
        Configuration HBASE_CONFIG = new Configuration();
        HBASE_CONFIG.set("hbase.zookeeper.quorum", zk);
        HBASE_CONFIG.set("hbase.zookeeper.property.clientPort", port);
        HBASE_CONFIG.set("zookeeper.znode.parent", parent);
        HBASE_CONFIG.setLong("zookeeper.session.timeout", 900000);
        HBASE_CONFIG.setInt("hbase.client.retries.number", 5);
        HBASE_CONFIG.setInt("hbase.meta.scanner.caching", 5000);
        HBASE_CONFIG.setInt("hbase.client.prefetch.limit", 100);
        HBASE_CONFIG.setInt("hbase.rpc.timeout", 600000);
        Configuration configuration = HBaseConfiguration.create(HBASE_CONFIG);
        HTable ht = new HTable(configuration, table);
        ht.setWriteBufferSize(8 * 1024 * 1024); // client-side write buffer
        ht.setAutoFlush(false, false);          // disable auto-flush
        return ht;
    }
}
Data insertion code
package cn.com.liu.hbase.put;

import cn.com.liu.hbase.conn.HbaseUtils;
import org.apache.hadoop.hbase.client.Durability;
import org.apache.hadoop.hbase.client.HTable;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.util.Bytes;

import java.io.*;
import java.util.ArrayList;
import java.util.List;
import java.util.zip.GZIPInputStream;

/**
 * @Author liu
 */
public class HbasePutData {

    private final byte[] FAMILYNAME = Bytes.toBytes("f");
    private final byte[] QUALIFIER = Bytes.toBytes("q");

    private void readfile(String file_name, String zk, String port, String parent, String table) throws IOException {
        File file = new File(file_name);
        FileInputStream fis = new FileInputStream(file);
        GZIPInputStream gzis = new GZIPInputStream(fis, 8 * 1024);
        InputStreamReader isr = new InputStreamReader(gzis);
        BufferedReader br = new BufferedReader(isr);

        HTable ht = HbaseUtils.getInstance(zk, port, parent, table);
        List<Put> puts = new ArrayList<Put>();
        String line = null;
        while ((line = br.readLine()) != null) {
            String[] part = line.split(",", -1);
            String udid = part[0];
            String permanent_id = part[1];
            // Row key = 2-byte hash prefix + the id itself, so rows spread across the pre-split regions.
            byte[] key_hash = Bytes.toBytes((short) (permanent_id.hashCode() & 0x7fff));
            byte[] key = Bytes.add(key_hash, Bytes.toBytes(permanent_id));
            Put put = new Put(key);
            put.addColumn(FAMILYNAME, QUALIFIER, Bytes.toBytes(udid));
            put.setDurability(Durability.SKIP_WAL); // skip the WAL
            puts.add(put);
            if (puts.size() >= 2000) {
                ht.put(puts);
                ht.flushCommits();
                puts.clear();
            }
        }
        if (!puts.isEmpty()) {
            ht.put(puts);
            ht.flushCommits();
            puts.clear();
        }
        if (null != ht) ht.close();
        if (null != br) br.close();
        if (null != isr) isr.close();
        if (null != fis) fis.close();
        if (null != gzis) gzis.close();
    }
}
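A note on the row key above: prefixing the id with a 2-byte hash (a salt in the range 0 to 0x7FFF) is what spreads writes across the pre-split regions created further down in this post. The following is a minimal, illustrative sketch (the ids are made up) that prints which of 10 roughly even regions a salted key would land in; the real boundaries come from getSplitKey in the table-creation code below and differ slightly because the remainder is spread over the middle regions.

import org.apache.hadoop.hbase.util.Bytes;

public class SaltingDemo {
    public static void main(String[] args) {
        int regionNum = 10;
        short step = (short) (0x7FFF / regionNum);               // width of one region's salt range
        for (String id : new String[]{"id-001", "id-002", "id-003"}) {
            short salt = (short) (id.hashCode() & 0x7fff);       // same salt as in readfile above
            byte[] key = Bytes.add(Bytes.toBytes(salt), Bytes.toBytes(id));
            System.out.printf("%s -> salt %d -> approx. region %d, key length %d bytes%n",
                    id, salt, Math.min(salt / step, regionNum - 1), key.length);
        }
    }
}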
Versions 2.0 and above
Note that HTable.setAutoFlush(false) from the old client has been removed; BufferedMutator is the replacement.
maven
<dependency>
    <groupId>org.apache.hbase</groupId>
    <artifactId>hbase-client</artifactId>
    <version>2.2.4</version>
</dependency>
Creating the BufferedMutator
The HBase client API provides a client-side write buffer: Puts are first held in this buffer and only sent to the HBase servers once the buffer reaches its configured size or the caller flushes explicitly. In the old API this buffer was enabled by calling HTable.setAutoFlush(false); in the new API, BufferedMutator replaces it, and anything submitted through a BufferedMutator is buffered automatically.
package com.liu.hbase;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.BufferedMutator;
import org.apache.hadoop.hbase.client.BufferedMutatorParams;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.ConnectionFactory;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.io.IOException;

/**
 * @Author liu
 */
public class HbaseBufferedMutatorSink {

    private static Logger logger = LoggerFactory.getLogger(HbaseBufferedMutatorSink.class);
    private static BufferedMutator ht = null;

    public static BufferedMutator getInstance(String zk, String port, String parent, String tablename) {
        if (null == ht) {
            try {
                ht = initHbase(zk, port, parent, tablename);
            } catch (IOException e) {
                logger.warn("hbase init failed", e);
            }
        }
        // Make sure the cached mutator is still usable; re-init after a short pause if not.
        while (true) {
            try {
                ht.getName();
                break;
            } catch (Exception e) {
                try {
                    Thread.sleep(1000 * 3);
                    ht = initHbase(zk, port, parent, tablename);
                } catch (Exception e1) {
                    logger.warn("hbase init failed", e1);
                }
            }
        }
        return ht;
    }

    public static BufferedMutator initHbase(String zk, String port, String parent, String tablename) throws IOException {
        Configuration HBASE_CONFIG = new Configuration();
        HBASE_CONFIG.set("hbase.zookeeper.quorum", zk);
        HBASE_CONFIG.set("hbase.zookeeper.property.clientPort", port);
        HBASE_CONFIG.set("zookeeper.znode.parent", parent);
        HBASE_CONFIG.setLong("zookeeper.session.timeout", 900000);
        HBASE_CONFIG.setInt("hbase.client.retries.number", 5);
        HBASE_CONFIG.setInt("hbase.meta.scanner.caching", 5000);
        HBASE_CONFIG.setInt("hbase.client.prefetch.limit", 100);
        HBASE_CONFIG.setInt("hbase.rpc.timeout", 600000);
        HBASE_CONFIG.set("hbase.client.keyvalue.maxsize", "524288000"); // allow cells up to 500 MB
        Configuration configuration = HBaseConfiguration.create(HBASE_CONFIG);
        Connection conn = ConnectionFactory.createConnection(configuration);
        BufferedMutatorParams params = new BufferedMutatorParams(TableName.valueOf(tablename));
        params.writeBufferSize(4 * 1024 * 1024); // client-side write buffer
        BufferedMutator mutator = conn.getBufferedMutator(params);
        return mutator;
    }
}
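One thing to keep in mind with this setup: mutate() only adds to the buffer, so write failures surface later, when the buffer is flushed or the mutator is closed. If you want those failures logged rather than discovered at the very end, BufferedMutatorParams accepts an exception listener. Below is a sketch of what wiring one into the params inside initHbase could look like; the log-and-move-on policy is just an assumption for illustration.

// extra imports needed:
// import org.apache.hadoop.hbase.client.RetriesExhaustedWithDetailsException;
// import org.apache.hadoop.hbase.util.Bytes;

// ...inside initHbase, instead of the plain params object:
BufferedMutator.ExceptionListener listener = new BufferedMutator.ExceptionListener() {
    @Override
    public void onException(RetriesExhaustedWithDetailsException e, BufferedMutator mutator) {
        // log every row that could not be written; retrying is left to the caller
        for (int i = 0; i < e.getNumExceptions(); i++) {
            logger.warn("write failed for row " + Bytes.toStringBinary(e.getRow(i).getRow()), e.getCause(i));
        }
    }
};
BufferedMutatorParams params = new BufferedMutatorParams(TableName.valueOf(tablename))
        .writeBufferSize(4 * 1024 * 1024)
        .listener(listener);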
Data write method
package com.liu.hbase;

import org.apache.hadoop.hbase.client.BufferedMutator;
import org.apache.hadoop.hbase.client.Durability;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.util.Bytes;

import java.io.*;
import java.util.ArrayList;
import java.util.List;
import java.util.zip.GZIPInputStream;

/**
 * @Author liu
 */
public class HbasePutData {

    private final byte[] FAMILYNAME = Bytes.toBytes("f");
    private final byte[] QUALIFIER = Bytes.toBytes("q");

    private void readfile(String file_name, String zk, String port, String parent, String table) throws IOException {
        File file = new File(file_name);
        FileInputStream fis = new FileInputStream(file);
        GZIPInputStream gzis = new GZIPInputStream(fis, 8 * 1024);
        InputStreamReader isr = new InputStreamReader(gzis);
        BufferedReader br = new BufferedReader(isr);

        BufferedMutator ht = HbaseBufferedMutatorSink.getInstance(zk, port, parent, table);
        List<Put> puts = new ArrayList<Put>();
        String line = null;
        while ((line = br.readLine()) != null) {
            String[] part = line.split(",", -1);
            String udid = part[0];
            String permanent_id = part[1];
            // Row key = 2-byte hash prefix + the id itself, so rows spread across the pre-split regions.
            byte[] key_hash = Bytes.toBytes((short) (permanent_id.hashCode() & 0x7fff));
            byte[] key = Bytes.add(key_hash, Bytes.toBytes(permanent_id));
            Put put = new Put(key);
            put.addColumn(FAMILYNAME, QUALIFIER, Bytes.toBytes(udid));
            put.setDurability(Durability.SKIP_WAL); // skip the WAL
            puts.add(put);
            if (puts.size() >= 2000) {
                ht.mutate(puts);
                ht.flush();
                puts.clear();
            }
        }
        if (!puts.isEmpty()) {
            ht.mutate(puts);
            ht.flush();
            puts.clear();
        }
        if (null != ht) ht.close();
        if (null != br) br.close();
        if (null != isr) isr.close();
        if (null != fis) fis.close();
        if (null != gzis) gzis.close();
    }
}
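A small design note on the batching above: with a BufferedMutator the manual 2000-Put batches and explicit flush() calls are optional, because the mutator already sends an RPC whenever its 4 MB write buffer fills up. If you are happy to let the buffer size alone decide when data goes out, the loop body can shrink to something like this sketch (it reuses the same variables as readfile above and is not a standalone program):

// same key, udid and ht variables as in the loop above
Put put = new Put(key);
put.addColumn(FAMILYNAME, QUALIFIER, Bytes.toBytes(udid));
put.setDurability(Durability.SKIP_WAL);
ht.mutate(put);   // only buffers; an RPC goes out when the write buffer is full
// ...and after the while loop:
ht.flush();       // push out whatever is still sitting in the buffer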
HBase table creation (pre-split)
package com.liu.hbase;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.*;
import org.apache.hadoop.hbase.client.Admin;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.ConnectionFactory;
import org.apache.hadoop.hbase.io.compress.Compression.Algorithm;
import org.apache.hadoop.hbase.regionserver.BloomType;
import org.apache.hadoop.hbase.util.Bytes;

import java.io.IOException;

/**
 * @Author liu
 */
public class CreateTable {

    private static Configuration CONF = HBaseConfiguration.create();
    private static final byte[] FAMILYNAME = Bytes.toBytes("f");
    private static final HColumnDescriptor hd = new HColumnDescriptor(FAMILYNAME);
    private static Admin admin = null;
    private static HTableDescriptor desc = null;

    public static void main(String[] args) {
        String ZooKeeper = "ip1,ip2,ip3";
        String Parent = "/hbase";
        String RegionNum = "10";
        String DateStr = "";
        String TableName = "hbase_table";

        hd.setMaxVersions(1);
        hd.setBlocksize(256 * 1024);
        // hd.setTimeToLive(Integer.MAX_VALUE);
        hd.setTimeToLive(2 * 24 * 60 * 60); // two-day TTL
        hd.setBloomFilterType(BloomType.NONE);
        hd.setCompressionType(Algorithm.LZ4);
        // hd.setCompressionType(Algorithm.NONE);

        long start = System.currentTimeMillis();
        CONF.set("hbase.zookeeper.quorum", ZooKeeper);

        for (String zkparent : Parent.split(",", -1)) {
            CONF.set("zookeeper.znode.parent", zkparent);
            Connection conn = null;
            try {
                conn = ConnectionFactory.createConnection(CONF);
                admin = conn.getAdmin();
            } catch (MasterNotRunningException e) {
                e.printStackTrace();
            } catch (ZooKeeperConnectionException e) {
                e.printStackTrace();
            } catch (IOException e) {
                e.printStackTrace();
            }
            // create table
            createRegions(Integer.parseInt(RegionNum), DateStr, TableName);
            try {
                admin.close();
                if (null != conn) conn.close();
            } catch (IOException e) {
                e.printStackTrace();
            }
        }
        System.out.println("Total time consumed : " + (System.currentTimeMillis() - start) / 1000F + "s");
    }

    private static void createRegions(int regionNum, String date, String name) {
        desc = new HTableDescriptor(TableName.valueOf(name + date));
        desc.addFamily(hd);
        try {
            // create the table with evenly spaced split keys
            admin.createTable(desc, getSplitKey(regionNum));
        } catch (IOException e) {
            e.printStackTrace();
        }
        System.out.println("Create table " + desc.getTableName() + " on " + CONF.get("zookeeper.znode.parent"));
    }

    // Build rnum - 1 split keys that divide the 2-byte hash prefix range (0 to 0x7FFF) into rnum roughly equal pieces.
    private static byte[][] getSplitKey(int rnum) {
        short d;
        short j = 0;
        short splitKey = 0;
        byte[][] barr = new byte[rnum - 1][];
        d = (short) (0x7FFF / rnum);
        short remain = (short) (0x7FFF - rnum * d);
        int startadd = (rnum - remain) / 2;
        for (short i = 0; i < rnum - 1; i++) {
            if (i >= startadd && j++ <= remain) {
                splitKey += d;
                splitKey++;
                barr[i] = Bytes.toBytes((short) splitKey);
            } else {
                splitKey += d;
                barr[i] = Bytes.toBytes((short) splitKey);
            }
        }
        return barr;
    }
}
The code above pre-splits the table into 10 regions.
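The HTableDescriptor / HColumnDescriptor classes used above still work in 2.2.4 but are marked deprecated there. If you prefer the non-deprecated 2.x builder API, the same table definition looks roughly like this sketch (it assumes the same admin object and getSplitKey(10) boundaries as in the class above, and is not a standalone program):

// extra imports needed:
// import org.apache.hadoop.hbase.client.ColumnFamilyDescriptorBuilder;
// import org.apache.hadoop.hbase.client.TableDescriptor;
// import org.apache.hadoop.hbase.client.TableDescriptorBuilder;
// import org.apache.hadoop.hbase.io.compress.Compression;

TableDescriptor td = TableDescriptorBuilder.newBuilder(TableName.valueOf("hbase_table"))
        .setColumnFamily(ColumnFamilyDescriptorBuilder.newBuilder(Bytes.toBytes("f"))
                .setMaxVersions(1)
                .setBlocksize(256 * 1024)
                .setTimeToLive(2 * 24 * 60 * 60)               // two-day TTL
                .setBloomFilterType(BloomType.NONE)
                .setCompressionType(Compression.Algorithm.LZ4)
                .build())
        .build();
admin.createTable(td, getSplitKey(10));                         // same pre-split boundaries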
That's about it; give it a try if you're interested. After disabling auto-flush, enabling the write buffer and skipping the WAL, write throughput improves substantially, and the larger the data volume, the more noticeable the gain.
None of this is really new; it's just that after the version upgrade, many people can no longer find the corresponding methods.
