vlambda博客
学习文章列表

如何快速解析Spark SQL

背景

在观远数据, 我们深度基于Apache Spark打造新一代数据产品. 一项非常常见的任务就是如何快速分析 Spark SQL.

如何正确且快速的分析用户编写的SQL呢. (在观远BI中, 主要是用户熟悉的自定义字段)

网上流行的教程是如何用ANTLR实现SQL解析. 但是这个不光是很难支持所有的Spark语法, 另一方面维护等都是问题. (最好的维护是不用我们自己维护)

那我们直接用Spark来解析它自己的SQL吧.

使用SparkSession来解析.

比如: 我们要解析如下的SQL片段:

(count(distinct col1) + 12345) as c1 

由于我们分析这些SQL片段的环境, 可能并不是真实有这些实际数据的环境, 那为了能更完整的分析, 我们可以构造一个完整SQL

select (count(distinct col1) + 12345as c1
from (
   select cast(null as stringas col1
)

这需要知道之前的数据集上都有哪些列, 以及其类型(这一般都不是问题)

那, 接下来, 让我们用标准的流程来解析一下这个SQL吧.

public static void main(String[] args) throws Exception {
    String sqlString = "select (count(distinct col1) + 12345) as c1 from (select cast(null as string) as col1) ";

    SparkSession sparkSession = SparkSession.builder()
            .master("local[1]")
            .getOrCreate();

    LogicalPlan logicalPlan = sparkSession.sessionState().sqlParser().parsePlan(sqlString);;

    LogicalPlan analyzedPlan = sparkSession.sessionState().analyzer().executeAndCheck(logicalPlan, new QueryPlanningTracker());

    System.out.println(analyzedPlan.analyzed());
    System.out.println(analyzedPlan.toJSON());

    sparkSession.close();
}

我们知道SQL解析一般是有几个流程:


为了分析, 我们需要得到图中的 "Logical Plan" (更准确说是: Resolved Logical Plan, 这个阶段会检查列是否存在, 以及function是否注册等)

运行上面的程序后, 我们可以得到如下输出:

如何快速解析Spark SQL

如何快速解析Spark SQL

我们可以得到如下的语法树 JSON.

虽然我们可以得到正确的结果, 但是我们仍然需要启动一个完整的SparkSession, 这是代价很大的. 我们是否有更轻量的方法.

更轻量的方法

幸好, Spark有各种各样的Unit Test, 这些测试中, 肯定有不用完整运行Spark就能跑的. 比如, 对于我们这个场景(主要是 Analyzer 相关), 有个类似的实现.

src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala 文件中的

/**
 * A trivial [[Analyzer]] with a dummy [[SessionCatalog]], [[EmptyFunctionRegistry]] and
 * [[EmptyTableFunctionRegistry]]. Used for testing when all relations are already filled
 * in and the analyzer needs only to resolve attribute references.
 */

object SimpleAnalyzer extends Analyzer(
  new CatalogManager(
    FakeV2SessionCatalog,
    new SessionCatalog(
      new InMemoryCatalog,
      EmptyFunctionRegistry,
      EmptyTableFunctionRegistry
{
      override def createDatabase(dbDefinition: CatalogDatabase, ignoreIfExists: Boolean): Unit = {}
    })) {
  override def resolverResolver = caseSensitiveResolution
}

这里的SimpleAnalyzer似乎就是我们需要的, 除了其使用了EmptyFunctionRegistry, 而我们需要所有的Spark自带函数.

那我们新建一个类似的类. (由于SimpleAnalyzer使用了很多只能在 org.apache.spark.sql package下访问的类), 那我们需要在我们的项目中也建个 org.apache.spark.sql 包下的类:

package org.apache.spark.sql.catalyst.analysis

import org.apache.spark.sql.catalyst.catalog.{CatalogDatabaseInMemoryCatalogSessionCatalog}
import org.apache.spark.sql.connector.catalog.CatalogManager

object GuandataSimpleAnalyzer extends Analyzer(
  new CatalogManager(
    FakeV2SessionCatalog,
    new SessionCatalog(
      new InMemoryCatalog,
      FunctionRegistry.builtin,
      TableFunctionRegistry.builtin
{
      override def createDatabase(dbDefinition: CatalogDatabase, ignoreIfExists: Boolean): Unit = {}
    })) {
  override def resolverResolver = caseSensitiveResolution
}

让我们来使用这个简单Analyzer实现一下 SQL Parser吧:

public static void main(String[] args) {
    String sqlString = "select (count(distinct col1) + 12345) as c1 from (select cast(null as string) as col1) ";

    SparkSqlParser parser = new SparkSqlParser();

    LogicalPlan logicalPlan = parser.parsePlan(sqlString);

    LogicalPlan analyzedPlan = GuandataSimpleAnalyzer.executeAndCheck(logicalPlan, new QueryPlanningTracker());

    System.out.println(analyzedPlan.analyzed());
    System.out.println(analyzedPlan.toJSON());
}

运行程序, 屏幕上会有如下的输出:


我们可以在获得同样的结果前提下, 发现看到的日志非常少了, 速度也快了很多. 有了轻量的Spark SQL Parser, 我们可以围绕它做很多很cool的功能!