如何快速解析Spark SQL
背景
在观远数据, 我们深度基于Apache Spark打造新一代数据产品. 一项非常常见的任务就是如何快速分析 Spark SQL.
如何正确且快速的分析用户编写的SQL呢. (在观远BI中, 主要是用户熟悉的自定义字段)
网上流行的教程是如何用ANTLR实现SQL解析. 但是这个不光是很难支持所有的Spark语法, 另一方面维护等都是问题. (最好的维护是不用我们自己维护)
那我们直接用Spark来解析它自己的SQL吧.
使用SparkSession来解析.
比如: 我们要解析如下的SQL片段:
(count(distinct col1) + 12345) as c1
由于我们分析这些SQL片段的环境, 可能并不是真实有这些实际数据的环境, 那为了能更完整的分析, 我们可以构造一个完整SQL
select (count(distinct col1) + 12345) as c1
from (
select cast(null as string) as col1
)
这需要知道之前的数据集上都有哪些列, 以及其类型(这一般都不是问题)
那, 接下来, 让我们用标准的流程来解析一下这个SQL吧.
public static void main(String[] args) throws Exception {
String sqlString = "select (count(distinct col1) + 12345) as c1 from (select cast(null as string) as col1) ";
SparkSession sparkSession = SparkSession.builder()
.master("local[1]")
.getOrCreate();
LogicalPlan logicalPlan = sparkSession.sessionState().sqlParser().parsePlan(sqlString);;
LogicalPlan analyzedPlan = sparkSession.sessionState().analyzer().executeAndCheck(logicalPlan, new QueryPlanningTracker());
System.out.println(analyzedPlan.analyzed());
System.out.println(analyzedPlan.toJSON());
sparkSession.close();
}
我们知道SQL解析一般是有几个流程:
为了分析, 我们需要得到图中的 "Logical Plan" (更准确说是: Resolved Logical Plan, 这个阶段会检查列是否存在, 以及function是否注册等)
运行上面的程序后, 我们可以得到如下输出:
我们可以得到如下的语法树 JSON.
虽然我们可以得到正确的结果, 但是我们仍然需要启动一个完整的SparkSession, 这是代价很大的. 我们是否有更轻量的方法.
更轻量的方法
幸好, Spark有各种各样的Unit Test, 这些测试中, 肯定有不用完整运行Spark就能跑的. 比如, 对于我们这个场景(主要是 Analyzer 相关), 有个类似的实现.
src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala 文件中的
/**
* A trivial [[Analyzer]] with a dummy [[SessionCatalog]], [[EmptyFunctionRegistry]] and
* [[EmptyTableFunctionRegistry]]. Used for testing when all relations are already filled
* in and the analyzer needs only to resolve attribute references.
*/
object SimpleAnalyzer extends Analyzer(
new CatalogManager(
FakeV2SessionCatalog,
new SessionCatalog(
new InMemoryCatalog,
EmptyFunctionRegistry,
EmptyTableFunctionRegistry) {
override def createDatabase(dbDefinition: CatalogDatabase, ignoreIfExists: Boolean): Unit = {}
})) {
override def resolver: Resolver = caseSensitiveResolution
}
这里的SimpleAnalyzer似乎就是我们需要的, 除了其使用了EmptyFunctionRegistry, 而我们需要所有的Spark自带函数.
那我们新建一个类似的类. (由于SimpleAnalyzer使用了很多只能在 org.apache.spark.sql package下访问的类), 那我们需要在我们的项目中也建个 org.apache.spark.sql 包下的类:
package org.apache.spark.sql.catalyst.analysis
import org.apache.spark.sql.catalyst.catalog.{CatalogDatabase, InMemoryCatalog, SessionCatalog}
import org.apache.spark.sql.connector.catalog.CatalogManager
object GuandataSimpleAnalyzer extends Analyzer(
new CatalogManager(
FakeV2SessionCatalog,
new SessionCatalog(
new InMemoryCatalog,
FunctionRegistry.builtin,
TableFunctionRegistry.builtin) {
override def createDatabase(dbDefinition: CatalogDatabase, ignoreIfExists: Boolean): Unit = {}
})) {
override def resolver: Resolver = caseSensitiveResolution
}
让我们来使用这个简单Analyzer实现一下 SQL Parser吧:
public static void main(String[] args) {
String sqlString = "select (count(distinct col1) + 12345) as c1 from (select cast(null as string) as col1) ";
SparkSqlParser parser = new SparkSqlParser();
LogicalPlan logicalPlan = parser.parsePlan(sqlString);
LogicalPlan analyzedPlan = GuandataSimpleAnalyzer.executeAndCheck(logicalPlan, new QueryPlanningTracker());
System.out.println(analyzedPlan.analyzed());
System.out.println(analyzedPlan.toJSON());
}
运行程序, 屏幕上会有如下的输出:
我们可以在获得同样的结果前提下, 发现看到的日志非常少了, 速度也快了很多. 有了轻量的Spark SQL Parser, 我们可以围绕它做很多很cool的功能!