vlambda博客
学习文章列表

利用Akka并行执行SparkSQL任务

最近遇到了一个问题,spark执行多个SQL,这些SQL之间没有相互依赖关系,想并行提交这些SQL到集群中,也就是提高SparkSQL执行不同SQL的并行度。

最开始想到的是构造一个线程池,然后用多线程去执行SparkSQL,这个跟之前用到的Akka模型是一样的,而且Akka本身就是编写并发程序的框架,用Akka还能简化逻辑代码,将多线程并发问题完全交给框架来做,所以就用Akka实现了一个,具体demo代码如下:

import akka.actor.{ActorRef, ActorSystem, Props}import akka.routing.RoundRobinPoolimport org.apache.spark.sql.SparkSession
class Parallel {  //  构造akka val actorSystem = ActorSystem("Master")
// 构造5个轮询路由线程的线程池 val master: ActorRef = actorSystem.actorOf(Props[Master] .withRouter(new RoundRobinPool(5)), "master")
// 构造SparkSession对象 val spark: SparkSession = SparkSession.builder() .appName("parallel_test") .config("spark.sql.warehouse.dir", "/user/hive/warehouse") .config("hive.metastore.uris", "thrift://xxx:9083") .master("local[*]") .enableHiveSupport() .getOrCreate()  spark.sparkContext.setLogLevel("ERROR")
// SparkSQL执行 def execute(sql: String): Unit = { master ! (spark, sql)  }
}

object Parallel {
// 下面是5个SQL语句 val sql1: String = """      |(手动打码) """.stripMargin
val sql2: String = """ |(手动打码) """.stripMargin . . . val sql5: String = """ |(手动打码) """.stripMargin
  var sqls = List(    sql1 , sql2    , sql3    , sql4    , sql5 )
def main(args: Array[String]): Unit = { val parallel = new Parallel() for (elem <- sqls) { parallel.execute(elem) }
}}


下面是sparkSQL 实际执行的Master类:

import akka.actor.Actorimport org.apache.spark.sql.SparkSession
class Master extends Actor {
override def receive: Receive = {    case message: (SparkSession, String) => println(Thread.currentThread().getName + "线程执行了" + message._1 + "执行的任务")      message._1.sql(message._2).show() case _ => println("Nothing to do") }}


为了验证一下是否是多线程并行执行的,还在具体的代码执行段加了打印当前线程的名称:


测试代码我用了5个SQL语句,从打印的信息可以看出,确实是用了5个线程来执行这5个SQL的,当SQL比较多时,我们还可以根据系统的资源来合理设定线程池内的线程数,这样做不仅可以实现并行提交SparkSQL任务,还大大的简化了我们的编程逻辑。


往期回顾