利用Akka并行执行SparkSQL任务
最近遇到了一个问题,spark执行多个SQL,这些SQL之间没有相互依赖关系,想并行提交这些SQL到集群中,也就是提高SparkSQL执行不同SQL的并行度。
最开始想到的是构造一个线程池,然后用多线程去执行SparkSQL,这个跟之前用到的Akka模型是一样的,而且Akka本身就是编写并发程序的框架,用Akka还能简化逻辑代码,将多线程并发问题完全交给框架来做,所以就用Akka实现了一个,具体demo代码如下:
import akka.actor.{ActorRef, ActorSystem, Props}
import akka.routing.RoundRobinPool
import org.apache.spark.sql.SparkSession
class Parallel {
// 构造akka
val actorSystem = ActorSystem("Master")
// 构造5个轮询路由线程的线程池
val master: ActorRef = actorSystem.actorOf(Props[Master]
.withRouter(new RoundRobinPool(5)), "master")
// 构造SparkSession对象
val spark: SparkSession = SparkSession.builder()
.appName("parallel_test")
.config("spark.sql.warehouse.dir", "/user/hive/warehouse")
.config("hive.metastore.uris", "thrift://xxx:9083")
.master("local[*]")
.enableHiveSupport()
.getOrCreate()
spark.sparkContext.setLogLevel("ERROR")
// SparkSQL执行
def execute(sql: String): Unit = {
master ! (spark, sql)
}
}
object Parallel {
// 下面是5个SQL语句
val sql1: String =
"""
|(手动打码)
""".stripMargin
val sql2: String =
"""
|(手动打码)
""".stripMargin
.
.
.
val sql5: String =
"""
|(手动打码)
""".stripMargin
var sqls = List(
sql1
, sql2
, sql3
, sql4
, sql5
)
def main(args: Array[String]): Unit = {
val parallel = new Parallel()
for (elem <- sqls) {
parallel.execute(elem)
}
}
}
下面是sparkSQL 实际执行的Master类:
import akka.actor.Actor
import org.apache.spark.sql.SparkSession
class Master extends Actor {
override def receive: Receive = {
case message: (SparkSession, String) =>
println(Thread.currentThread().getName + "线程执行了" + message._1 + "执行的任务")
message._1.sql(message._2).show()
case _ =>
println("Nothing to do")
}
}
为了验证一下是否是多线程并行执行的,还在具体的代码执行段加了打印当前线程的名称:
测试代码我用了5个SQL语句,从打印的信息可以看出,确实是用了5个线程来执行这5个SQL的,当SQL比较多时,我们还可以根据系统的资源来合理设定线程池内的线程数,这样做不仅可以实现并行提交SparkSQL任务,还大大的简化了我们的编程逻辑。
往期回顾