
HBase batch write optimization

    

HBase is widely used these days, but I have noticed that some people still don't use it to its full potential, so this post focuses on optimizing batch writes to HBase.


Optimization points

  • Pre-split regions when creating the table

  • Disable auto-flush on the connection

  • Add a client-side write buffer to the connection

  • Skip the WAL when writing

HBase has gone through frequent releases in the last few years, so this post covers the two major lines separately: versions below 2.0 and versions 2.0 and above.

Versions below 2.0

maven

<dependency>
    <groupId>org.apache.hbase</groupId>
    <artifactId>hbase-client</artifactId>
    <version>1.2.6</version>
</dependency>

HBase connection

package cn.com.liu.hbase.conn;

import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.client.HTable;

import java.io.IOException;

/**
 * @Author liu
 */
public class HbaseUtils {
    private static final Log logger = LogFactory.getLog(HbaseUtils.class);
    private static HTable ht = null;

    public static HTable getInstance(String zk, String port, String parent, String table) {
        if (null == ht) {
            try {
                ht = init(zk, port, parent, table);
            } catch (IOException e) {
                logger.warn("hbase init failed", e);
            }
        }
        // Probe the table handle; if it is broken, retry initialization every 3 seconds
        while (true) {
            try {
                ht.getName();
                break;
            } catch (Exception e) {
                try {
                    Thread.sleep(1000 * 3);
                    ht = init(zk, port, parent, table);
                } catch (Exception e1) {
                    logger.warn("hbase init failed", e1);
                }
            }
        }
        return ht;
    }

    private static HTable init(String zk, String port, String parent, String table) throws IOException {
        Configuration HBASE_CONFIG = new Configuration();
        HBASE_CONFIG.set("hbase.zookeeper.quorum", zk);
        HBASE_CONFIG.set("hbase.zookeeper.property.clientPort", port);
        HBASE_CONFIG.set("zookeeper.znode.parent", parent);
        HBASE_CONFIG.setLong("zookeeper.session.timeout", 900000);
        HBASE_CONFIG.setInt("hbase.client.retries.number", 5);
        HBASE_CONFIG.setInt("hbase.meta.scanner.caching", 5000);
        HBASE_CONFIG.setInt("hbase.client.prefetch.limit", 100);
        HBASE_CONFIG.setInt("hbase.rpc.timeout", 600000);
        Configuration configuration = HBaseConfiguration.create(HBASE_CONFIG);
        HTable ht = new HTable(configuration, table);
        ht.setWriteBufferSize(8 * 1024 * 1024); // set the client write buffer size
        ht.setAutoFlush(false, false);          // disable auto flush
        return ht;
    }
}

Data insert code

package cn.com.liu.hbase.put;

import cn.com.liu.hbase.conn.HbaseUtils;
import org.apache.hadoop.hbase.client.HTable;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.util.Bytes;

import java.io.*;
import java.util.ArrayList;
import java.util.List;
import java.util.zip.GZIPInputStream;

/**
 * @Author liu
 */
public class HbasePutData {
    private final byte[] FAMILYNAME = Bytes.toBytes("f");
    private final byte[] QUALIFIER = Bytes.toBytes("q");

    private void readfile(String file_name, String zk, String port, String parent, String table) throws IOException {
        File file = new File(file_name);
        FileInputStream fis = new FileInputStream(file);
        GZIPInputStream gzis = new GZIPInputStream(fis, 8 * 1024);
        InputStreamReader isr = new InputStreamReader(gzis);
        BufferedReader br = new BufferedReader(isr);
        HTable ht = HbaseUtils.getInstance(zk, port, parent, table);

        List<Put> puts = new ArrayList<Put>();
        String line = null;
        while ((line = br.readLine()) != null) {
            String[] part = line.split(",", -1);
            String udid = part[0];
            String permanent_id = part[1];
            // 2-byte hash prefix to spread keys evenly across the pre-split regions
            byte[] key_hash = Bytes.toBytes((short) (permanent_id.hashCode() & 0x7fff));
            byte[] key = Bytes.add(key_hash, Bytes.toBytes(permanent_id));
            Put put = new Put(key);
            put.addColumn(FAMILYNAME, QUALIFIER, Bytes.toBytes(udid));
            put.setWriteToWAL(false); // skip the WAL
            puts.add(put);
            // flush in batches of 2000 puts
            if (puts.size() >= 2000) {
                ht.put(puts);
                ht.flushCommits();
                puts.clear();
            }
        }
        if (!puts.isEmpty()) {
            ht.put(puts);
            ht.flushCommits();
            puts.clear();
        }
        if (null != ht) ht.close();
        if (null != br) br.close();
        if (null != isr) isr.close();
        if (null != gzis) gzis.close();
        if (null != fis) fis.close();
    }
}


Versions 2.0 and above

Note that HTable.setAutoFlush(false) from the old API is gone in these versions; BufferedMutator is its replacement.
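As a quick orientation, here is a minimal migration sketch. It is my own illustration, not the utility class used later in this post; the table name "t" and the class name are placeholders, and it assumes an hbase-site.xml is on the classpath.

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.*;
import org.apache.hadoop.hbase.util.Bytes;

public class BufferedMutatorMigrationSketch {
    public static void main(String[] args) throws Exception {
        Configuration conf = HBaseConfiguration.create(); // assumes hbase-site.xml on the classpath

        // 1.x style (removed in 2.x): buffering was enabled on the HTable itself
        // HTable table = new HTable(conf, "t");
        // table.setAutoFlush(false, false);

        // 2.x style: buffering lives in BufferedMutator, obtained from a Connection
        try (Connection conn = ConnectionFactory.createConnection(conf);
             BufferedMutator mutator = conn.getBufferedMutator(
                     new BufferedMutatorParams(TableName.valueOf("t"))
                             .writeBufferSize(8 * 1024 * 1024))) {
            Put put = new Put(Bytes.toBytes("row1"));
            put.addColumn(Bytes.toBytes("f"), Bytes.toBytes("q"), Bytes.toBytes("v"));
            mutator.mutate(put);
            mutator.flush(); // optional: the buffer also flushes when full and on close()
        }
    }
}

The buffer flushes automatically when it fills up and again on close(), so the explicit flush() is only needed when you want a hard commit point.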

maven

<dependency>
    <groupId>org.apache.hbase</groupId>
    <artifactId>hbase-client</artifactId>
    <version>2.2.4</version>
</dependency>

Creating a BufferedMutator

The HBase client API provides a client-side write buffer: data from put calls is held in the buffer first and only sent to the HBase servers in one batch when the buffer reaches its configured size or when the user flushes it explicitly. In the old API this buffer was enabled by calling HTable.setAutoFlush(false). The new HBase API replaces it with BufferedMutator, and data submitted through a BufferedMutator is staged in its buffer automatically.

package com.liu.hbase;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.BufferedMutator;
import org.apache.hadoop.hbase.client.BufferedMutatorParams;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.ConnectionFactory;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.io.IOException;

/**
 * @Author liu
 */
public class HbaseBufferedMutatorSink {
    private static Logger logger = LoggerFactory.getLogger(HbaseBufferedMutatorSink.class);
    private static BufferedMutator ht = null;

    public static BufferedMutator getInstance(String zk, String port, String parent, String tablename) {
        if (null == ht) {
            try {
                ht = initHbase(zk, port, parent, tablename);
            } catch (IOException e) {
                logger.warn("hbase init failed", e);
            }
        }
        // Probe the mutator; if it is broken, retry initialization every 3 seconds
        while (true) {
            try {
                ht.getName();
                break;
            } catch (Exception e) {
                try {
                    Thread.sleep(1000 * 3);
                    ht = initHbase(zk, port, parent, tablename);
                } catch (Exception e1) {
                    logger.warn("hbase init failed", e1);
                }
            }
        }
        return ht;
    }

    public static BufferedMutator initHbase(String zk, String port, String parent, String tablename) throws IOException {
        Configuration HBASE_CONFIG = new Configuration();
        HBASE_CONFIG.set("hbase.zookeeper.quorum", zk);
        HBASE_CONFIG.set("hbase.zookeeper.property.clientPort", port);
        HBASE_CONFIG.set("zookeeper.znode.parent", parent);
        HBASE_CONFIG.setLong("zookeeper.session.timeout", 900000);
        HBASE_CONFIG.setInt("hbase.client.retries.number", 5);
        HBASE_CONFIG.setInt("hbase.meta.scanner.caching", 5000);
        HBASE_CONFIG.setInt("hbase.client.prefetch.limit", 100);
        HBASE_CONFIG.setInt("hbase.rpc.timeout", 600000);
        HBASE_CONFIG.set("hbase.client.keyvalue.maxsize", "524288000"); // allow key/value sizes up to 500 MB
        Configuration configuration = HBaseConfiguration.create(HBASE_CONFIG);

        Connection conn = ConnectionFactory.createConnection(configuration);
        BufferedMutatorParams params = new BufferedMutatorParams(TableName.valueOf(tablename));
        params.writeBufferSize(4 * 1024 * 1024); // set the client write buffer size
        BufferedMutator mutator = conn.getBufferedMutator(params);
        return mutator;
    }
}
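Because the buffer is flushed in batches behind the scenes, a failed write may only surface on a later mutate(), flush(), or close() call rather than at the mutate() that queued it. The utility class above relies on the default behavior, which simply rethrows the exception from the failing flush; if you prefer to handle failures yourself, BufferedMutatorParams accepts an exception listener. A minimal sketch follows (the helper class and method names are mine, purely for illustration):

import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.BufferedMutator;
import org.apache.hadoop.hbase.client.BufferedMutatorParams;
import org.apache.hadoop.hbase.client.RetriesExhaustedWithDetailsException;

public class MutatorParamsWithListener {
    // Illustrative helper: builds params that log failed rows instead of letting
    // the exception escape from whichever call triggered the flush.
    public static BufferedMutatorParams paramsFor(String tablename) {
        BufferedMutator.ExceptionListener listener = new BufferedMutator.ExceptionListener() {
            @Override
            public void onException(RetriesExhaustedWithDetailsException e, BufferedMutator mutator)
                    throws RetriesExhaustedWithDetailsException {
                for (int i = 0; i < e.getNumExceptions(); i++) {
                    System.err.println("failed to write row " + e.getRow(i) + ": " + e.getCause(i));
                }
            }
        };
        return new BufferedMutatorParams(TableName.valueOf(tablename))
                .writeBufferSize(4 * 1024 * 1024)
                .listener(listener);
    }
}

Passing paramsFor(tablename) to conn.getBufferedMutator(...) instead of the bare BufferedMutatorParams would plug the listener in.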

Data write method

package com.liu.hbase;

import org.apache.hadoop.hbase.client.BufferedMutator;
import org.apache.hadoop.hbase.client.Durability;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.util.Bytes;

import java.io.*;
import java.util.ArrayList;
import java.util.List;
import java.util.zip.GZIPInputStream;

/**
 * @Author liu
 */
public class HbasePutData {
    private final byte[] FAMILYNAME = Bytes.toBytes("f");
    private final byte[] QUALIFIER = Bytes.toBytes("q");

    private void readfile(String file_name, String zk, String port, String parent, String table) throws IOException {
        File file = new File(file_name);
        FileInputStream fis = new FileInputStream(file);
        GZIPInputStream gzis = new GZIPInputStream(fis, 8 * 1024);
        InputStreamReader isr = new InputStreamReader(gzis);
        BufferedReader br = new BufferedReader(isr);
        BufferedMutator ht = HbaseBufferedMutatorSink.getInstance(zk, port, parent, table);

        List<Put> puts = new ArrayList<Put>();
        String line = null;
        while ((line = br.readLine()) != null) {
            String[] part = line.split(",", -1);
            String udid = part[0];
            String permanent_id = part[1];
            // 2-byte hash prefix to spread keys evenly across the pre-split regions
            byte[] key_hash = Bytes.toBytes((short) (permanent_id.hashCode() & 0x7fff));
            byte[] key = Bytes.add(key_hash, Bytes.toBytes(permanent_id));
            Put put = new Put(key);
            put.addColumn(FAMILYNAME, QUALIFIER, Bytes.toBytes(udid));
            put.setDurability(Durability.SKIP_WAL); // skip the WAL
            puts.add(put);
            // flush in batches of 2000 puts
            if (puts.size() >= 2000) {
                ht.mutate(puts);
                ht.flush();
                puts.clear();
            }
        }
        if (!puts.isEmpty()) {
            ht.mutate(puts);
            ht.flush();
            puts.clear();
        }
        if (null != ht) ht.close();
        if (null != br) br.close();
        if (null != isr) isr.close();
        if (null != gzis) gzis.close();
        if (null != fis) fis.close();
    }
}

HBase table creation (with pre-split regions)

package com.liu.hbase;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.*;
import org.apache.hadoop.hbase.client.Admin;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.ConnectionFactory;
import org.apache.hadoop.hbase.io.compress.Compression.Algorithm;
import org.apache.hadoop.hbase.regionserver.BloomType;
import org.apache.hadoop.hbase.util.Bytes;

import java.io.IOException;

/**
 * @Author liu
 */
public class CreateTable {
    private static Configuration CONF = HBaseConfiguration.create();
    private static final byte[] FAMILYNAME = Bytes.toBytes("f");
    private static final HColumnDescriptor hd = new HColumnDescriptor(FAMILYNAME);
    private static Admin admin = null;
    private static HTableDescriptor desc = null;

    public static void main(String[] args) {
        String ZooKeeper = "ip1,ip2,ip3";
        String Parent = "/hbase";
        String RegionNum = "10";
        String DateStr = "";
        String TableName = "hbase_table";

        hd.setMaxVersions(1);
        hd.setBlocksize(256 * 1024);
        hd.setTimeToLive(2 * 24 * 60 * 60); // two-day TTL
        hd.setBloomFilterType(BloomType.NONE);
        hd.setCompressionType(Algorithm.LZ4);

        long start = System.currentTimeMillis();
        CONF.set("hbase.zookeeper.quorum", ZooKeeper);
        for (String zkparent : Parent.split(",", -1)) {
            CONF.set("zookeeper.znode.parent", zkparent);
            Connection conn = null;
            try {
                conn = ConnectionFactory.createConnection(CONF);
                admin = conn.getAdmin();
            } catch (MasterNotRunningException e) {
                e.printStackTrace();
            } catch (ZooKeeperConnectionException e) {
                e.printStackTrace();
            } catch (IOException e) {
                e.printStackTrace();
            }
            // create the table with pre-split regions
            createRegions(Integer.parseInt(RegionNum), DateStr, TableName);
            try {
                admin.close();
                if (conn != null) conn.close();
            } catch (IOException e) {
                e.printStackTrace();
            }
        }
        System.out.println("Total time consumed : " + (System.currentTimeMillis() - start) / 1000F + "s");
    }

    private static void createRegions(int regionNum, String date, String name) {
        desc = new HTableDescriptor(TableName.valueOf(name + date));
        desc.addFamily(hd);
        try {
            // create the table with evenly distributed split keys
            admin.createTable(desc, getSplitKey(regionNum));
        } catch (IOException e) {
            e.printStackTrace();
        }
        System.out.println("Create table " + desc.getTableName() + " on " + CONF.get("zookeeper.znode.parent"));
    }

    // Split the 2-byte key space [0, 0x7FFF] into regionNum roughly equal ranges
    private static byte[][] getSplitKey(int rnum) {
        short d;
        short j = 0;
        short splitKey = 0;
        byte[][] barr = new byte[rnum - 1][];
        d = (short) (0x7FFF / rnum);
        short remain = (short) (0x7FFF - rnum * d);
        int startadd = (rnum - remain) / 2;
        for (short i = 0; i < rnum - 1; i++) {
            if (i >= startadd && j++ <= remain) {
                // distribute the remainder over the middle regions
                splitKey += d;
                splitKey++;
                barr[i] = Bytes.toBytes((short) splitKey);
            } else {
                splitKey += d;
                barr[i] = Bytes.toBytes((short) splitKey);
            }
        }
        return barr;
    }
}

The code above pre-splits the table into 10 regions.
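This lines up with the write path: every row key built in the insert code starts with a 2-byte prefix in the range [0, 0x7FFF], which is exactly the range getSplitKey() cuts into 10 pieces, so batch writes spread roughly evenly over all 10 regions instead of piling onto one. A small standalone sketch of that relationship (the class name and sample id are mine, just for illustration):

import org.apache.hadoop.hbase.util.Bytes;

// Illustrative only: shows how the 2-byte row-key prefix used in the insert code
// relates to the split points produced by getSplitKey() above.
public class SplitKeySketch {
    public static void main(String[] args) {
        int rnum = 10;
        // Same base step as getSplitKey(): 0x7FFF / 10 = 3276 (a small remainder is
        // spread over the middle regions, so the real boundaries differ by a few units).
        short d = (short) (0x7FFF / rnum);
        System.out.println("base step between split points: " + d);

        // The insert code prefixes every row key with this 2-byte hash,
        // so a key lands in region prefix / d (approximately).
        String permanent_id = "some-id"; // hypothetical sample value
        short prefix = (short) (permanent_id.hashCode() & 0x7fff);
        byte[] rowKey = Bytes.add(Bytes.toBytes(prefix), Bytes.toBytes(permanent_id));
        System.out.println("prefix = " + prefix + ", approximate region index = " + (prefix / d));
        System.out.println("row key length = " + rowKey.length + " bytes");
    }
}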


That's all for this post. If you're interested, give it a try: with auto-flush disabled, a write buffer configured, and the WAL skipped, write throughput improves dramatically, and the bigger the data volume, the more obvious the gain.

None of this is really new; it's just that after upgrading to the newer versions, many people can no longer find the corresponding methods.