搜公众号
推荐 原创 视频 Java开发 开发工具 Python开发 Kotlin开发 Ruby开发 .NET开发 服务器运维 开放平台 架构师 大数据 云计算 人工智能 开发语言 其它开发 iOS开发 前端开发 JavaScript开发 Android开发 PHP开发 数据库
Lambda在线 > 飞鱼说编程 > 《Scala 语言》Scala 中的 Actor 编程

《Scala 语言》Scala 中的 Actor 编程

飞鱼说编程 2019-02-03
举报


1.了解 Scala Actor


注:Scala Actor 是 Scala 2.10.x 版本及以前版本的 Actor。
Scala 在 2.11.x 版本中将 Akka 加入其中,作为其默认的 Actor,老版本的 Actor 已经废弃。


1.1. 概念


Scala 中的 Actor 能够实现并行编程的强大功能,它是基于事件模型的并发机制,Scala 是运用消息 (message) 的发送、接收来实现多线程的。使用 Scala 能够更容易地实现多线程应用的开发。


1.2. 对比传统的 Java 并发编程和 Scala Actor 编程


《Scala 语言》Scala 中的 Actor 编程


对于 Java,我们都知道它的多线程实现需要对共享资源(变量、对象等)使用 synchronized 关键字进行代码块同步、对象锁互斥等等。而且,常常一大块的 try…catch 语句块中加上 wait 方法、notify 方法、notifyAll 方法是让人很头疼的。原因就在于 Java 中多数使用的是可变状态的对象资源,对这些资源进行共享来实现多线程编程的话,控制好资源竞争与防止对象状态被意外修改是非常重要的,而对象状态的不变性也是较难以保证的。 而在 Scala 中,我们可以通过复制不可变状态的资源 (即对象,Scala 中一切都是对象,连函数、方法也是) 的一个副本,再基于 Actor 的消息发送、接收机制进行并行编程。


1.3. Actor 方法执行顺序


(1) 首先调用 start() 方法启动 Actor;

(2) 调用 start() 方法后其 act() 方法会被执行;

(3) 向 Actor 发送消息。


1.4. 发送消息的方式


《Scala 语言》Scala 中的 Actor 编程


2.Actor 实战


2.1. 案例一


import scala.actors.Actor

object MyActor1 extends Actor{
 //定义act方法
 def act(){
   for(i <- 1 to 10){
     println("actor-1 " + i)
     Thread.sleep(2000)
   }
 }
}

object MyActor2 extends Actor{
 //定义act方法
 def act(){
   for(i <- 1 to 10){
     println("actor-2 " + i)
     Thread.sleep(2000)
   }
 }
}

object ActorExample1 extends App{
 //启动Actor
 MyActor1.start()
 MyActor2.start()
}


说明:上面分别调用了两个单例对象的 start() 方法,他们的 act() 方法会被执行,相同与在 Java 中开启了两个线程,线程的 run() 方法会被执行。

注意:这两个 Actor 是并行执行的,act() 方法中的 for 循环执行完成后 actor 程序就退出了。


2.2. 案例二


可以不断地接收消息。


import scala.actors.Actor

class ActorExample2 extends Actor {

 override def act(): Unit = {
   while (true) {
     receive {
       case "start" => {
         println("starting ...")
         Thread.sleep(5000)
         println("started")
       }
       case "stop" => {
         println("stopping ...")
         Thread.sleep(5000)
         println("stopped ...")
       }
     }
   }
 }
}

object ActorExample2 {
 def main(args: Array[String]) {
   val actor = new ActorExample2
   actor.start()
   //发送异步消息,感叹号就相当于是一个方法
   actor ! "start"
   actor ! "stop"
   println("消息发送完成!")
 }
}


说明:在 act() 方法中加入了while (true) 循环,就可以不停的接收消息。

注意:发送 start 消息和 stop 的消息是异步的,但是 Actor 接收到消息执行的过程是同步的按顺序执行。


2.3. 案例三


react 方式会复用线程,比 receive 更高效。


import scala.actors.Actor

class ActorExample3 extends Actor {

 override def act(): Unit = {
   loop {
     react {
       case "start" => {
         println("starting ...")
         Thread.sleep(5000)
         println("started")
       }
       case "stop" => {
         println("stopping ...")
         Thread.sleep(8000)
         println("stopped ...")
       }
     }
   }
 }
}

object ActorExample3 {
 def main(args: Array[String]) {
   val actor = new ActorExample3
   actor.start()
   actor ! "start"
   actor ! "stop"
   println("消息发送完成!")
 }
}


说明: react 如果要反复执行消息处理,react 外层要用 loop,不能用 while。


2.4. 案例四


结合 case class 发送消息。


import scala.actors.Actor

class ActorExample4 extends Actor {

 def act(): Unit = {
   while (true) {
     receive {
       case "start" => println("starting ...")
       case SyncMsg(id, msg) => {
         println(id + ",sync " + msg)
         Thread.sleep(5000)
         sender ! ReplyMsg(3,"finished")
       }
       case AsyncMsg(id, msg) => {
         println(id + ",async " + msg)
         Thread.sleep(5000)
       }
     }
   }
 }
}

object ActorExample4 {
 def main(args: Array[String]) {
   val a = new ActorExample4
   a.start()
   //异步消息
   a ! AsyncMsg(1, "hello actor")
   println("异步消息发送完成")
   //同步消息
   //val content = a.!?(1000, SyncMsg(2, "hello actor"))
   //println(content)
   val reply = a !! SyncMsg(2, "hello actor")
   println(reply.isSet)
   //println("123")
   val c = reply.apply()
   println(reply.isSet)
   println(c)
 }
}
case class SyncMsg(id : Int, msg: String)
case class AsyncMsg(id : Int, msg: String)
case class ReplyMsg(id : Int, msg: String)


2.5. 案例五


用 actor 并发编程写一个单机版的 WorldCount,将多个文件作为输入,计算完成后将多个任务汇总,得到最终的结果。


import java.io.File

import scala.actors.{Actor, Future}
import scala.collection.mutable
import scala.io.Source

/**
 * Created by ZX on 2016/4/4.
 */

class Task extends Actor {

 override def act(): Unit = {
   loop {
     react {
       case SubmitTask(fileName) => {
         val contents = Source.fromFile(new File(fileName)).mkString
         val arr = contents.split("\r\n")
         val result = arr.flatMap(_.split(" ")).map((_, 1)).groupBy(_._1).mapValues(_.length)
         //val result = arr.flatMap(_.split(" ")).map((_, 1)).groupBy(_._1).mapValues(_.foldLeft(0)(_ + _._2))
         sender ! ResultTask(result)
       }
       case StopTask => {
         exit()
       }
     }
   }
 }
}

object WorkCount {
 def main(args: Array[String]) {
   val files = Array("c://words.txt", "c://words.log")

   val replaySet = new mutable.HashSet[Future[Any]]
   val resultList = new mutable.ListBuffer[ResultTask]

   for(f <- files) {
     val t = new Task
     val replay = t.start() !! SubmitTask(f)
     replaySet += replay
   }

   while(replaySet.size > 0){
     val toCumpute = replaySet.filter(_.isSet)
     for(r <- toCumpute){
       val result = r.apply()
       resultList += result.asInstanceOf[ResultTask]
       replaySet.remove(r)
     }
     Thread.sleep(100)
   }
   val finalResult = resultList.map(_.result).flatten.groupBy(_._1).mapValues(x => x.foldLeft(0)(_ + _._2))
   println(finalResult)
 }
}

case class SubmitTask(fileName: String)
case object StopTask
case class ResultTask(result: Map[String, Int])


本文是在本人在学习 Scala 时的总结归纳和笔记,如果觉得对你有帮助,不要忘了点赞,评论,转发哟!!!

可以点击阅读原文查看博客原文哦!

上一篇:


版权声明:本站内容全部来自于腾讯微信公众号,属第三方自助推荐收录。《《Scala 语言》Scala 中的 Actor 编程》的版权归原作者「飞鱼说编程」所有,文章言论观点不代表Lambda在线的观点, Lambda在线不承担任何法律责任。如需删除可联系QQ:516101458

文章来源: 阅读原文

相关阅读

举报