利用Akka并行执行SparkSQL任务
最近遇到了一个问题,spark执行多个SQL,这些SQL之间没有相互依赖关系,想并行提交这些SQL到集群中,也就是提高SparkSQL执行不同SQL的并行度。
最开始想到的是构造一个线程池,然后用多线程去执行SparkSQL,这个跟之前用到的Akka模型是一样的,而且Akka本身就是编写并发程序的框架,用Akka还能简化逻辑代码,将多线程并发问题完全交给框架来做,所以就用Akka实现了一个,具体demo代码如下:
import akka.actor.{ActorRef, ActorSystem, Props}import akka.routing.RoundRobinPoolimport org.apache.spark.sql.SparkSessionclass Parallel {// 构造akkaval actorSystem = ActorSystem("Master")// 构造5个轮询路由线程的线程池val master: ActorRef = actorSystem.actorOf(Props[Master].withRouter(new RoundRobinPool(5)), "master")// 构造SparkSession对象val spark: SparkSession = SparkSession.builder().appName("parallel_test").config("spark.sql.warehouse.dir", "/user/hive/warehouse").config("hive.metastore.uris", "thrift://xxx:9083").master("local[*]").enableHiveSupport().getOrCreate()spark.sparkContext.setLogLevel("ERROR")// SparkSQL执行def execute(sql: String): Unit = {master ! (spark, sql)}}object Parallel {// 下面是5个SQL语句val sql1: String ="""|(手动打码)""".stripMarginval sql2: String ="""|(手动打码)""".stripMargin...val sql5: String ="""|(手动打码)""".stripMarginvar sqls = List(sql1, sql2, sql3, sql4, sql5)def main(args: Array[String]): Unit = {val parallel = new Parallel()for (elem <- sqls) {parallel.execute(elem)}}}
下面是sparkSQL 实际执行的Master类:
import akka.actor.Actorimport org.apache.spark.sql.SparkSessionclass Master extends Actor {override def receive: Receive = {case message: (SparkSession, String) =>println(Thread.currentThread().getName + "线程执行了" + message._1 + "执行的任务")message._1.sql(message._2).show()case _ =>println("Nothing to do")}}
为了验证一下是否是多线程并行执行的,还在具体的代码执行段加了打印当前线程的名称:
测试代码我用了5个SQL语句,从打印的信息可以看出,确实是用了5个线程来执行这5个SQL的,当SQL比较多时,我们还可以根据系统的资源来合理设定线程池内的线程数,这样做不仅可以实现并行提交SparkSQL任务,还大大的简化了我们的编程逻辑。
往期回顾
