Scala并发框架Akka原理详解
“ Scala并发框架Akka原理详解。”
1 什么是Akka
Akka是JAVA虚拟机平台上构建高并发、分布式和容错应用的工具包和运行时。Akka用Scala语言编写,同时提供了Scala和Java的开发接口。
Akka处理并发的方式,基于Actor模型实现。
Actor的概念来自于Erlang,在AKKA中,可以认为一个Actor就是一个容器,用以存储状态、行为、Mailbox以及子Actor与Supervisor策略。
Actor之间并不直接通信,而是通过Mail来互通有无。
在Actor模式中,每个Actor都有一个(恰好一个)Mailbox。
Mailbox相当于是一个小型的队列,一旦Sender发送消息,就是将该消息入队到Mailbox中。
入队的顺序按照消息发送的时间顺序。Mailbox有多种实现,默认为FIFO。但也可以根据优先级考虑出队顺序,实现算法则不相同。
akka框架由如下几部分组成:
akka-actors
akka的核心,一个用于并发和分发的模型,没有线程原语的所有痛苦
akka-stream
一种直观而安全的方式来实现异步、非阻塞的回压流处理。
akka-http
现代的、快速的、异步的、流的HTTP服务器和客户端。
akka-cluster
通过在多个节点上分布您的系统来获得弹性和弹性。
akka-sharding
根据用户的身份,在集群中分配您的参与者。
Distributed Data
最终一致,高度读取和写入可用,低延迟数据
Akka Persistence
为参与者的事件包允许他们在重新启动后到达相同的状态。
Akka Management
在云系统上运行Akka系统的扩展(k8s,aws,…)
Alpakka
Akka流连接器用于集成其他技术
2 Akka特点
对并发模型进行了更高的抽象
是异步、非阻塞、高性能的事件驱动编程模型
是轻量级事件处理(1GB内存可容纳百万级别个Actor)
它提供了一种称为Actor的并发模型,其粒度比线程更小,你可以在系统中启用大量的Actor。
它提供了一套容错机制,允许在Actor出现异常时进行一些恢复或重置操作。
Akka既可以在单机上构建高并发程序,也可以在网络中构建分布式程序,并提供位置透明的Actor定位服务。
3 Actor模型
在并发程序中线程是并发程序的基本执行单元,但在Akka中执行单元是Actor。Actor模型是1973年提出的一个分布式并发编程模式,在Erlang语言中得到广泛支持和应用。
在Actor模型中并不是通过Actor对象的某个方法来告诉Actor需要做什么,而是给Actor发送一条消息。当一个Actor收到消息后,它有可能根据消息的内容做出某些行为,如更改自身状态,此时这个状态的更改是Actor自身进行的,并非由外界干预进行的。
在Erlang中,每段代码都运行在进程中,进程是Erlang中对Actor的称呼,意味着它的状态不会影响其他进程。系统中会有一个supervisor,实际上它只是另一个进程。被监控的进程挂掉了,supervisor会被通知并对此进行处理,因此也就能创建一个具有自愈功能的系统。如果一个Actor到达异常状态并且崩溃,无论如何,supervisor都可以做出反应并尝试把它变成一致状态,最常见的方式就是根据初始状态重启Actor。
简单来说,Actor通过消息传递的方式与外界通信,而且消息传递是异步的。
每个Actor都有一个邮箱,邮箱接收并缓存其他Actor发过来的消息,通过邮箱队列mail queue来处理消息。
Actor一次只能同步处理一个消息,处理消息过程中,除了可以接收消息外不能做任何其他操作。
每个Actor是完全独立的,可以同时执行他们的操作。每个Actor是一个计算实体,映射接收到的消息并执行以下动作:发送有限个消息给其他Actor、创建有限个新的Actor、为下一个接收的消息指定行为。这三个动作没有固定的顺序,可以并发地执行,Actor会根据接收到的消息进行不同的处理。
3.1 Actor模型的优势
传统并发程序是基于面向对象的方法,通过对象的方法调用进行信息传递,
如果对象的方法修改对象本身的状态,在多线程下就可能出现对象状态的不一致,
此时就必须对方法调用进行同步,而同步操作会牺牲性能。
例如,两个线程同时尝试购买最后一件商品时,如果没有锁就可能出现多个线程同时断定计数器的值大于或等于购买数量,然后错误地递减计数器,从而导致出现负数,出现线程安全问题。
为了防止线程安全问题,就需要用到锁。
以使用Java轻量级锁为例,在高度竞争的阶段,很有可能出现很长的线程队列,他们都在等待递减计数器。但使用队列的方式的问题在于可能造成众多阻塞线程,也就是每个线程都在等待轮到它们去执行一个序列化的操作。
所以,不合理的使用锁,很可能将多核多线程的应用,变成单线程的应用,或者导致工作线程之间存在高度竞争。
Actor模型优雅的解决了这个难题,为真正多线程的应用提供了一个基础支持。
3.2 Actor角色的职责
Actor模型把系统中所有事物都抽象成为一个Actor角色,在一个系统中可以将一个大规模的任务分解为一些小任务,这些小任务可以由多个Actor并发处理,从而减少任务的完成时间。
为响应一个接收到的消息,一个actor能够自己做出一些决策,如创建更多的actor,或发送更多的消息,或者确定如何去响应接收到的下一个消息。
Actor角色的职责:
Actor的输入是接收到的消息
Actor接收到消息决定如何处理:如创建更多的actor,或发送更多的消息,或者确定如何去响应接收到的下一个消息
Actor处理完成任务后可以发送消息给其它Actor
Actor模型的一个好处是可以消除共享状态:
Actor每次只能处理一条消息,所以Actor内部可以安全的处理状态,而不用考虑锁机制。
在Actor模型中主角是actor,类似一种worker。Actor彼此之间直接发送消息,不需要经过什么中介,消息是异步发送和处理的。在Actor模型中一切都是Actor,所有逻辑或模块都可以看成是Actor,通过不同Actor之间的消息传递实现模块之间的通信和交互。
Actor是由状态(state)、行为(behavior)、邮箱(mailbox)三者组成的。
状态(state):状态是指actor对象的变量信息,状态由actor自身管理,避免并发环境下的锁和内存原子性等问题。
行为(behavior):行为指定的是actor中计算逻辑,通过actor接收到的消息来改变actor的状态。
邮箱(mailbox):邮箱是actor之间的通信桥梁,邮箱内部通过FIFO消息队列来存储发送发消息,而接收方则从邮箱中获取消息。
3.3 Mailbox角色的职责
消息是异步的传送到actor的,所以当actor正在处理消息时,新来的消息应该存储到别的地方,也就是mailbox消息存储的地方。
每个actor都有且仅有一个mailbox,mailbox相当于一个小型的队列,一旦sender发送消息,就将该消息入队到mailbox中。入队的顺序按照消息发送的时间顺序。
)
3.3 Actor模型的特点
Actor模型描述了一组为避免并发编程的公理:
所有的Actor状态是本地的,外部是无法访问的。
Actor必须通过消息传递进行通信。
一个Actor可以响应消息、退出新Actor、改变内部状态、将消息发送到一个或多个Actor。
Actor可能会堵塞自己但Actor不应该堵塞自己运行的线程
第一行打印了HelloWorld Actor的路径,它是系统内第一个被创建的Actor。路径为:akka://hello/user/helloworld。第一个hello表示ActorSystem的系统名称,即构造时第一个入参。user表示用户Actor,所有的用户Actor都会挂载在user路径下。最后helloworld是这个Actor的名字。
第二行打印了Greeter Actor的路径,三、四行为Greeter中输出的信息。
第五行表示系统遇到了一条消息投递失败,原因是HelloWorld将自身停止了,导致Greeter发送的信息无法成功投递。
当使用Actor进行并发开发时,关注点已经不在线程上了,线程调度已经被Akka框架进行封装,只需关注Actor对象即可。Actor对象之间的通过显示的消息发送来传递信息。
当系统内有多个Actor时,Akka会自动在线程池中选择线程来执行我们的Actor,不同的Actor可能会被同一个线程执行或者一个Actor可能被不同的线程执行。
注意:不要在一个Actor中执行耗时的代码,这样可能会导致其他Actor的调度出现问题。
4 为什么Java架构师需要学习Akka
Akka用Scala语言编写,虽然提供了Java的开发接口,但是基于Akka开发的非常少。但是,很多分布式框架都是用akka做的,比如flink的分布式通信就依赖Akka。
但是对于应用级团队,性能总是能满足需求即可,而不需要追求性能极限。中间件越能reliable,那么开发则越省心越快。应用开发团队当然更喜欢使用分布式中间件,比如Hadoop,spark,hive,flink,Kinesis,Kafka,Storm等组件来解决问题.
所以,Akka对于应用开发来说, 可以不用学习。但是对于架构师来说, 是一定需要学习的。
至少,Akka的原理非常值得学习。
5 Akka入门例子
以下程序演示了akka的一个简单的示例。创建Actor去处理一条命令,通过消息传递的方式进行交互。
引入依赖:
<dependency >
<groupId> com.typesafe.akka</groupId >
<artifactId>akka-actor_2.10</artifactId>
<version>2.3.10</version>
</dependency>
<dependency >
<groupId> com.typesafe.akka</groupId >
<artifactId>akka-persistence-experimental_2.10</artifactId>
<version>2.3.10</version>
</dependency>
创建命令对象
//创建命令对象
static class Command implements Serializable
{
private static final long serialVersionUID = 1L;
private String data;
}
创建Actor对象
//创建Actor对象
static class SimpleActor extends UntypedActor
{
LoggingAdapter log = Logging.getLogger(getContext().system(), this);
public SimpleActor()
{
log.info("SimpleActor constructor");
}
public void onReceive(Object msg) throws Exception
{
log.info("Received Command: " + msg);
if (msg instanceof Command)
{
final String data = ((Command) msg).getData();
// emmit an event somewhere...
} else if (msg.equals("echo"))
{
log.info("ECHO!");
}
}
}
启动ActorSystem
public static void main(String[] args) throws InterruptedException
{
final ActorSystem actorSystem = ActorSystem.create("actor-system");
Thread.sleep(5000);
final ActorRef actorRef = actorSystem.actorOf(Props.create(SimpleActor.class), "simple-actor");
actorRef.tell(new Command("CMD 1"), null);
actorRef.tell(new Command("CMD 2"), null);
actorRef.tell(new Command("CMD 3"), null);
actorRef.tell(new Command("CMD 4"), null);
actorRef.tell(new Command("CMD 5"), null);
Thread.sleep(5000);
log.debug("Actor System Shutdown Starting...");
actorSystem.shutdown();
}
运行main 结果如下:
[11/01/2020 18:12:15.303] [actor-system-akka.actor.default-dispatcher-3] [akka://actor-system/user/simple-actor] SimpleActor constructor ] [
[11/01/2020 18:12:15.306] [actor-system-akka.actor.default-dispatcher-3] [akka://actor-system/user/simple-actor] Received Command: AkkaDemo.Command(data=CMD 1) ] [
[11/01/2020 18:12:15.306] [actor-system-akka.actor.default-dispatcher-3] [akka://actor-system/user/simple-actor] Received Command: AkkaDemo.Command(data=CMD 2) ] [
[11/01/2020 18:12:15.307] [actor-system-akka.actor.default-dispatcher-3] [akka://actor-system/user/simple-actor] Received Command: AkkaDemo.Command(data=CMD 3) ] [
[11/01/2020 18:12:15.307] [actor-system-akka.actor.default-dispatcher-3] [akka://actor-system/user/simple-actor] Received Command: AkkaDemo.Command(data=CMD 4) ] [
[11/01/2020 18:12:15.308] [actor-system-akka.actor.default-dispatcher-3] [akka://actor-system/user/simple-actor] Received Command: AkkaDemo.Command(data=CMD 5) ] [
6 Akka最佳实践
素数计算
需求:使用多线程找出1000000以内素数个数
共享内存方式的处理流程如下:
传统方式通过锁/同步的方式实现并发,每次同步获取当前值并让一个线程去判断值是否为素数,若是的话则通过同步方式对计数器加一。
Actor模型方式的处理流程如下:
使用Actor模型方式会将此过程拆分成多个模块,即拆分成多个Actor。每个Actor负责不同部分,并通过消息传递让多个Actor协同工作。
银行转账
存在的问题:当用户A Actor扣款期间,用户B Actor是不受限的,此时对用户B Actor进行操作是合法的,针对这种情况,单纯的Actor模型就显得比较乏力,需要加入其他机制来保证一致性。
说明:以上案例,仅仅作为模式的参考,并没有提供参考实现
7 Akka的消息投递策略
Akka应用是由消息驱动的,消息是除Actor之外最重要的核心组件。在Actor之间传递的消息应该满足不变性,即不变模式,可变的消息无法高效的在并发环境中使用。在Akka中推荐使用scala不可变对象。在java代码中可以使用final字段声明,在消息构造完成后,就不能再发生改变了。
Akka的消息投递策略:
至多一次投递:此策略中每一条消息最多会被投递一次,可能会偶尔出现投递失败的情况,从而导致消息丢失。此策略高性能。
至少一次投递:此策略中每一条消息至少会被投递一次,直至成功。在一些偶然的情况,接收者可能会收到重复消息,但消息不会丢失。此策略需要保存消息投递的状态并不断重试。
精确投递:所有消息保证被精确的投递并成功接收一次,既不会丢失也不会重复。此策略成本最高且不易实现。
关于消息的可靠性:没有必要在Akka层保证消息的可靠性,这样做成本太高且无必要。消息可靠性应该在应用的业务层进行保证,有时丢失一些消息是符合应用要求的。
消息投递的顺序性:Akka可以保证在一定程度上的投递顺序性。如Actor A1向A2顺序发送了M1、M2、M3三条消息,Actor A3向A2顺序发送了M4、M5、M6三条消息,则系统可以保证:
如果M1无丢失,它一定先于M2、M3被A2收到。
如果M2无丢失,它一定先于M3被A2收到。
如果M4无丢失,它一定先于M5、M6被A2收到。
如果M5无丢失,它一定先于M6被A2收到。
对于A2来说,来自A1向A3的消息并没有顺序性保证。
另外这种消息投递规则不具备可传递性,如下图:
C收到M1和M2的顺序是没有保证的。
推荐阅读
(点击标题可跳转阅读)
1 |
2 |
3 |
4 |
5 |
6 |
7 |
8 |
9 |
10 |
扫码二维码
获取更多精彩
长按图片关注
点个
在看
你最好看