vlambda博客
学习文章列表

Flink源码篇,作业提交流程、作业调度流程、作业内部转换流程图


Flink源码篇,作业提交流程、作业调度流程、作业内部转换流程图


1、Flink作业提交流程应该了解吧?

2、Flink作业提交分为几种方式?

3、Flink JobGraph是在什么时候生成的?

4、那在 JobGraph 提交集群之前都经历哪些过程?

5、PipeExecutor,它有哪些实现类?

6、Local提交模式有啥特点,怎么实现的?

7、远程提交模式都有哪些?

8、Standalone模式简单介绍一下?

9、yarn-session模式特点?

10、yarn-perJob模式特点?

11、yarn-application模式特点?

12、yarn-session 提交流程详细介绍一下?

13、yarn-perJob 提交流程详细介绍一下?

14、流图、作业图、执行图三者区别?

15、流图(StreamGraph)介绍一下?

16、作业图(JobGraph)介绍一下?

17、执行图(ExecutionGraph)介绍一下?

18、Flink调度器的概念介绍一下?

19、Flink调度行为包含几种?

20、Flink调度模式包含几种?

21、Flink调度策略包含几种?

22、Flink作业生命周期包含哪些状态?

23、Task的作业生命周期包含哪些状态?

24、Flink的任务调度流程讲解一下?

25、Flink的任务槽是什么意思?

26、Flink 槽共享又是什么意思?


Flink源码篇,作业提交流程、作业调度流程、作业内部转换流程图


1、Flink作业提交流程应该了解吧?


Flink的提交流程:


在Flink Client中,通过反射启动jar中的main函数,生成Flink StreamGraph和JobGraph,将JobGraph提交给Flink集群


Flink集群收到JobGraph(JobManager收到)后,将JobGraph翻译成ExecutionGraph,然后开始调度,启动成功之后开始消费数据。


总结来说:Flink核心执行流程,对用户API的调用可以转为 StreamGraph -> JobGraph -> ExecutionGraph。


2、Flink作业提交分为几种方式?


Flink的作业提交分为两种方式


Local 方式:即本地提交模式,直接在IDEA运行代码。


远程提交方式:分为Standalone方式、yarn方式、K8s方式


Yarn方式分为三种提交模式:Yarn-perJob模式、Yarn-Sessionmo模式、Yarn-Application模式


3、Flink JobGraph是在什么时候生成的?


StreamGraph、JobGraph全部是在Flink Client 客户端生成的,即提交集群之前生成,原理图如下:


Flink源码篇,作业提交流程、作业调度流程、作业内部转换流程图


4、那在 JobGraph 提交集群之前都经历哪些过程?


(1)用户通过启动Flink集群,使用命令行提交作业,运行 flink run -c WordCount xxx.jar


(2)运行命令行后,会通过run脚本调用CliFrontend入口,CliFrontend会触发用户提交的jar文件中的main方法,然后交给PipelineExecuteor # execute方法,最终根据提交的模式选择触发一个具体的PipelineExecutor执行。


(3)根据具体的PipelineExecutor执行,将对用户的代码进行编译生成streamGraph,经过优化后生成jobgraph。


Flink源码篇,作业提交流程、作业调度流程、作业内部转换流程图


5、PipeExecutor,它有哪些实现类?


PipeExecutor 在Flink中被叫做 流水线执行器,它是一个接口,是Flink Client生成JobGraph之后,将作业提交给集群的重要环节,前面说过,作业提交到集群有好几种方式,最常用的是yarn方式,yarn方式包含3种提交模式,主要使用 session模式,perjob模式,Application模式,jobGraph是在集群中生成。


所以PipeExecutor的实现类如下图所示:


Flink源码篇,作业提交流程、作业调度流程、作业内部转换流程图


除了上述框的两种模式外,在IDEA环境中运行Flink MiniCluster 进行调试时,使用LocalExecutor。

6、Local提交模式有啥特点,怎么实现的?


Local是在本地IDEA环境中运行的提交方式。不上集群。主要用于调试,原理图如下:

Flink源码篇,作业提交流程、作业调度流程、作业内部转换流程图


Flink程序由JobClient进行提交


JobClient将作业提交给JobManager


JobManager负责协调资源分配和作业执行。资源分配完成后,任务将提交给相应的TaskManager


TaskManager启动一个线程开始执行,TaskManager会向JobManager报告状态更改,如开始执行,正在进行或者已完成。


作业执行完成后,结果将发送回客户端。


源码分析:通过Flink1.12.2源码进行分析的


(1)创建获取对应的StreamExecutionEnvironment对象:LocalStreamEnvironment


调用StreamExecutionEnvironment对象的execute方法


Flink源码篇,作业提交流程、作业调度流程、作业内部转换流程图


(2)获取streamGraph


Flink源码篇,作业提交流程、作业调度流程、作业内部转换流程图


(3)执行具体的PipeLineExecutor -> 得到localExecutorFactory


Flink源码篇,作业提交流程、作业调度流程、作业内部转换流程图


(4)获取JobGraph

根据localExecutorFactory的实现类LocalExecutor生成JobGraph

Flink源码篇,作业提交流程、作业调度流程、作业内部转换流程图


上面这部分全部是在Flink Client生成的,由于是使用Local模式提交。所有接下来将创建MiniCluster集群,由miniCluster.submitJob指定要提交的jobGraph

(5)实例化MiniCluster集群

Flink源码篇,作业提交流程、作业调度流程、作业内部转换流程图


(6)返回JobClient 客户端


在上面执行miniCluster.submitJob 将JobGraph提交到本地集群后,会返回一个JobClient客户端,该JobClient包含了应用的一些详细信息,包括JobID,应用的状态等等。最后返回到代码执行的上一层,对应类为StreamExecutionEnvironment。


Flink源码篇,作业提交流程、作业调度流程、作业内部转换流程图


以上就是Local模式的源码执行过程。


7、远程提交模式都有哪些?


远程提交方式:分为Standalone方式、yarn方式、K8s方式


Standalone:包含session模式


Yarn方式分为三种提交模式:Yarn-perJob模式、Yarn-Sessionmo模式、Yarn-Application模式。


K8s方式:包含 session模式


8、Standalone模式简单介绍一下?


Standalone 模式为Flink集群的单机版提交方式,只使用一个节点进行提交,常用Session模式。


作业提交原理图如下:


Flink源码篇,作业提交流程、作业调度流程、作业内部转换流程图


提交命令如下:


bin/flink run org.apache.flink.WordCount xxx.jar


client客户端提交任务给JobManager


JobManager负责申请任务运行所需要的资源并管理任务和资源,


JobManager分发任务给TaskManager执行


TaskManager定期向JobManager汇报状态


9、yarn-session模式特点?


提交命令:


./bin/flink run -t yarn-session \ 

-Dyarn.application.id=application_XXXX_YY  xxx.jar



Yarn-Session模式:所有作业共享集群资源,隔离性差,JM负载瓶颈,main方法在客户端执行。适合执行时间短,频繁执行的短任务,集群中的所有作业 只有一个JobManager,另外,Job被随机分配给TaskManager


特点:


Session-Cluster模式需要先启动集群,然后再提交作业,接着会向yarn申请一块空间后,资源永远保持不变。如果资源满了,下一个作业就无法提交,只能等到 yarn中的其中一个作业执行完成后,释放了资源,下个作业才会正常提交。所有作业共享Dispatcher和ResourceManager;共享资源;适合规模小执行时间短的作业。


Flink源码篇,作业提交流程、作业调度流程、作业内部转换流程图


10、yarn-perJob模式特点?


提交命令:


./bin/flink run -t yarn-per-job --detached  xxx.jar


Yarn-Per-Job模式:每个作业单独启动集群,隔离性好,JM负载均衡,main方法在客户端执行。在per-job模式下,每个Job都有一个JobManager,每个TaskManager只有单个Job。


特点:


一个任务会对应一个Job,每提交一个作业会根据自身的情况,都会单独向yarn申请资源,直到作业执行完成,一个作业的失败与否并不会影响下一个作业的正常提交和运行。独享Dispatcher 和 ResourceManager,按需接受资源申请;适合规模大长时间运行的作业。


Flink源码篇,作业提交流程、作业调度流程、作业内部转换流程图


11、yarn-application模式特点?


提交命令:


./bin/flink run-application -t yarn-application xxx.jar


Yarn-Application模式:每个作业单独启动集群,隔离性好,JM负载均衡,main方法在JobManager上执行。


特点:


在yarn-per-job 和 yarn-session模式下,客户端都需要执行以下三步,即:


获取作业所需的依赖项;

通过执行环境分析并取得逻辑计划,即StreamGraph -> JobGraph;

将依赖项和JobGraph上传到集群中。


Flink源码篇,作业提交流程、作业调度流程、作业内部转换流程图


只有在这些都完成之后,才会通过env.execute()方法 触发 Flink运行时真正地开始执行作业。


如果所有用户都在同一个客户端上提交作业,较大的依赖会消耗更多的带宽,而较复杂的作业逻辑翻译成JobGraph也需要吃掉更多的CPU和内存,客户端的资源反而会成为瓶颈。


为了解决它,社区在传统部署模式的基础上实现了 Application模式。原本需要客户端做的三件事被转移到了JobManager里,也就是说main()方法在集群中执行(入口点位于 ApplicationClusterEntryPoint ),客户端只需要负责发起部署请求了


Flink源码篇,作业提交流程、作业调度流程、作业内部转换流程图


综上所述,Flink社区比较推荐使用 yarn-perjob 或者 yarn-application模式进行提交应用。

12、yarn-session 提交流程详细介绍一下?

提交流程图如下

Flink源码篇,作业提交流程、作业调度流程、作业内部转换流程图


1、启动集群


(1)Flink Client 向 Yarn ResourceManager提交任务信息。


Flink Client 将应用配置(Flink-conf.yaml、logback.xml、log4j.properties)和相关文件(Flink Jar、配置类文件、用户Jar文件、JobGraph对象等)上传至分布式存储HDFS中。


Flink Client 向 Yarn ResourceManager提交任务信息


(2)Yarn 启动 Flink集群,做2步操作:


通过Yarn Client 向Yarn ResourceManager提交Flink创建集群的申请,Yarn ResourceManager 分配 Container 资源,并通知对应的 NodeManager 上启动一个ApplicationMaster(每提交一个flink job 就会启动一个applicationMaster),ApplicationMaster会包含当前要启动的 JobManager和 Flink自己内部要使用的ResourceManager。


在 JobManager 进程中运行 YarnSessionClusterEntryPoint 作为集群启动的入口。


初始化Dispatcher,Flink自己内部要使用的ResourceManager,启动相关RPC服务,等待Flink Client 通过Rest接口提交JobGraph。


2、作业提交


(3)Flink Client 通过Rest 向Dispatcher 提交编译好的JobGraph。Dispatcher 是 Rest 接口,不负责实际的调度、指定工作。


(4)Dispatcher 收到 JobGraph 后,为作业创建一个JobMaster,将工作交给JobMaster,JobMaster负责作业调度,管理作业和Task的生命周期,构建ExecutionGraph(JobGraph的并行化版本,调度层最核心的数据结构)


以上两步执行完后,作业进入调度执行阶段。


3、作业调度执行


(5)JobMaster向ResourceManager申请资源,开始调度ExecutionGraph。


(6)ResourceManager将资源请求加入等待队列,通过心跳向YarnResourceManager申请新的Container来启动TaskManager进程。


(7)YarnResourceManager启动,然后从HDFS加载Jar文件等所需相关资源,在容器中启动TaskManager,TaskManager启动TaskExecutor


(8)TaskManager启动后,向ResourceManager 注册,并把自己的Slot资源情况汇报给ResourceManager。


(9)ResourceManager从等待队列取出Slot请求,向TaskManager确认资源可用情况,并告知TaskManager将Slot分配给哪个JobMaster。


(10)TaskManager向JobMaster回复自己的一个Slot属于你这个任务,JobMaser会将Slot缓存到SlotPool。


(11)JobMaster调度Task到TaskMnager的Slot上执行。


13、yarn-perJob 提交流程详细介绍一下?


提交命令如下:


./bin/flink run -t yarn-per-job --detached  xxx.jar


提交流程图如下所示:


Flink源码篇,作业提交流程、作业调度流程、作业内部转换流程图


1、启动集群


(1)Flink Client 向 Yarn ResourceManager提交任务信息。


Flink Client 将应用配置(Flink-conf.yaml、logback.xml、log4j.properties)和相关文件(Flink Jar、配置类文件、用户Jar文件、JobGraph对象等)上传至分布式存储HDFS中。


Flink Client 向 Yarn ResourceManager提交任务信息


(2)Yarn 启动 Flink集群,做2步操作:


通过Yarn Client 向Yarn ResourceManager提交Flink创建集群的申请,Yarn ResourceManager 分配 Container 资源,并通知对应的 NodeManager 上启动一个ApplicationMaster(每提交一个flink job 就会启动一个applicationMaster),ApplicationMaster会包含当前要启动的 JobManager和 Flink自己内部要使用的ResourceManager。


在 JobManager 进程中运行 YarnJobClusterEntryPoint 作为集群启动的入口。


初始化Dispatcher,Flink自己内部要使用的ResourceManager,启动相关RPC服务,等待Flink Client 通过Rest接口提交JobGraph。


2、作业提交


(3)ApplicationMaster启动Dispatcher,Dispatcher启动ResourceManager和JobMaster(该步和Session不同,JobMaster是由Dispatcher拉起,而不是Client传过来的)。


(4)JobMaster负责作业调度,管理作业和Task的生命周期,构建ExecutionGraph(JobGraph的并行化版本,调度层最核心的数据结构)


以上两步执行完后,作业进入调度执行阶段。


3、作业调度执行


(5)JobMaster向ResourceManager申请资源,开始调度ExecutionGraph。


(6)ResourceManager将资源请求加入等待队列,通过心跳向YarnResourceManager申请新的Container来启动TaskManager进程。


(7)YarnResourceManager启动,然后从HDFS加载Jar文件等所需相关资源,在容器中启动TaskManager,TaskManager启动TaskExecutor


(8)TaskManager启动后,向ResourceManager 注册,并把自己的Slot资源情况汇报给ResourceManager。


(9)ResourceManager从等待队列取出Slot请求,向TaskManager确认资源可用情况,并告知TaskManager将Slot分配给哪个JobMaster。


(10)TaskManager向JobMaster回复自己的一个Slot属于你这个任务,JobMaser会将Slot缓存到SlotPool。


(11)JobMaster调度Task到TaskMnager的Slot上执行。


14、流图、作业图、执行图三者区别?


Flink内部Graph总览图,由于现在Flink 实行流批一体代码,Batch API基本废弃,就不过多介绍在Flink DataStramAPI 中,Graph内部转换图如下:


Flink源码篇,作业提交流程、作业调度流程、作业内部转换流程图


以WordCount为例,流图、作业图、执行图、物理执行图之间的Task调度如下:

Flink源码篇,作业提交流程、作业调度流程、作业内部转换流程图


对于Flink 流计算应用,运行用户代码时,首先调用DataStream API ,将用户代码转换为 Transformation,然后经过:StreamGraph->JobGraph->ExecutionGraph 3层转换(这些都是Flink内置的数据结构),最后经过Flink调度执行,在Flink 集群中启动计算任务,形成一个物理执行图。


15、流图(StreamGraph)介绍一下?


流图 StreamGraph


Flink源码篇,作业提交流程、作业调度流程、作业内部转换流程图


流图StreamGraph 核心对象包括两个:StreamNode 点 和 StreamEdge 边


1)StreamNode 点


StreamNode 点 ,从 Transformation转换而来,可以简单理解为 StreamNode 表示一个算子,存在实体和虚拟,可以有多个输入和输出,实体StreamNode 最终变成物理算子,虚拟的附着在StreamEdge 边 上。


2)StreamEdge 边


StreamEdge 是 StreamGraph 的边,用来连接两个StreamNode 点,一个StreamEdge可以有多个出边、入边等信息。


16、作业图(JobGraph)介绍一下?


作业图 JobGraph


JobGraph是由StreamGraph优化而来,是通过OperationChain 机制将算子合并起来,在执行时,调度在同一个Task线程上,避免数据的跨线程,跨网络传递。


Flink源码篇,作业提交流程、作业调度流程、作业内部转换流程图


作业图JobGraph 核心对象包括三个:


JobVertex 点 、 JobEdge 边、IntermediateDataSet 中间数据集


1)JobVertex 点


经过算子融合优化后符合条件的多个StreamNode 可能会融合在一起生成一个 JobVertex,即一个JobVertex 包含一个或多个算子, JobVertex 的输入是 JobEdge. 输出是 IntermediateDataSet


2)JobEdge 边


JobEdge 表示 JobGraph 中的一 个数据流转通道, 其上游数据源是 IntermediateDataSet ,下游消费者是 JobVertex 。即数据通过 JobEdge 由IntermediateDataSet 传递给目标 JobVertex


JobEdge 中的数据分发模式,会直接影响执行时 Task 之间的数据连接关系,是点对点连接还是全连接。


3)IntermediateDataSet 中间数据集


中间数据集 IntermediateDataSet 是一种逻辑结构,用来表示 JobVertex 的输出,即经过算子处理产生的数据集。不同的执行模式下,其对应的结果分区类型不同,决 定了在执行时刻数据交换的模式。



17、执行图(ExecutionGraph)介绍一下?

执行图 ExecutionGraph


ExecutionGraph是调度Flink 作业执行的核心数据结构,包含了作业中所有并行执行的Task信息、Task之间的关联关系、数据流转关系。


StreamGraph 和JobGraph都在Flink Client生成,然后交给Flink集群。


JobGraph 到 ExecutionGraph 在JobMaster中完成,转换过程中重要变化如下:


加入了并行度的概念,成为真正可调度的图结构,生成了6个核心对象。


Flink源码篇,作业提交流程、作业调度流程、作业内部转换流程图


Flink源码篇,作业提交流程、作业调度流程、作业内部转换流程图


执行图 ExecutionGraph 核心对象包括6个:


ExecutionJobVertex、ExecutionVertex、IntermediateResult、IntermediateResultPartition、ExecutionEdge、Execution。


1)ExecutionJobVertex


该对象和 JobGraph 中的 JobVertex 一 一对应。该对象还包含一组 ExecutionVertex, 数量 与该 JobVertex 中所包含的StreamNode 的并行度一致,假设 StreamNode 的并行度为5 ,那么ExecutionJobVertex中也会包含 5个ExecutionVertex。


ExecutionJobVertex用来将一个JobVertex 封装成 ExecutionJobVertex,并依次创建 ExecutionVertex、Execution、IntermediateResult 和 IntermediateResultPartition,用于丰富ExecutionGraph。


2)ExecutionVertex


ExecutionJobVertex会对作业进行并行化处理,构造可以并行执行的实例,每一个并行执行的实例就是 ExecutionVertex。


3)IntermediateResult


IntermediateResult 又叫作中间结果集,该对象是个逻辑概念 表示 ExecutionJobVertex输出,和 JobGrap 中的IntermediateDalaSet 一 一对应,同样 一个ExecutionJobVertex 可以有多个中间结果,取决于当前 JobVertex 有几个出边(JobEdge)。


4)IntermediateResultPartition


IntermediateResultPartition 又叫作中间结果分区。表示一个 ExecutionVertex分区的输出结果,与 Execution Edge 相关联。


5)ExecutionEdge


表示ExecutionVertex 的输入,连接到上游产生的IntermediateResultPartition,一个Execution对应唯一的一个IntermediateResultPartition 和一个ExecutionVertex,一个ExecutionVertex 可以有多个ExecutionEdge。


6)Execution


ExecutionVertex 相当于每个 Task 的模板,在真正执行的时候,会将ExecutionVertex中的信息包装为一个Execution,执行一个ExecutionVertex的一次尝试。


当发生故障或者数据需要重算的情况下 ExecutionVertex 可能会有多个 ExecutionAttemptID。一个 Execution 通过 ExecutionAttemptID 来唯一标识。


JM和TM之间关于 task 的部署和 task status 的更新都是通过 ExecutionAttemptID 来确定消息接受者


小结,从这些基本概念中,也可以看出以下⼏点:


由于每个 JobVertex 可能有多个 IntermediateDataSet,所以每个ExecutionJobVertex 可能有多个 IntermediateResult,因此,每个ExecutionVertex 也可能会包含多个 IntermediateResultPartition;


ExecutionEdge 这里主要的作⽤是把 ExecutionVertex 和 IntermediateResultPartition 连接起来,表示它们之间的连接关系。


18、Flink调度器的概念介绍一下?


调度器是 Flink 作业执行的核心组件,管理作业执行的所有相关过程,包括JobGraph到ExecutionGraph的转换、作业生命周期管理(作业的发布、取消、停止)、作业的Task生命周期管理(Task的发布、取消、停止)、资源申请与释放、作业和Task的Faillover等。


调度器作用:


作业的生命周期管理,如作业的发布、挂起、取消


作业执行资源的申请、分配、释放


作业的状态管理,作业发布过程中的状态变化和作业异常时的FailOver等


作业的信息提供,对外提供作业的详细信息


调度有几个重要的组件:


调度器:SchedulerNG及其子类、实现类


调度策略:SchedulingStrategy及其实现类


调度模式:ScheduleMode包含流和批的调度,有各自不同的调度模式


19、Flink调度行为包含几种?


调度行为包含四种:


SchedulerStrategy接口定义了调度行为,其中包含4种行为:


Flink源码篇,作业提交流程、作业调度流程、作业内部转换流程图


1)startScheduling:调度入口,触发调度器的调度行为


(2)restartTasks:重启执行失败的Task,一般是Task执行异常导致的。


(3)onExecutionStateChange:当Execution状态发生改变时。


(4)onPartitionConsumable:当IntermediateResultPartition中的数据可以消费时。


20、Flink调度模式包含几种?


调度模式包含3种:Eager模式、分阶段模式(Lazy_From_Source)、分阶段Slot重用模式(Lazy_From_Sources_With_Batch_Slot_Request)。


1)Eager 调度


适用于流计算。一次性申请需要的所有资源,如果资源不足,则作业启动失败。


2)分阶段调度


LAZY_FROM_SOURCES 适用于批处理。从 SourceTask 开始分阶段调度,申请资源的时候,一次性申请本阶段所需要的所有资源。


上游 Task 执行完毕后开始调度执行下游的 Task,读取上游的数据,执行本阶段的计算任务,执行完毕之后,调度后一个阶段的 Task,依次进行调度,直到作业完成。


3)分阶段 Slot 重用调度


LAZY_FROM_SOURCES_WITH_BATCH_SLOT_REQUEST 适用于批处理。与分阶段调度基本一样,区别在于该模式下使用批处理资源申请模式,可以在资源不足的情况下执行作业,但是需要确保在本阶段的作业执行中没有 Shuffle 行为。


目前视线中的 Eager 模式和 LAZY_FROM_SOURCES 模式的资源申请逻辑一样,LAZY_FROM_SOURCES_WITH_BATCH_SLOT_REQUEST 是单独的资源申请逻辑。


21、Flink调度策略包含几种?


调度策略包含3种:


Flink源码篇,作业提交流程、作业调度流程、作业内部转换流程图


Flink源码篇,作业提交流程、作业调度流程、作业内部转换流程图


调度策略全部实现于调度器SchedulingStrategy,有三种实现:


EagerSchedulingStrategy:适用于流计算,同时调度所有的 task


LazyFromSourcesSchedulingStrategy:适用于批处理,当输入数据准备好时(上游处理完)进行 vertices 调度。


PipelinedRegionSchedulingStrategy:以流水线的局部为粒度进行调度


PipelinedRegionSchedulingStrategy 是 1.11 加入的,从 1.12 开始,将以 pipelined region为单位进行调度。


pipelined region 是一组流水线连接的任务。这意味着,对于包含多个 region的流作业,在开始部署任务之前,它不再等待所有任务获取 slot。取而代之的是,一旦任何region 获得了足够的任务 slot 就可以部署它。对于批处理作业,将不会为任务分配 slot,也不会单独部署任务。取而代之的是,一旦某个 region 获得了足够的 slot,则该任务将与所有其他任务一起部署在同一区域中


22、Flink作业生命周期包含哪些状态?


在Flink集群中,JobMaster 负责作业的生命周期管理,具体的管理行为在调度器和ExecutionGraph中实现。


作业的完整生命周期状态变换如下图所示:



(1)作业首先处于创建状态(created),然后切换到运行状态(running),并且在完成所有工作后,它将切换到完成状态(finished)。


(2)在失败的情况下,作业首先切换到失败状态(failing),取消所有正在运行任务。


如果所有节点都已达到最终状态,并且作业不可重新启动,则状态将转换为失败(failed)。


(3)如果作业可以重新启动,那么它将进入重新启动状态(restarting)。一旦完成重新启动,它将变成创建状态(created)。


(4)在用户取消作业的情况下,将进入取消状态(cancelling),会取消所有当前正在运行的任务。一旦所有运行的任务已经达到最终状态,该作业将转换到已取消状态(canceled)。


完成状态(finished),取消状态(canceled),失败状态(failed)表示一个全局的终结状态,并且触发清理工作,而暂停状态(suspended)仅处于本地终止状态。意味着作业的执行在相应的 JobManager 上终止,但集群的另一个 JobManager 可以从持久的HA存储中恢复这个作业并重新启动。因此,处于暂停状态的作业将不会被完全清理。



TaskManager 负责Task 的生命周期管理,并将状态的变化通知到JobMaster,在ExecutionGraph中跟踪Execution的状态变化,一个Execution对于一个Task。


Task的生命周期如下:共8种状态。



在执行 ExecutionGraph 期间,每个并行任务经过多个阶段,从创建(created)到完成(finished)或失败(failed) ,上图说明了它们之间的状态 和 可能的转换。任务可以执行多次(例如故障恢复)。


每个 Execution 跟踪一个 ExecutionVertex 的执行,每个 ExecutionVertex 都有一个当前 Execution(current execution)和一个前驱 Execution(prior execution)。



任务调度流程图如下


Flink源码篇,作业提交流程、作业调度流程、作业内部转换流程图


1、当Flink执行executor会自动根据程序代码生成DAG数据流图 ,即 Jobgraph;


2、ActorSystem创建Actor将数据流图发送给JobManager中的Actor;


3、JobManager会不断接收TaskManager的心跳消息,从而可以获取到有效的TaskManager;


4、JobManager通过调度器在TaskManager中调度执行Task(在Flink中,最小的调度单元就是task,对应就是一个线程);


5、在程序运行过程中,task与task之间是可以进行数据传输的


Job Client


主要职责是提交任务,提交后可以结束进程,也可以等待结果返回;


Job Client 不是 Flink 程序执行的内部部分,但它是任务执行的起点;


Job Client 负责接受用户的程序代码,然后创建数据流,将数据流提交给JobManager 以便进一步执行。执行完成后,Job Client 将结果返回给用户。


JobManager


主要职责是调度工作并协调任务做检查点;


集群中至少要有一个 master,master 负责调度 task,协调checkpoints 和 容错;


高可用设置的话可以有多个 master,但要保证一个是 leader,其他是stand by;


JobManager 包含 ActorSystem、Scheduler、CheckPoint三个重要的组件 ;


JobManager从客户端接收到任务以后,首先生成优化过的执行计划,再调度到TaskManager中执行。


TaskManager


主要职责是从JobManager处接收任务,并部署和启动任务,接收上游的数 据并处理


TaskManager 是在 JVM 中的一个或多个线程中执行任务的工作节点。


TaskManager在创建之初就设置好了Slot,每个Slot可以执行一个任务。



Flink源码篇,作业提交流程、作业调度流程、作业内部转换流程图


每个TaskManager是一个JVM的进程, 可以在不同的线程中执行一个或多个子任务。为了控制一个worker能接收多少个task。worker通过task slot来进行控制(一个worker 至少有一个task slot)。


1、任务槽


每个task slot表示TaskManager拥有资源的一个固定大小的子集。


一般来说:我们分配槽的个数都是和CPU的核数相等,比如8核,那么就分配8个槽。


Flink将进程的内存划分到多个slot中。


图中有2个TaskManager,每个TaskManager有3个slot,每个slot占有1/3的内存。


内存被划分到不同的slot之后可以获得如下好处:


TaskManager最多能同时并发执行的任务是可以控制的,那就是3个,因为不能超过slot的数量。任务槽的作用就是分离任务的托管内存,不会发生cpu隔离。


slot有独占的内存空间,这样在一个TaskManager中可以运行多个不同的作业,作业之间不受影响。


总结:task slot的个数代表TaskManager可以并行执行的task数。



Flink源码篇,作业提交流程、作业调度流程、作业内部转换流程图


2、槽共享


默认情况下,Flink允许子任务共享插槽,即使它们是不同任务的子任务,只要它们来自同一个作业。结果是一个槽可以保存作业的整个管道。允许插槽共享有两个主要好处:


只需计算Job中最高并行度(parallelism)的task slot。只要这个满足,其他的job也都能满足。


资源分配更加公平。如果有比较空闲的slot可以将更多的任务分配给它。图中若没有任务槽共享,负载不高的Source/Map等subtask将会占据许多资源,而负载较高的窗口subtask则会缺乏资源。


有了任务槽共享,可以将基本并行度(base parallelism)从2提升到6。提高了分槽资源的利用率。同时它还可以保障TaskManager给subtask的分配的slot方案更加公平。


 
   
   
 

end





  
    
    
  

Flink源码篇,作业提交流程、作业调度流程、作业内部转换流程图

  •  系列文章






  
    
    
  


  
    
    
  
点个赞+在看,少个 bug   👇