vlambda博客
学习文章列表

老司机带你看Hive客户端源码分析

     Hive适合批处理,跑大型重型任务还可以的,主要这玩意比较稳定。但是逐步一线大厂已经开始用Spark全部替换Hive了,单存会Hive出去找工作越来越难了。当然目前离线Hive还是有市场的,目前中小型使用还是比较多的。

    Hive装逼系列之源码分析,有时候看源码排查问题一目了然,尤其对于集群运维小伙伴来说。老司机可以研究下,面试装逼必备。


源码版本:Hive.2.3.7,电脑看效果更好哈。

1.分析一下:CliDriver类


main方法:

public static void main(String[] argsthrows Exception {
int ret = new CliDriver().run(args);
System.exit(ret);
}

run方法:主要是各种参数的初始化

public int run(String[] argsthrows Exception {

//oproc 选项处理器,处理命令行参数
OptionsProcessor oproc = new OptionsProcessor();
if (!oproc.process_stage1(args)) {
// 初始化环境配置,返回true,意味着除打印日志级别外其他参数已经写入hiveVariables中
// hiveVariables 是一个Map<String,String>
// oproc.process_stage1(args)
return 1;
}

// 日志初始化
// NOTE: It is critical to do this here so that log4j is reinitialized
// before any of the other core hive classes are loaded
boolean logInitFailed = false;
String logInitDetailMessage;
//初始化HiveLog4j配置信息
try {
logInitDetailMessage = LogUtils.initHiveLog4j();
catch (LogInitializationException e) {
logInitFailed = true;
logInitDetailMessage = e.getMessage();
}

//ss客户端状态,创建CliSessionState对象,即客户端会话对象
CliSessionState ss = new CliSessionState(new HiveConf(SessionState.class));
ss.in = System.in;
try {
ss.out = new PrintStream(System.outtrue"UTF-8");
ss.info = new PrintStream(System.errtrue);
ss.err = new CachingPrintStream(System.errtrue);
catch (UnsupportedEncodingException e) {
return 3;
}


//获取命令行的配置
//检查命令行配置属否有误,-e -f 不能同时选定,并设置获取对应属性进行配置
if (!oproc.process_stage2(ss)) {
return 2;
}
//如果是不沉默的
//hive.session.silent", false
if (!ss.getIsSilent()) {
//如果初始化失败,那么打印错误日志
if (logInitFailed) {
System.err.println(logInitDetailMessage);
else {
SessionState.getConsole().printInfo(logInitDetailMessage);
}
}

//创建HiveConf
// set all properties specified via command line
HiveConf conf = ss.getConf();
for (Map.Entry<ObjectObject> item : ss.cmdProperties.entrySet()) {
conf.set((Stringitem.getKey(), (Stringitem.getValue());
//获取并覆盖相应配置
ss.getOverriddenConfigurations().put((Stringitem.getKey(), (Stringitem.getValue());
}

// read prompt configuration and substitute variables.
//获取默认配置参数并进行替换
//CLIPROMPT : "hive.cli.prompt", "hive"
prompt = conf.getVar(HiveConf.ConfVars.CLIPROMPT);
prompt = new VariableSubstitution(new HiveVariableSource() {
@Override
public Map<StringString> getHiveVariable() {
return SessionState.get().getHiveVariables();
}
}).substitute(confprompt);
//生成长度和prompt长度一样的空格
prompt2 = spacesForString(prompt);

//HIVE_CLI_TEZ_SESSION_ASYNC : hive.cli.tez.session.async 是否使用来tez客户端,默认是true
if (HiveConf.getBoolVar(confConfVars.HIVE_CLI_TEZ_SESSION_ASYNC)) {
// Start the session in a fire-and-forget manner. When the asynchronously initialized parts of
// the session are needed, the corresponding getters and other methods will wait as needed.
//使用异步的方式开始会话
SessionState.beginStart(ssconsole);
else {
SessionState.start(ss);
}

ss.updateThreadName();

// execute cli driver work
// 执行cli driver 任务
try {
return executeDriver(ssconfoproc);
finally {
ss.resetThreadName();
ss.close();
}
}


executeDriver方法:


/**
* Execute the cli work
* @param ss CliSessionState of the CLI driver
* @param conf HiveConf for the driver session
* @param oproc Operation processor of the CLI invocation
* @return status of the CLI command execution
* @throws Exception
*/
/**
* 其实这个方法只是按照特定的规则也就是以';'结尾,将输入处理成一行,这一行可能是一个命令也可能是多个命令
* 然后交给processLine方法
* */
private int executeDriver(CliSessionState ssHiveConf confOptionsProcessor oproc)
throws Exception {

CliDriver cli = new CliDriver();
cli.setHiveVariables(oproc.getHiveVariables());

// use the specified database if specified
//使用特定的数据库,CliSessionState对象里包含里很多属性,其中一个就是使用哪个数据库
//database 就是数据库的名称
cli.processSelectDatabase(ss);

// Execute -i init files (always in silent mode)
//初始化文件,始终保持沉默
//如果使用里初始化文件,则在这里进行处理,这里其实就是我们使用 -i 指定的文件
cli.processInitFiles(ss);
//使用-e 指定的SQL,也就是我们常说的-e模式,这样的话就不会进入命令行,处理结束之后直接返回
if (ss.execString != null) {
int cmdProcessStatus = cli.processLine(ss.execString);
return cmdProcessStatus;
}

// -f模式指定的SQL文件和-e对应
try {
if (ss.fileName != null) {
return cli.processFile(ss.fileName);
}
catch (FileNotFoundException e) {
System.err.println("Could not open input file for reading. (" + e.getMessage() + ")");
return 3;
}
//HIVE_EXECUTION_ENGINE : hive执行的引擎,可以有mr, tez, spark
if ("mr".equals(HiveConf.getVar(confConfVars.HIVE_EXECUTION_ENGINE))) {
//如果使用的是MR,那么控制台打印警告信息
console.printInfo(HiveConf.generateMrDeprecationWarning());
}

//正式进入命令行,完成里ConsoleReader对象的初始化
setupConsoleReader();

String line;
int ret = 0;
String prefix = "";
String curDB = getFormattedDb(confss);
String curPrompt = prompt + curDB;
//生成与curDB长度相同的空白字符串
String dbSpaces = spacesForString(curDB);
// while循环读取客户端的输入命令,换行的SQL也是在这里完成拼接的
while ((line = reader.readLine(curPrompt + "> ")) != null) {
if (!prefix.equals("")) {
prefix += '\n';
}
// -- 开头的代表注释
if (line.trim().startsWith("--")) {
continue;
}

if (line.trim().endsWith(";"&& !line.trim().endsWith("\\;")) {
//包含;则说明已经获取到里完整的SQL,可以执行了
line = prefix + line;
//执行SQL
ret = cli.processLine(linetrue);
prefix = "";
curDB = getFormattedDb(confss);
curPrompt = prompt + curDB;
dbSpaces = dbSpaces.length() == curDB.length() ? dbSpaces : spacesForString(curDB);
else {
prefix = prefix + line;
curPrompt = prompt2 + dbSpaces;
continue;
}
}

return ret;
}


processLine方法:

/**
* Processes a line of semicolon separated commands
* 处理以分号分隔的命令
* @param line
* The commands to process
* @param allowInterrupting
* When true the function will handle SIG_INT (Ctrl+C) by interrupting the processing and
* returning -1
* @return 0 if ok
*/
//line, allowInterrupting=true
public int processLine(String lineboolean allowInterrupting) {
SignalHandler oldSignal = null;
Signal interruptSignal = null;

if (allowInterrupting) {
// Remember all threads that were running at the time we started line processing.
// Hook up the custom Ctrl+C handler while processing this line
interruptSignal = new Signal("INT");
//这里是一个回调方法,如果出现中断则执行该方法,输出信息
oldSignal = Signal.handle(interruptSignalnew SignalHandler() {
//interruptRequested : 中断请求
private boolean interruptRequested;

@Override
public void handle(Signal signal) {
boolean initialRequest = !interruptRequested;
interruptRequested = true;

// Kill the VM on second ctrl+c
// 在第二次ctrl+C ,杀死VM
if (!initialRequest) {
console.printInfo("Exiting the JVM");
System.exit(127);
}

// Interrupt the CLI thread to stop the current statement and return to prompt
// 中断 CLI 线程以停止当前语句并返回
console.printInfo("Interrupting... Be patient, this might take some time.");
console.printInfo("Press Ctrl+C again to kill JVM");

// First, kill any running MR jobs
// kill 掉正在运行的MR任务
// hadoop作业执行
HadoopJobExecHelper.killRunningJobs();
// Tez作业执行
TezJobExecHelper.killRunningJobs();
// Hive作业中断
HiveInterruptUtils.interrupt();
}
});
}

//准备执行命令
try {
int lastRet = 0ret = 0;

// we can not use "split" function directly as ";" may be quoted
// 使用';'进行切割,获取命令列表,没有直接使用"split"函数,而是使用"substring"切割的
List<String> commands = splitSemiColon(line);

String command = "";
for (String oneCmd : commands) {

if (StringUtils.endsWith(oneCmd"\\")) {
//如果是以"\\"结尾的,那么使用StringUtils.chop()切除最后一个字符
command += StringUtils.chop(oneCmd+ ";";
continue;
else {
command += oneCmd;
}
//StringUtils.isBlank() : 判断字符串command是否是空字符串
if (StringUtils.isBlank(command)) {
continue;
}

//处理命令
//例如:show databases
//如果没有退出,则正常返回ret=1,否则返回ret=0
ret = processCmd(command);
command = "";
lastRet = ret;
//CLIIGNOREERRORS : hive.cli.errors.ignore 客户端错误是否忽略,默认false
boolean ignoreErrors = HiveConf.getBoolVar(confHiveConf.ConfVars.CLIIGNOREERRORS);
if (ret != 0 && !ignoreErrors) {
CommandProcessorFactory.clean((HiveConfconf);
return ret;
}
}
CommandProcessorFactory.clean((HiveConfconf);
return lastRet;
finally {
// Once we are done processing the line, restore the old handler
//一旦我们处理完该行,恢复旧的处理程序
if (oldSignal != null && interruptSignal != null) {
Signal.handle(interruptSignaloldSignal);
}
}
}


processCmd方法:


/**
* processCmd方法需要将processLine方法传递进来的一个个命令进行一次处理,也就是说我们的processCmd方法处理的就是一个个确定的命令,
* 但是需要注意hive是可以执行多种命令的,例如:shell、source、add,所以我们要进行判断,然后进行处理。
* */
public int processCmd(String cmd) {
CliSessionState ss = (CliSessionStateSessionState.get();
ss.setLastCommand(cmd);

ss.updateThreadName();

// Flush the print stream, so it doesn't include output from the last command
ss.err.flush();
String cmd_trimmed = cmd.trim();
//使用空格切分cmd_trimmed命令串
String[] tokens = tokenizeCmd(cmd_trimmed);
int ret = 0;

//如果命令是"quit"或"exit",则退出客户端进程
//因为退出命令quit/exit就是单独的一行,所以这里使用cmd_trimmed进行判断
if (cmd_trimmed.toLowerCase().equals("quit"|| cmd_trimmed.toLowerCase().equals("exit")) {

// if we have come this far - either the previous commands
// are all successful or this is command line. in either case
// this counts as a successful run
ss.close();
System.exit(0);
// source 命令和-f有些类似,不同的是它进入命令行之后执行一个SQL文件,这里使用了分割后的数组进行了判断
else if (tokens[0].equalsIgnoreCase("source")) {
String cmd_1 = getFirstCmd(cmd_trimmedtokens[0].length());
//创建一个变换替换的变量并且实现HiveVariableSource类
cmd_1 = new VariableSubstitution(new HiveVariableSource() {
@Override
public Map<StringString> getHiveVariable() {
return SessionState.get().getHiveVariables();
}
}).substitute(ss.getConf(), cmd_1);

File sourceFile = new File(cmd_1);
if (! sourceFile.isFile()){
console.printError("File: "+ cmd_1 + " is not a file.");
ret = 1;
else {
try {
// 使用processFile方法读取SQL文件,最终还是交给processLine方法进行处理
ret = processFile(cmd_1);
catch (IOException e) {
console.printError("Failed processing file "+ cmd_1 +" "+ e.getLocalizedMessage(),
stringifyException(e));
ret = 1;
}
}
//shell 命令,hive的cli是可以执行shell命令的,格式需要以!开头
else if (cmd_trimmed.startsWith("!")) {
//for shell commands, use unstripped command
String shell_cmd = cmd_trimmed.substring(1);
shell_cmd = new VariableSubstitution(new HiveVariableSource() {
@Override
public Map<StringString> getHiveVariable() {
return SessionState.get().getHiveVariables();
}
}).substitute(ss.getConf(), shell_cmd);

// shell_cmd = "/bin/bash -c \'" + shell_cmd + "\'";
try {
//创建ShellCmdExecutor,执行SQL
ShellCmdExecutor executor = new ShellCmdExecutor(shell_cmdss.outss.err);
ret = executor.execute();
if (ret != 0) {
console.printError("Command failed with exit code = " + ret);
}
catch (Exception e) {
console.printError("Exception raised from Shell command " + e.getLocalizedMessage(),
stringifyException(e));
ret = 1;
}
else { // local mode
//排除了上面几种特殊的命令,到这里其实就是要执行的就是hive的命令了,不一定是SQL,也可能是SET、ADD之类的命令
try {
//Let Driver strip comments using sql parser
//proc就是命令处理器
CommandProcessor proc = CommandProcessorFactory.get(tokens, (HiveConfconf);
ret = processLocalCmd(cmdprocss);
catch (SQLException e) {
console.printError("Failed processing command " + tokens[0+ " " + e.getLocalizedMessage(),
org.apache.hadoop.util.StringUtils.stringifyException(e));
ret = 1;
}
}

ss.resetThreadName();
return ret;
}


CommandProcessorFactory.get方法(部分):


public static CommandProcessor get(String[] cmdHiveConf conf)
throws SQLException {
CommandProcessor result = getForHiveCommand(cmdconf);
if (result != null) {
return result;
}
if (isBlank(cmd[0])) {
return null;
else {
//如果conf为null,则初始化一个新的Driver
if (conf == null) {
return new Driver();
}
Driver drv = mapDrivers.get(conf);
if (drv == null) {
//如果对应的conf没有生成driver,那么就新初始化一个对应conf的Driver
//并且将conf和driver的对应关系存入mapDrivers
drv = new Driver();
mapDrivers.put(confdrv);
else {
//重置查询状态
drv.resetQueryState();
}
//刷新
drv.init();
return drv;
}
}


getForHiveCommand方法


public static CommandProcessor getForHiveCommand(String[] cmdHiveConf conf)
throws SQLException {
return getForHiveCommandInternal(cmdconffalse);
}


getForHiveCommandInternal方法:


public static CommandProcessor getForHiveCommandInternal(String[] cmdHiveConf conf,
boolean testOnly)
throws SQLException {
HiveCommand hiveCommand = HiveCommand.find(cmdtestOnly);
if (hiveCommand == null || isBlank(cmd[0])) {
return null;
}
if (conf == null) {
conf = new HiveConf();
}
Set<String> availableCommands = new HashSet<String>();
for (String availableCommand : conf.getVar(HiveConf.ConfVars.HIVE_SECURITY_COMMAND_WHITELIST)
.split(",")) {
availableCommands.add(availableCommand.toLowerCase().trim());
}
if (!availableCommands.contains(cmd[0].trim().toLowerCase())) {
throw new SQLException("Insufficient privileges to execute " + cmd[0], "42000");
}
if (cmd.length > 1 && "reload".equalsIgnoreCase(cmd[0])
&& "function".equalsIgnoreCase(cmd[1])) {
// special handling for SQL "reload function"
return null;
}
switch (hiveCommand) {
case SET:
return new SetProcessor();
case RESET:
return new ResetProcessor();
case DFS:
SessionState ss = SessionState.get();
return new DfsProcessor(ss.getConf());
case ADD:
return new AddResourceProcessor();
case LIST:
return new ListResourceProcessor();
case DELETE:
return new DeleteResourceProcessor();
case COMPILE:
return new CompileProcessor();
case RELOAD:
return new ReloadProcessor();
case CRYPTO:
try {
return new CryptoProcessor(SessionState.get().getHdfsEncryptionShim(), conf);
catch (HiveException e) {
throw new SQLException("Fail to start the command processor due to the exception: "e);
}
default:
throw new AssertionError("Unknown HiveCommand " + hiveCommand);
}
}


processLocalCmd方法:


int processLocalCmd(String cmdCommandProcessor procCliSessionState ss) {
int tryCount = 0;
boolean needRetry;
int ret = 0;

do {
try {
needRetry = false;
if (proc != null) {
//处理SQL命令
//instanceof 严格来说是Java中的一个双目运算符,用来测试一个对象是否为一个类的实例,在这里判断proc是否是Driver的实例,如果是,返回true
if (proc instanceof Driver) {
Driver qp = (Driverproc;
PrintStream out = ss.out;
long start = System.currentTimeMillis();
//判断是否输出SQL
if (ss.getIsVerbose()) {
out.println(cmd);
}

qp.setTryCount(tryCount);
ret = qp.run(cmd).getResponseCode();
if (ret != 0) {
qp.close();
return ret;
}

// query has run capture the time
//查询运行的时间
long end = System.currentTimeMillis();
double timeTaken = (end - start/ 1000.0;

ArrayList<String> res = new ArrayList<String>();

printHeader(qpout);

// print the results
int counter = 0;
try {
if (out instanceof FetchConverter) {
((FetchConverter)out).fetchStarted();
}
//通过getResults()方法,res会获取到执行的结果
while (qp.getResults(res)) {
for (String r : res) {
out.println(r);
}
counter += res.size();
res.clear();
if (out.checkError()) {
break;
}
}
catch (IOException e) {
console.printError("Failed with exception " + e.getClass().getName() + ":"
+ e.getMessage(), "\n"
+ org.apache.hadoop.util.StringUtils.stringifyException(e));
ret = 1;
}

//关闭driver对象
int cret = qp.close();
if (ret == 0) {
ret = cret;
}

if (out instanceof FetchConverter) {
((FetchConverter)out).fetchFinished();
}

console.printInfo("Time taken: " + timeTaken + " seconds" +
(counter == 0 ? "" : ", Fetched: " + counter + " row(s)"));
else {
//处理非SQL的命令
String firstToken = tokenizeCmd(cmd.trim())[0];
String cmd_1 = getFirstCmd(cmd.trim(), firstToken.length());

if (ss.getIsVerbose()) {
ss.out.println(firstToken + " " + cmd_1);
}
CommandProcessorResponse res = proc.run(cmd_1);
if (res.getResponseCode() != 0) {
ss.out.println("Query returned non-zero code: " + res.getResponseCode() +
", cause: " + res.getErrorMessage());
}
if (res.getConsoleMessages() != null) {
for (String consoleMsg : res.getConsoleMessages()) {
console.printInfo(consoleMsg);
}
}
ret = res.getResponseCode();
}
}
catch (CommandNeedRetryException e) {
console.printInfo("Retry query with a different approach...");
tryCount++;
needRetry = true;
}
while (needRetry);

return ret;
}


CliSessionState对象:持有了很多属性信息


/**
* SessionState for hive cli.
*
*/
public class CliSessionState extends SessionState {
/**
* -database option if any that the session has been invoked with.
*/
public String database;

/**
* -e option if any that the session has been invoked with.
*/
public String execString;

/**
* -f option if any that the session has been invoked with.
*/
public String fileName;

/**
* properties set from -hiveconf via cmdline.
*/
public Properties cmdProperties = new Properties();

/**
* -i option if any that the session has been invoked with.
*/
public List<String> initFiles = new ArrayList<String>();

public CliSessionState(HiveConf conf) {
super(conf);
}

@Override
public void close() {
try {
super.close();
catch (IOException ioe) {
ioe.printStackTrace();
}
}
}


尖叫总结:

CliDriver是我们命令行执行SQL的入口,主要功能有: 1.从客户端获取输入,进行简单解析 2.根据解析的结果,创建不同的CommandProcessor,然后执行命令 3.最后返回执行的结果 4.执行SQL时的顺序:

main-->run--->executeDriver---->processLine--->processCmd--->processLocalCmd---->Driver类run方法

至此,CliDriver类的简单分析完成。