HBase2.x源码剖析:HMaster启动过程
一、概述
本文基于HBase-2.2.1分析HMaster的启动流程。由于HMaster启动代码比较多,这里只是将主要函数拿出来说一下,其实主要做了以下几件事:
1).获取配置文件,对HBasemaster进行了实例化,由于HMaster继承自HRregionServer,先调用HRegionServer的构造函数进行初始化;
2).HRegionServer构造函数主要是对做了一些校验,如果启用了Kerberos先进行进行了登录,启动zookeper对于hbase znode的监听,获取master对应znode节点变化,同时启动对于集群状态变化的监听;
3).HMasterr构造函数启动balaner,实例化MetricsMaster统计Master数据供webUI接口调用;
4).最后调用run函数,循环向zk中注册成为active Master,一直阻塞循环直到写入成功为止,成功后进入finishInitialization()函数并初始化master。
二、源码剖析
1.一般我们单独启动HMaster的启动命令是:
/data/app/hbase-2.2.1/bin/hbase-daemon.sh start master
2.hbase-daemon.sh脚本做了以下工作:
1).设置master进程文件目录,默认是/tmp,进程文件名为:hbase-${USER}-master.pid
2).master日志路径,默认为$HBASE_HOME/logs
3).获取JAVA_HOME,所以环境变量或者hbase-env.sh一定配置JAVA_HOME
4).最后调用以下函数进行启动
(start)check_before_starthbase_rotate_log $HBASE_LOGOUThbase_rotate_log $HBASE_LOGGCecho running $command, logging to $HBASE_LOGOUT$thiscmd --config "${HBASE_CONF_DIR}" \foreground_start $command $args < /dev/null > ${HBASE_LOGOUT} 2>&1 &disown -h -rsleep 1; head "${HBASE_LOGOUT}";;
3.入口函数是是HMaster类的main函数,代码如下:
/*** @see org.apache.hadoop.hbase.master.HMasterCommandLine*/public static void main(String [] args) {LOG.info("STARTING service " + HMaster.class.getSimpleName());//这里没啥东西 就是打印版本信息VersionInfo.logVersion();//实例化一个HMasterCommandLine对象,执行该对象的doMain(args)方法new HMasterCommandLine(HMaster.class).doMain(args);}
4.下面看HMasterCommandLine的doMain(),这个函数在其父类ServerCommandLine中,代码如下:
public void doMain(String args[]) {try {//通过ToolRunner机制执行启动/停止等命令int ret = ToolRunner.run(HBaseConfiguration.create(), this, args);if (ret != 0) {System.exit(ret);}} catch (Exception e) {LOG.error("Failed to run", e);System.exit(-1);}}
5.调用ToolRunner.run(),将conf,args封装到了GenericOptionsParser,然后返回到HMasterCommandLine的run()方法中去执行,封装代码:
public static int run(Configuration conf, Tool tool, String[] args)throws Exception{if(conf == null) {conf = new Configuration();}//封装成了GenericOptionsParser 对象,然后调用HMasterCommandLine的run函数启动masterGenericOptionsParser parser = new GenericOptionsParser(conf, args);//set the configuration back, so that Tool can configure itselftool.setConf(conf);//get the args w/o generic hadoop argsString[] toolArgs = parser.getRemainingArgs();return tool.run(toolArgs);}
6.后面直接看HMasterCommandLine的run()函数,代码如下:
public int run(String args[]) throws Exception {Options opt = new Options();opt.addOption("localRegionServers", true,"RegionServers to start in master process when running standalone");opt.addOption("masters", true, "Masters to start in this process");opt.addOption("minRegionServers", true, "Minimum RegionServers needed to host user tables");opt.addOption("backup", false, "Do not try to become HMaster until the primary fails");CommandLine cmd; //获取cmd 其实就一个start 参数try {cmd = new GnuParser().parse(opt, args);} catch (ParseException e) {LOG.error("Could not parse: ", e);usage(null);return 1;}//下面就是一些判断了,直接跳过 下面代码省略掉一部分判断,各种判断if (cmd.hasOption("minRegionServers")) {.....................................List<String> remainingArgs = cmd.getArgList();if (remainingArgs.size() != 1) {usage(null);return 1;}String command = remainingArgs.get(0);if ("start".equals(command)) {// 跳了一圈最后执行的是这个startMaster()函数return startMaster();} else if ("stop".equals(command)) {return stopMaster();} else if ("clear".equals(command)) {return (ZNodeClearer.clear(getConf()) ? 0 : 1);} else {usage("Invalid command: " + command);return 1;}}
7.下面这个startMaster()函数,才是真正的HMaster启动,代码如下:
private int startMaster() {//初始化conf配置Configuration conf = getConf();TraceUtil.initTracer(conf);try {// If 'local', defer to LocalHBaseCluster instance. Starts master// and regionserver both in the one JVM.//如果是本地默认,这master和regionserver在一个节点上启动,我们是集群模式 直接跳过if (LocalHBaseCluster.isLocal(conf)) {.............................} else {//直接看这里,打印配置信息到日志里面logProcessInfo(getConf());//这里构造一个HMaster线程,由于Hmaster继承自HRegionser类,这里会构造HRegionser,执行初始化HMaster master = HMaster.constructMaster(masterClass, conf);if (master.isStopped()) {LOG.info("Won't bring the Master up as a shutdown is requested");return 1;}//调用start 和join方法master.start();master.join();if(master.isAborted())throw new RuntimeException("HMaster Aborted");}} catch (Throwable t) {LOG.error("Master exiting", t);return 1;}return 0;}
8.由于Hmaster继承自HRegionser类,这里会构造HRegionser,执行初始化,看下HRegionser的构造函数:
public HRegionServer(Configuration conf) throws IOException {super("RegionServer"); // thread nameTraceUtil.initTracer(conf);try {this.startcode = System.currentTimeMillis();this.conf = conf;this.fsOk = true;//下面都是一些校验this.masterless = conf.getBoolean(MASTERLESS_CONFIG_NAME, false);this.eventLoopGroupConfig = setupNetty(this.conf);//检查是否有足够的内存分配给Memstore和Block Cache使用//memstore 默认分配40%的内存给Memstore 由参数hbase.regionserver.global.memstore.size控制//block cache也是默认分配40% 由参数hfile.block.cache.size控制//就是说memstore和Block cache 内存不能大于80% 否则报错MemorySizeUtil.checkForClusterFreeHeapMemoryLimit(this.conf);//文件格式通过hfile.format.version配置。老版本是2 现在是3 只能是2 和3HFile.checkHFileVersion(this.conf);checkCodecs(this.conf);this.userProvider = UserProvider.instantiate(conf);//checksum校验 一般设置为flase不校验FSUtils.setupShortCircuitRead(this.conf);// Disable usage of meta replicas in the regionserverthis.conf.setBoolean(HConstants.USE_META_REPLICAS, false);// Config'ed params//重试次数this.numRetries = this.conf.getInt(HConstants.HBASE_CLIENT_RETRIES_NUMBER,HConstants.DEFAULT_HBASE_CLIENT_RETRIES_NUMBER);this.threadWakeFrequency = conf.getInt(HConstants.THREAD_WAKE_FREQUENCY, 10 * 1000);//regionserver合并周期this.compactionCheckFrequency = conf.getInt(PERIOD_COMPACTION, this.threadWakeFrequency);this.flushCheckFrequency = conf.getInt(PERIOD_FLUSH, this.threadWakeFrequency);this.msgInterval = conf.getInt("hbase.regionserver.msginterval", 3 * 1000);this.sleeper = new Sleeper(this.msgInterval, this);boolean isNoncesEnabled = conf.getBoolean(HConstants.HBASE_RS_NONCES_ENABLED, true);this.nonceManager = isNoncesEnabled ? new ServerNonceManager(this.conf) : null;this.numRegionsToReport = conf.getInt("hbase.regionserver.numregionstoreport", 10);this.operationTimeout = conf.getInt(HConstants.HBASE_CLIENT_OPERATION_TIMEOUT,HConstants.DEFAULT_HBASE_CLIENT_OPERATION_TIMEOUT);this.shortOperationTimeout = conf.getInt(HConstants.HBASE_RPC_SHORTOPERATION_TIMEOUT_KEY,HConstants.DEFAULT_HBASE_RPC_SHORTOPERATION_TIMEOUT);this.abortRequested = false;this.stopped = false;//这里比较重要 调用createRpcService生成RSRpcServices对象//在构造RpcServer对象的过程中,HMaster和HRegionServer分别创建rpcserver服务// 以使HMaster和HRegionServer响应不同的rpc服务rpcServices = createRpcServices();useThisHostnameInstead = getUseThisHostnameInstead(conf);String hostName =StringUtils.isBlank(useThisHostnameInstead) ? this.rpcServices.isa.getHostName(): this.useThisHostnameInstead;//根据主机名 端口和 启动时间确定服务名//格式大约是这样的master.hadoop.ljs:16000:时间戳serverName = ServerName.valueOf(hostName, this.rpcServices.isa.getPort(), this.startcode);rpcControllerFactory = RpcControllerFactory.instantiate(this.conf);rpcRetryingCallerFactory = RpcRetryingCallerFactory.instantiate(this.conf);// login the zookeeper client principal (if using security)//这里只有在开启了kerberos的安全集群才会进行zookeeper的登录//非安全集群这里忽略ZKUtil.loginClient(this.conf, HConstants.ZK_CLIENT_KEYTAB_FILE,HConstants.ZK_CLIENT_KERBEROS_PRINCIPAL, hostName);// login the server principal (if using secure Hadoop)//这里也一样 非安全集群 可忽略login(userProvider, hostName);// init superusers and add the server principal (if using security)// or process owner as default super user.Superusers.initialize(conf);//实例化RegionServerAccounting 它用来记录此rs中所有的memstore所占大小的实例regionServerAccounting = new RegionServerAccounting(conf);//HBase2.0中Master节点可以有表 默认为falseboolean isMasterNotCarryTable =this instanceof HMaster && !LoadBalancer.isTablesOnMaster(conf);// isMasterNotCarryTable为false 需要创建block cache缓存和 MOB缓存if (!isMasterNotCarryTable) {blockCache = BlockCacheFactory.createBlockCache(conf);mobFileCache = new MobFileCache(conf);}uncaughtExceptionHandler = new UncaughtExceptionHandler() {public void uncaughtException(Thread t, Throwable e) {abort("Uncaught exception in executorService thread " + t.getName(), e);}};//获取HBase在hdfs上的各个存储目录 比如WAL预写日志 数据存储路径等initializeFileSystem();//hbase-site.xml中读取span RegionServer参数指标spanReceiverHost = SpanReceiverHost.getInstance(getConfiguration());this.configurationManager = new ConfigurationManager();setupWindows(getConfiguration(), getConfigurationManager());// Some unit tests don't need a cluster, so no zookeeper at allif (!conf.getBoolean("hbase.testing.nocluster", false)) {//这里获取zookeeper连接,并启动hbase znode节点的监听zooKeeper = new ZKWatcher(conf, getProcessName() + ":" +rpcServices.isa.getPort(), this, canCreateBaseZNode());// If no master in cluster, skip trying to track one or look for a cluster status.if (!this.masterless) {if (conf.getBoolean(HBASE_SPLIT_WAL_COORDINATED_BY_ZK,DEFAULT_HBASE_SPLIT_COORDINATED_BY_ZK)) {this.csm = new ZkCoordinatedStateManager(this);}//根据zookeeper相关信息,设置MasterAddressTracker 构造一个Master地址的监听器masterAddressTracker = new MasterAddressTracker(getZooKeeper(), this);//启动对于Hbase znode对应的监听,masterAddressTracker.start();//创建一个对集群状态的监听clusterStatusTracker = new ClusterStatusTracker(zooKeeper, this);//启动监听clusterStatusTracker.start();} else {masterAddressTracker = null;clusterStatusTracker = null;}} else {zooKeeper = null;masterAddressTracker = null;clusterStatusTracker = null;}//启动rpc 等待regionserver端和客户端的请求this.rpcServices.start(zooKeeper);// This violates 'no starting stuff in Constructor' but Master depends on the below chore// and executor being created and takes a different startup route. Lots of overlap between HRS// and M (An M IS A HRS now). Need to refactor so less duplication between M and its super// Master expects Constructor to put up web servers. Ugh.// class HRS. TODO.this.choreService = new ChoreService(getName(), true);this.executorService = new ExecutorService(getName());putUpWebUI();} catch (Throwable t) {// Make sure we log the exception. HRegionServer is often started via reflection and the// cause of failed startup is lost.LOG.error("Failed construction RegionServer", t);throw t;}}
9.HRegionserver父类构造函数执行完成之后,调用HMaster的构造函数,这里看下代码:
public HMaster(final Configuration conf)throws IOException, KeeperException {super(conf);TraceUtil.initTracer(conf);try {if (conf.getBoolean(MAINTENANCE_MODE, false)) {LOG.info("Detected {}=true via configuration.", MAINTENANCE_MODE);maintenanceMode = true;} else if (Boolean.getBoolean(MAINTENANCE_MODE)) {LOG.info("Detected {}=true via environment variables.", MAINTENANCE_MODE);maintenanceMode = true;} else {maintenanceMode = false;}//存储regionserver异常的内存缓存this.rsFatals = new MemoryBoundedLogMessageBuffer(conf.getLong("hbase.master.buffer.for.rs.fatals", 1 * 1024 * 1024));LOG.info("hbase.rootdir=" + getRootDir() +", hbase.cluster.distributed=" + this.conf.getBoolean(HConstants.CLUSTER_DISTRIBUTED, false));// Disable usage of meta replicas in the masterthis.conf.setBoolean(HConstants.USE_META_REPLICAS, false);//这里修改conf配置,然后将replication相关特性写入conf中decorateMasterConfiguration(this.conf);// Hack! Maps DFSClient => Master for logs. HDFS made this// config param for task trackers, but we can piggyback off of it.if (this.conf.get("mapreduce.task.attempt.id") == null) {this.conf.set("mapreduce.task.attempt.id", "hb_m_" + this.serverName.toString());}//实例化hbase的监控this.metricsMaster = new MetricsMaster(new MetricsMasterWrapperImpl(this));// preload table descriptor at startupthis.preLoadTableDescriptors = conf.getBoolean("hbase.master.preload.tabledescriptors", true);this.maxBlancingTime = getMaxBalancingTime();this.maxRitPercent = conf.getDouble(HConstants.HBASE_MASTER_BALANCER_MAX_RIT_PERCENT,HConstants.DEFAULT_HBASE_MASTER_BALANCER_MAX_RIT_PERCENT);// 集群状态发表。 比如当regionService 死了,要立即告知client ,不要用client等待socket回应了。boolean shouldPublish = conf.getBoolean(HConstants.STATUS_PUBLISHED,HConstants.STATUS_PUBLISHED_DEFAULT);Class<? extends ClusterStatusPublisher.Publisher> publisherClass =conf.getClass(ClusterStatusPublisher.STATUS_PUBLISHER_CLASS,ClusterStatusPublisher.DEFAULT_STATUS_PUBLISHER_CLASS,ClusterStatusPublisher.Publisher.class);if (shouldPublish) {if (publisherClass == null) {LOG.warn(HConstants.STATUS_PUBLISHED + " is true, but " +ClusterStatusPublisher.DEFAULT_STATUS_PUBLISHER_CLASS +" is not set - not publishing status");} else {clusterStatusPublisherChore = new ClusterStatusPublisher(this, conf, publisherClass);getChoreService().scheduleChore(clusterStatusPublisherChore);}}// Some unit tests don't need a cluster, so no zookeeper at allif (!conf.getBoolean("hbase.testing.nocluster", false)) {//视图将Master信息写入到zookeeper中,这里构造函数会启动一个zookeeper 对Mater的znode的监听//接收zookeeer的事件this.activeMasterManager = new ActiveMasterManager(zooKeeper, this.serverName, this);} else {this.activeMasterManager = null;}} catch (Throwable t) {// Make sure we log the exception. HMaster is often started via reflection and the// cause of failed startup is lost.LOG.error("Failed construction of Master", t);throw t;}}
10.最后调用HMaster中的run()函数,这里会启动一个循环,在成为活动主服务器后,通过调用regionserver运行循环;在成为主节点之前一直阻塞。
public void run() {try {if (!conf.getBoolean("hbase.testing.nocluster", false)) {Threads.setDaemonThreadRunning(new Thread(() -> {try {int infoPort = putUpJettyServer();startActiveMasterManager(infoPort);} catch (Throwable t) {// Make sure we log the exception.String error = "Failed to become Active Master";LOG.error(error, t);// Abort should have been called already.if (!isAborted()) {abort(error, t);}}}), getName() + ":becomeActiveMaster");}// Fall in here even if we have been aborted. Need to run the shutdown services and// the super run call will do this for us.super.run();} finally {if (this.clusterSchemaService != null) {// If on way out, then we are no longer active master.this.clusterSchemaService.stopAsync();try {this.clusterSchemaService.awaitTerminated(getConfiguration().getInt(HBASE_MASTER_WAIT_ON_SERVICE_IN_SECONDS,DEFAULT_HBASE_MASTER_WAIT_ON_SERVICE_IN_SECONDS), TimeUnit.SECONDS);} catch (TimeoutException te) {LOG.warn("Failed shutdown of clusterSchemaService", te);}}this.activeMaster = false;}}
至此,HMaster启动的大体流程剖析完毕,这里只是分析了主流程,很多细节需要你自己再去看看源码。
