Hive 到底怎么把sql转成MapReduce的?
hive源码看到最后了,终于来到最关键的地方了。
还是举个例子: 上次那个sql语句:
select name,count(salary) from hive_db.employee where name = 'Krian' group by name limit 1;
最后经过物理计划转化成的Map/Reduce Task为:
然后ExecDriver的execute 方法提交job。
其中:
job.setMapperClass(ExecMapper.class);
job.setReducerClass(ExecReducer.class);
所以Map和reduce的类分别在ExecMapper.class和ExecReducer.class里。
先看ExecMapper.class, reducer和它差不多。
这个类的map方法主要分2步,
1. 初始化变量 如 OutputCollector map的输出,
最重要的是把OutputCollector 传给子节点的ReduceSinkOperator,因为RS要 OutputCollector 输出结果,
2. 这个类里维护了一个 AbstractMapOperator 变量,第二步就是运行MapOperator 的 process方法。 这个方法是核心。
首先,它的作用就是从头节点遍历Map Task的 Operator树,并运行子Operator节点的 process方法。
因为所以的Operator节点(如TableScanOperator)都继承Operator类。
它的核心方法有2个, process和 forward方法。
process方法就是这个Operator在Mapreduce的真正操作逻辑。
最后执行forward方法。
forward方法就是执行所有子Operator节点的 process方法。
所以比如上面图中Map Task,有5个 Operator节点。
它的流程就是先执行
MapOperator 的 process -> 里面执行 TS 的process -> TS 的forward
->FIL的的process -> FIL的forward ->SEL的的process -> SEL的forward
->GBY的的process -> GBY的forward -> RS的的process .
所以所有的map操作逻辑都在各个OP节点的process 方法。 执行时只需要前序遍历OP树就可以了。
最后附个代码:TableScanOperator的process方法:
public void process(Object row, int tag) throws HiveException {
if (rowLimit >= 0) {
if (row instanceof VectorizedRowBatch) {
VectorizedRowBatch batch = (VectorizedRowBatch) row;
if (currCount >= rowLimit) {
setDone(true);
return;
}
if (currCount + batch.size > rowLimit) {
batch.size = rowLimit - currCount;
}
currCount += batch.size;
} else if (currCount++ >= rowLimit) {
setDone(true);
return;
}
}
if (conf != null && conf.isGatherStats()) {
gatherStats(row);
}
forward(row, inputObjInspectors[tag]);
}
Reduce端和Map端差不多,就不细说了。
总结!
这会hive源码终于看完了,接下来卓仔要学习后端的东西了。