vlambda博客
学习文章列表

Hive 到底怎么把sql转成MapReduce的?

hive源码看到最后了,终于来到最关键的地方了。


还是举个例子: 上次那个sql语句:


select name,count(salary) from hive_db.employee where name = 'Krian' group by name limit 1;


最后经过物理计划转化成的Map/Reduce Task为:



然后ExecDriverexecute 方法提交job。

其中:

job.setMapperClass(ExecMapper.class);job.setReducerClass(ExecReducer.class);


所以Map和reduce的类分别在ExecMapper.classExecReducer.class里。


先看ExecMapper.class, reducer和它差不多。


这个类的map方法主要分2步,


1. 初始化变量 如 OutputCollector map的输出,

最重要的是把OutputCollector 传给子节点的ReduceSinkOperator,因为RS要 OutputCollector 输出结果,


2. 这个类里维护了一个 AbstractMapOperator 变量,第二步就是运行MapOperator 的 process方法。 这个方法是核心。


首先,它的作用就是从头节点遍历Map Task的 Operator树,并运行子Operator节点的 process方法。


因为所以的Operator节点(如TableScanOperator)都继承Operator类。


它的核心方法有2个,  process和 forward方法。


process方法就是这个Operator在Mapreduce的真正操作逻辑。


最后执行forward方法。


forward方法就是执行所有子Operator节点的 process方法。


所以比如上面图中Map Task,有5个 Operator节点。


它的流程就是先执行


MapOperator 的 process -> 里面执行 TS 的process -> TS 的forward 

->FIL的的process -> FIL的forward ->SEL的的process -> SEL的forward 

->GBY的的process -> GBY的forward -> RS的的process .


所以所有的map操作逻辑都在各个OP节点的process 方法。 执行时只需要前序遍历OP树就可以了。


最后附个代码:TableScanOperator的process方法:


 public void process(Object row, int tag) throws HiveException { if (rowLimit >= 0) { if (row instanceof VectorizedRowBatch) { VectorizedRowBatch batch = (VectorizedRowBatch) row; if (currCount >= rowLimit) { setDone(true); return; } if (currCount + batch.size > rowLimit) { batch.size = rowLimit - currCount; } currCount += batch.size; } else if (currCount++ >= rowLimit) { setDone(true); return; } } if (conf != null && conf.isGatherStats()) { gatherStats(row); } forward(row, inputObjInspectors[tag]); }


Reduce端和Map端差不多,就不细说了。



2
总结!



这会hive源码终于看完了,接下来卓仔要学习后端的东西了。