vlambda博客
学习文章列表

使用akka框架和scala语言编写简单的RPC通信案例

前言

1)akka框架是一个并发的、分布式的、可伸缩性的、高性能的RPC通信框架,大数据开发框架Spark、flink底层原理中或多或少都用到了

2)scala语言真的很强大、好用、方便,结合了面向对象语言和函数式语言的特点

akka的原理图

大多数分布式框架或工具 都遵循着主从节点的架构设计,在这里我们暂不考虑高可用的模式(高可用可参考文章)

每个机器上的一个进程中只存在着1个通信角色对象  ActorSystem  ,也就是说 ActorSystem 对象的示例只有一个,但由它创建的Master和Worker可以有多个,是多例

1)启动master   内部定时器定期检查有无超时连接(就是在一定时间内没有向我发送心跳的worker),并将失效的进行移除

2)启动worker,跟master建立网络连接,将自己的信息(workerid,内存,内核数cpu等信息)发给master进行注册

3)master收到注册信息,将注册的信息进行保存到内存(高效),也可以持久化到磁盘或zookeeper当中(数据安全),之后向worker发送注册成功的信息

4)worker收到master发来的注册成功的信息,很高兴,并启动定时器,定期发送心跳,向master报活

代码实现

Worker类代码:

import java.util.UUIDimport java.util.concurrent.TimeUnit
import akka.actor.{Actor, ActorSelection, ActorSystem, Props}import com.typesafe.config.ConfigFactory
import scala.concurrent.duration._
/** * @author:tom * @Date:Created in 16:49 2020/12/18 */class Worker extends Actor {

var masterRef: ActorSelection = _
var workerId = UUID.randomUUID().toString
//在执行构造函数(实例化对象)之后、receive方法执行之前一定会执行一次 override def preStart(): Unit = {
//向master 进行注册信息 //可以与master建立连接 masterRef = context.actorSelection("akka.tcp://MasterActorSystem@localhost:8888/user/MasterActor") //发送消息 masterRef ! RegisterWorker(workerId, "2048", 4) }
override def receive: Receive = { //自己给自己发送的周期消息 case SendHeartbeat => { // if () { // // } 向Master发送心跳 masterRef ! HeartBeat(workerId) }
case RegisteredWorker => { // println("a response from master")
//启动一个定时器 import context.dispatcher context.system.scheduler.schedule(Duration(0, TimeUnit.MILLISECONDS), 10000.millisecond, self, SendHeartbeat)
} }}
object Worker {
def main(args: Array[String]): Unit = { val host = "localhost" val port = 9999 val configStr = s""" |akka.actor.provider = "akka.remote.RemoteActorRefProvider" |akka.remote.netty.tcp.hostname = $host |akka.remote.netty.tcp.port = $port |""".stripMargin val config = ConfigFactory.parseString(configStr) //创建workerActorSystem val workerActorSystem = ActorSystem.apply("workerActorSystem", config) //创建workerActor val workerActor = workerActorSystem.actorOf(Props(new Worker), "WorkerActor") }

}

Master代码:

import akka.actor.{Actor, ActorSystem, Props}import com.typesafe.config.ConfigFactory
import scala.collection.mutableimport scala.concurrent.duration._
/** * @author:tom * @Date:Created in 16:08 2020/12/18 */class Master extends Actor {
//定义一个可变的HashMap集合用来存储worker的信息 val idToWorker = new mutable.HashMap[String, WorkerInfo]()

//master定期检查自己 是否有新的节点(worker出现) override def preStart(): Unit = { import context.dispatcher context.system.scheduler.schedule(0 millisecond, 15000.millisecond, self, CheckTimeOutWorker) }
//用来接收消息 override def receive: Receive = {
//模式匹配 case "hello" => { println("hello~") } case "hi" => { println("hi~") }
//定时检查 case CheckTimeOutWorker => { val deadWorkers = idToWorker.values.filter(w => System.currentTimeMillis() - w.lastHeartbeatTime > 30000) deadWorkers.foreach(dw => { idToWorker -= dw.workerId }) println(s"current alive worker size:${idToWorker.size}") }
//有worker来进行注册信息需要执行的逻辑 case RegisterWorker(workerId, memory, cores) => { // println(s"workerId:$workerId,memory:$memory,cores:$cores")
//worker 注册成功应该执行的逻辑
//将信息存入到内存集合当中 val workerInfo: WorkerInfo = new WorkerInfo(workerId, memory, cores) idToWorker.put(workerId, workerInfo) //返回一个注册成功的信息 sender() ! RegisteredWorker }
//worker端发送过来的心跳信息 case HeartBeat(workerId) => { //根据workerId到Map中查找对应的WorkerInfo if (idToWorker.contains(workerId)) { //如果存在 则取出信息 val workerInfo = idToWorker(workerId) //更新上一次的心跳时间 workerInfo.lastHeartbeatTime = System.currentTimeMillis() } }
}}
object Master {
def main(args: Array[String]): Unit = { val host = "localhost" val port = 8888 val configStr = s""" |akka.actor.provider = "akka.remote.RemoteActorRefProvider" |akka.remote.netty.tcp.hostname = $host |akka.remote.netty.tcp.port = $port |""".stripMargin val config = ConfigFactory.parseString(configStr) //创建一个ActorSystem实例(单例) val masterActorSystem = ActorSystem("MasterActorSystem", config) //创建一个Actor val actor = masterActorSystem.actorOf(Props[Master], "MasterActor") //自己给自己发消息 actor ! "hello" }

}