Qingchuang Tech Stream | Flink Source Code Analysis: JobDispatcher
Background
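Recently I have been reading the Flink code for Yarn-based resource management. The flow is fairly long and consists of the following stages:
Client stage: parse the command-line arguments, locate the job's entry point, generate the JobGraph, translate it into the corresponding Yarn cluster descriptor used for launching, and submit to Yarn.
AppMaster stage: depending on the job mode (Session/Job), a different entry point is chosen and the cluster is started. The important components constructed in this stage are the Dispatcher, the Resource Manager, and the Job Master. We will analyze what each component does and how each one starts up later.
Executor stage: after the job starts, the Resource Manager calls the Yarn client to launch Task Executors according to the resources the job needs. The Task Executor registration details were covered in a previous article.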
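The second stage has to support different resource-management frameworks, different job modes, and different high-availability implementations, so it involves a great many components and its code structure is fairly complex. We will therefore analyze the second stage carefully, and this article dissects the Dispatcher. To get full resource isolation I use Job Cluster mode almost exclusively in practice, so the analysis targets the Dispatcher in that mode.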
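This time I will not take the approach of jumping straight into the code and summarizing afterwards. The call relationships are fairly complex, and tracing the call chain directly makes it easy to get lost and, in the end, to see only the trees and not the forest. Leaving out some less important branches, I summarized the process from the Yarn AppMaster starting up to the Dispatcher starting up in the following diagram: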
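Basic service startup
Construct the ActorSystem, listening on the IP and port configured for the JobManager; construct the HighAvailability services; construct the Heartbeat services; start JMX; start the Blob server.
ResourceManager construction and startup
YarnResourceManagerDriver implements the resource operations against Yarn, such as requesting and releasing resources.
JobDispatcherRunner construction, leader election, and Dispatcher construction and startup
Only the node that obtains the Leader role goes on to construct and start the Dispatcher. JobCluster and Session modes provide different functionality: in Session mode the process must watch for changes to JobGraphs and create or stop jobs accordingly, whereas JobCluster mode does not need this. For that reason a DispatcherLeaderProcess layer sits between the DispatcherRunner and the Dispatcher, with separate implementations for Job and Session modes.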
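The basic services and the Resource Manager are relatively easy to understand; their functionality and implementation are fairly direct. Reading YarnJobClusterEntrypoint, YarnResourceManagerDriver, and ActiveResourceManager is enough to see how they call each other, so this article does not cover them further. The Dispatcher part involves the DispatcherRunner, the DispatcherLeaderProcess, and the Dispatcher itself, plus a Factory for each of them (and even FactoryFactories). The implementation is fairly convoluted and the purpose of the design is not easy to guess, so this is the part we analyze in detail next.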
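First, let's look at what the Dispatcher component does. I could not find a suitable design document for the Dispatcher online. The closest is FLIP-6, but compared against the code it is outdated and no longer matches the implementation, so every inference in this article comes from the code itself. The service interface the Dispatcher exposes is defined in DispatcherGateway.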
DispatcherGateway
```java
/** Gateway for the Dispatcher component. */
public interface DispatcherGateway extends FencedRpcGateway<DispatcherId>, RestfulGateway {

    /**
     * Submit a job to the dispatcher.
     *
     * @param jobGraph JobGraph to submit
     * @param timeout RPC timeout
     * @return A future acknowledge if the submission succeeded
     */
    CompletableFuture<Acknowledge> submitJob(JobGraph jobGraph, @RpcTimeout Time timeout);

    /**
     * List the current set of submitted jobs.
     *
     * @param timeout RPC timeout
     * @return A future collection of currently submitted jobs
     */
    CompletableFuture<Collection<JobID>> listJobs(@RpcTimeout Time timeout);

    /**
     * Returns the port of the blob server.
     *
     * @param timeout of the operation
     * @return A future integer of the blob server port
     */
    CompletableFuture<Integer> getBlobServerPort(@RpcTimeout Time timeout);

    /**
     * Requests the {@link ArchivedExecutionGraph} for the given jobId. If there is no such graph,
     * then the future is completed with a {@link FlinkJobNotFoundException}.
     *
     * <p>Note: We enforce that the returned future contains a {@link ArchivedExecutionGraph} unlike
     * the super interface.
     *
     * @param jobId identifying the job whose AccessExecutionGraph is requested
     * @param timeout for the asynchronous operation
     * @return Future containing the AccessExecutionGraph for the given jobId, otherwise {@link
     *     FlinkJobNotFoundException}
     */
    @Override
    CompletableFuture<ArchivedExecutionGraph> requestJob(JobID jobId, @RpcTimeout Time timeout);

    default CompletableFuture<Acknowledge> shutDownCluster(ApplicationStatus applicationStatus) {
        return shutDownCluster();
    }
}
```
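Judging from the interface definition, the Dispatcher is mainly responsible for submitting jobs, listing jobs, returning a job's ArchivedExecutionGraph, reporting the Blob server port, and shutting down the cluster. In other words, once a connection to the Dispatcher is established it can be used to submit jobs, and an external HTTP service can be built on top of the Dispatcher to offer job submission to the outside world.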
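To make the shape of this interface concrete, here is a minimal in-memory sketch of a gateway with the same core responsibilities. Everything in it is hypothetical: SimpleDispatcherGateway and InMemoryDispatcher are not Flink classes, job IDs are plain strings, and there is no RPC, fencing token, or timeout. It only illustrates the asynchronous, future-returning style that a REST layer would call into.

```java
import java.util.ArrayList;
import java.util.Collection;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical, heavily simplified stand-in for DispatcherGateway:
// no RPC, no fencing token, no timeouts; a "job graph" is just a name.
interface SimpleDispatcherGateway {
    CompletableFuture<String> submitJob(String jobId, String jobName);

    CompletableFuture<Collection<String>> listJobs();

    CompletableFuture<Void> shutDownCluster();
}

class InMemoryDispatcher implements SimpleDispatcherGateway {
    private final Map<String, String> jobs = new ConcurrentHashMap<>();
    private volatile boolean shutDown = false;

    @Override
    public CompletableFuture<String> submitJob(String jobId, String jobName) {
        if (shutDown) {
            return CompletableFuture.failedFuture(new IllegalStateException("cluster is shut down"));
        }
        // Design choice of this sketch: reject a second submission with the same ID.
        if (jobs.putIfAbsent(jobId, jobName) != null) {
            return CompletableFuture.failedFuture(
                    new IllegalArgumentException("duplicate job " + jobId));
        }
        return CompletableFuture.completedFuture("ack");
    }

    @Override
    public CompletableFuture<Collection<String>> listJobs() {
        // Return a snapshot copy so callers cannot mutate internal state.
        Collection<String> snapshot = new ArrayList<>(jobs.keySet());
        return CompletableFuture.completedFuture(snapshot);
    }

    @Override
    public CompletableFuture<Void> shutDownCluster() {
        shutDown = true;
        return CompletableFuture.completedFuture(null);
    }
}
```

An HTTP handler built on such a gateway would simply translate requests into these calls and complete the response when the returned future completes.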
DispatcherRunner
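As the name suggests, the DispatcherRunner runs the Dispatcher. To run it, however, the runner must first obtain leadership. So after the DispatcherRunner is constructed, a leader election takes place, and only the node that obtains leadership may construct and start the Dispatcher.
The main component involved is DispatcherRunnerLeaderElectionLifecycleManager. The election service actually used is a LeaderElectionService, which, depending on the HighAvailabilityServices in use, is either a StandaloneLeaderElectionService or a ZooKeeperLeaderElectionService.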
```java
// In the constructor of DispatcherRunnerLeaderElectionLifecycleManager,
// the leaderElectionService is associated with the dispatcherRunner.
private DispatcherRunnerLeaderElectionLifecycleManager(
        T dispatcherRunner, LeaderElectionService leaderElectionService) throws Exception {
    this.dispatcherRunner = dispatcherRunner;
    this.leaderElectionService = leaderElectionService;

    leaderElectionService.start(dispatcherRunner);
}

public class StandaloneLeaderElectionService implements LeaderElectionService {

    private LeaderContender contender = null;

    @Override
    public void start(LeaderContender newContender) throws Exception {
        if (contender != null) {
            // Service was already started
            throw new IllegalArgumentException(
                    "Leader election service cannot be started multiple times.");
        }

        contender = Preconditions.checkNotNull(newContender);

        // directly grant leadership to the given contender
        // (the contender here is the dispatcherRunner)
        contender.grantLeadership(HighAvailabilityServices.DEFAULT_LEADER_ID);
    }

    // ...
}

public final class DefaultDispatcherRunner implements DispatcherRunner, LeaderContender {

    // Question 1: leader election has already been delegated to
    // DispatcherRunnerLeaderElectionLifecycleManager, so why does the runner
    // still hold a leaderElectionService?
    private final LeaderElectionService leaderElectionService;

    // ---------------------------------------------------------------
    // Leader election
    // ---------------------------------------------------------------

    // When the dispatcherRunner receives the grantLeadership callback,
    // it starts a DispatcherLeaderProcess.
    // Question 2: why not start the Dispatcher directly?
    @Override
    public void grantLeadership(UUID leaderSessionID) {
        runActionIfRunning(() -> startNewDispatcherLeaderProcess(leaderSessionID));
    }

    // ...
}
```
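The wiring above can be reduced to a small sketch. The names Contender, ElectionService, ImmediateElectionService, SketchRunner, and SketchLifecycleManager are hypothetical stand-ins for LeaderContender, StandaloneLeaderElectionService, DefaultDispatcherRunner, and DispatcherRunnerLeaderElectionLifecycleManager, and the all-zero leader ID is an assumption modeled on HighAvailabilityServices.DEFAULT_LEADER_ID. It shows that, in standalone mode, constructing the lifecycle manager is enough to make the runner leader synchronously.

```java
import java.util.Objects;
import java.util.UUID;
import java.util.concurrent.atomic.AtomicReference;

// Hypothetical minimal counterpart of LeaderContender.
interface Contender {
    void grantLeadership(UUID leaderSessionId);

    void revokeLeadership();
}

// Hypothetical minimal counterpart of LeaderElectionService.
interface ElectionService {
    void start(Contender contender);
}

// Same idea as StandaloneLeaderElectionService: no real election.
class ImmediateElectionService implements ElectionService {
    // Assumed to mirror HighAvailabilityServices.DEFAULT_LEADER_ID (all-zero UUID).
    static final UUID DEFAULT_LEADER_ID = new UUID(0L, 0L);

    private Contender contender;

    @Override
    public void start(Contender newContender) {
        if (contender != null) {
            throw new IllegalArgumentException(
                    "Leader election service cannot be started multiple times.");
        }
        contender = Objects.requireNonNull(newContender);
        // The only contender becomes leader immediately.
        contender.grantLeadership(DEFAULT_LEADER_ID);
    }
}

// Plays the role of DefaultDispatcherRunner.
class SketchRunner implements Contender {
    final AtomicReference<String> state = new AtomicReference<>("WAITING");

    @Override
    public void grantLeadership(UUID leaderSessionId) {
        // The real runner starts a DispatcherLeaderProcess here; we only record it.
        state.set("LEADER_PROCESS_STARTED:" + leaderSessionId);
    }

    @Override
    public void revokeLeadership() {
        state.set("REVOKED");
    }
}

// Plays the role of DispatcherRunnerLeaderElectionLifecycleManager.
class SketchLifecycleManager {
    final ElectionService electionService;

    SketchLifecycleManager(SketchRunner runner, ElectionService electionService) {
        this.electionService = electionService;
        // Starting the election service registers the runner as the contender.
        electionService.start(runner);
    }
}
```

With a ZooKeeper-backed service the grantLeadership callback would instead arrive asynchronously, once this node wins the election.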
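▽ Question 1: why does the dispatcherRunner still need the leaderElectionService? The lifecycle manager only starts the election service (and stops it on shutdown). Confirming leadership is a separate step that can only happen once the Dispatcher's address is known, and the runner performs it: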
```java
private void forwardConfirmLeaderSessionFuture(
        UUID leaderSessionID, DispatcherLeaderProcess newDispatcherLeaderProcess) {
    FutureUtils.assertNoException(
            newDispatcherLeaderProcess
                    // once the dispatcher's leaderAddress is available
                    .getLeaderAddressFuture()
                    .thenAccept(
                            leaderAddress -> {
                                if (leaderElectionService.hasLeadership(leaderSessionID)) {
                                    // use the leaderElectionService to publish the current
                                    // leaderAddress (written to ZooKeeper under ZooKeeper HA)
                                    leaderElectionService.confirmLeadership(
                                            leaderSessionID, leaderAddress);
                                }
                            }));
}
```
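What matters in forwardConfirmLeaderSessionFuture is ordering: leadership is confirmed only after the leader process has an address, and only if the session has not been revoked in the meantime. The following hypothetical sketch shows that check. RecordingElectionService and ConfirmForwarder are illustrative stand-ins for the real election service and DefaultDispatcherRunner, and the sketch only stores the address instead of publishing it.

```java
import java.util.UUID;
import java.util.concurrent.CompletableFuture;

// Hypothetical election service that remembers the current leader session
// and the last confirmed address (a ZooKeeper-backed one would publish it).
class RecordingElectionService {
    private volatile UUID currentLeaderSession;
    volatile String confirmedAddress;

    void grant(UUID session) {
        currentLeaderSession = session;
    }

    void revoke() {
        currentLeaderSession = null;
    }

    boolean hasLeadership(UUID session) {
        return session.equals(currentLeaderSession);
    }

    void confirmLeadership(UUID session, String address) {
        confirmedAddress = address;
    }
}

class ConfirmForwarder {
    // Same shape as forwardConfirmLeaderSessionFuture: wait for the address,
    // then confirm only if this session still holds leadership.
    static CompletableFuture<Void> forward(
            RecordingElectionService election,
            UUID leaderSessionId,
            CompletableFuture<String> leaderAddressFuture) {
        return leaderAddressFuture.thenAccept(
                address -> {
                    if (election.hasLeadership(leaderSessionId)) {
                        election.confirmLeadership(leaderSessionId, address);
                    }
                });
    }
}
```

If leadership is revoked before the address arrives, nothing is confirmed, so a stale address is never advertised for a session that no longer leads.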
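▽ Question 2: why not construct and start the Dispatcher directly, instead of introducing the DispatcherLeaderProcess?
The class diagram above shows that most of the logic lives in SessionDispatcherLeaderProcess, while JobDispatcherLeaderProcess implements only the onStart method.
The base class AbstractDispatcherLeaderProcess mainly implements some simple state transitions and the callbacks into subclass methods. So the DispatcherLeaderProcess abstraction exists mainly for the sake of SessionDispatcherLeaderProcess.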
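This division of labor is essentially the template-method pattern. In the hypothetical sketch below, none of the classes exist in Flink. The base class owns the state check and calls the abstract onStart hook. The Job variant starts with exactly one fixed job, while the Session variant recovers jobs from a store. Session mode's listening for added or removed job graphs is deliberately left out.

```java
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.List;

// Hypothetical simplification of AbstractDispatcherLeaderProcess.
abstract class SketchLeaderProcess {
    enum State { CREATED, RUNNING, STOPPED }

    private State state = State.CREATED;
    Collection<String> startedJobs = Collections.emptyList();

    final void start() {
        if (state != State.CREATED) {
            throw new IllegalStateException("already started: " + state);
        }
        state = State.RUNNING;
        onStart(); // hook implemented by subclasses
    }

    final void close() {
        state = State.STOPPED;
    }

    State state() {
        return state;
    }

    protected abstract void onStart();

    // Stand-in for completeDispatcherSetup(...): record the initial jobs.
    protected void completeDispatcherSetup(Collection<String> jobs) {
        startedJobs = jobs;
    }
}

// Job cluster: exactly one fixed job, nothing to watch.
class SketchJobLeaderProcess extends SketchLeaderProcess {
    private final String jobGraph;

    SketchJobLeaderProcess(String jobGraph) {
        this.jobGraph = jobGraph;
    }

    @Override
    protected void onStart() {
        completeDispatcherSetup(Collections.singleton(jobGraph));
    }
}

// Session cluster: recover whatever the (hypothetical) job store holds.
class SketchSessionLeaderProcess extends SketchLeaderProcess {
    private final List<String> jobStore;

    SketchSessionLeaderProcess(List<String> jobStore) {
        this.jobStore = jobStore;
    }

    @Override
    protected void onStart() {
        completeDispatcherSetup(new ArrayList<>(jobStore));
    }
}
```

Because the Job variant's hook is a single line, the layer adds little for job clusters. It pays off once the Session variant's recovery and store listening are added.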
Next, let's look at the onStart method that JobDispatcherLeaderProcess implements. It uses the dispatcherGatewayServiceFactory directly to construct the DispatcherGatewayService.
```java
protected void onStart() {
    final DispatcherGatewayService dispatcherService =
            dispatcherGatewayServiceFactory.create(
                    DispatcherId.fromUuid(getLeaderSessionId()),
                    Collections.singleton(jobGraph),
                    ThrowingJobGraphWriter.INSTANCE);

    completeDispatcherSetup(dispatcherService);
}
```
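Besides the DispatcherId, onStart passes two arguments: a singleton collection holding the one JobGraph, and ThrowingJobGraphWriter.INSTANCE. A job cluster's job set is fixed at startup, so there is presumably nothing to persist, and a writer that rejects writes makes any unexpected attempt fail loudly. The sketch below illustrates that idea with a hypothetical single-method SketchJobGraphWriter interface. It does not reproduce Flink's real JobGraphWriter. The contrasting in-memory writer only stands in for the store-backed writer a Session cluster would use.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical single-method writer; not Flink's actual JobGraphWriter.
interface SketchJobGraphWriter {
    void putJobGraph(String jobGraph);
}

// In the spirit of ThrowingJobGraphWriter.INSTANCE: a stateless enum
// singleton that refuses to persist job graphs.
enum SketchThrowingWriter implements SketchJobGraphWriter {
    INSTANCE;

    @Override
    public void putJobGraph(String jobGraph) {
        throw new UnsupportedOperationException("Cannot store job graphs.");
    }
}

// Contrast: a writer that actually records job graphs, standing in for
// the persistent store a Session cluster uses.
class InMemoryJobGraphWriter implements SketchJobGraphWriter {
    final List<String> stored = new ArrayList<>();

    @Override
    public void putJobGraph(String jobGraph) {
        stored.add(jobGraph);
    }
}
```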
The factory's create method, and the JobDispatcherFactory it delegates to in JobCluster mode:

```java
// In DefaultDispatcherGatewayServiceFactory
public AbstractDispatcherLeaderProcess.DispatcherGatewayService create(
        DispatcherId fencingToken,
        Collection<JobGraph> recoveredJobs,
        JobGraphWriter jobGraphWriter) {

    final Dispatcher dispatcher;
    try {
        // In JobCluster mode dispatcherFactory is JobDispatcherFactory,
        // which builds a MiniDispatcher (see its implementation below).
        dispatcher =
                dispatcherFactory.createDispatcher(
                        rpcService,
                        fencingToken,
                        recoveredJobs,
                        (dispatcherGateway, scheduledExecutor, errorHandler) ->
                                new NoOpDispatcherBootstrap(),
                        PartialDispatcherServicesWithJobGraphStore.from(
                                partialDispatcherServices, jobGraphWriter));
    } catch (Exception e) {
        throw new FlinkRuntimeException("Could not create the Dispatcher rpc endpoint.", e);
    }

    dispatcher.start();

    return DefaultDispatcherGatewayService.from(dispatcher);
}

public enum JobDispatcherFactory implements DispatcherFactory {
    INSTANCE;

    @Override
    public MiniDispatcher createDispatcher(
            RpcService rpcService,
            DispatcherId fencingToken,
            Collection<JobGraph> recoveredJobs,
            DispatcherBootstrapFactory dispatcherBootstrapFactory,
            PartialDispatcherServicesWithJobGraphStore partialDispatcherServicesWithJobGraphStore)
            throws Exception {
        final JobGraph jobGraph = Iterables.getOnlyElement(recoveredJobs);

        final Configuration configuration =
                partialDispatcherServicesWithJobGraphStore.getConfiguration();
        final String executionModeValue = configuration.getString(EXECUTION_MODE);
        final ClusterEntrypoint.ExecutionMode executionMode =
                ClusterEntrypoint.ExecutionMode.valueOf(executionModeValue);

        return new MiniDispatcher(
                rpcService,
                fencingToken,
                DispatcherServices.from(
                        partialDispatcherServicesWithJobGraphStore,
                        DefaultJobManagerRunnerFactory.INSTANCE),
                jobGraph,
                dispatcherBootstrapFactory,
                executionMode);
    }
}
```
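Reduced to its essentials, the chain works like this. The gateway-service factory delegates creation to an injected DispatcherFactory, starts the result, and returns it. JobDispatcherFactory is a stateless enum singleton that insists on exactly one recovered JobGraph, which the real code enforces with Guava's Iterables.getOnlyElement. In this hypothetical sketch, all Sketch* types are illustrative stand-ins and a job graph is just a string.

```java
import java.util.Collection;
import java.util.Iterator;
import java.util.NoSuchElementException;

interface SketchDispatcher {
    void start();

    boolean isRunning();
}

// Stand-in for MiniDispatcher: created for exactly one job.
class SketchMiniDispatcher implements SketchDispatcher {
    final String jobGraph;
    private boolean running;

    SketchMiniDispatcher(String jobGraph) {
        this.jobGraph = jobGraph;
    }

    public void start() {
        running = true;
    }

    public boolean isRunning() {
        return running;
    }
}

interface SketchDispatcherFactory {
    SketchDispatcher createDispatcher(Collection<String> recoveredJobs);
}

// Stateless factory as an enum singleton, like JobDispatcherFactory.INSTANCE.
enum SketchJobDispatcherFactory implements SketchDispatcherFactory {
    INSTANCE;

    @Override
    public SketchMiniDispatcher createDispatcher(Collection<String> recoveredJobs) {
        return new SketchMiniDispatcher(onlyElement(recoveredJobs));
    }

    // Same contract as Guava's Iterables.getOnlyElement: exactly one element or fail.
    private static <T> T onlyElement(Collection<T> items) {
        Iterator<T> it = items.iterator();
        if (!it.hasNext()) {
            throw new NoSuchElementException("no job graph");
        }
        T first = it.next();
        if (it.hasNext()) {
            throw new IllegalArgumentException("expected exactly one job graph");
        }
        return first;
    }
}

// Plays the role of the gateway-service factory's create(): build, start, return.
class SketchGatewayServiceFactory {
    private final SketchDispatcherFactory dispatcherFactory;

    SketchGatewayServiceFactory(SketchDispatcherFactory dispatcherFactory) {
        this.dispatcherFactory = dispatcherFactory;
    }

    SketchDispatcher create(Collection<String> recoveredJobs) {
        SketchDispatcher dispatcher = dispatcherFactory.createDispatcher(recoveredJobs);
        dispatcher.start();
        return dispatcher;
    }
}
```

Injecting the DispatcherFactory is what lets the same create() path serve both modes: only the factory instance differs between a job cluster and a session cluster.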
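At this point the Dispatcher has been constructed and started. To summarize the process above, I compiled the detailed calls between the components into the relationship diagram below, for reference while reading the code.
That concludes this article, which covered the process from AppMaster startup to Dispatcher startup. Next, we will cover the process from JobMaster startup all the way to the job running.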
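[Author: Jacky, who also writes on Zhihu as "a coder"]
[Compiled and published by Qingchuang Technology; when reposting, please credit the source and the original author]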
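The Qingchuang Sherlock AIOps intelligent operations platform delivers intelligent scenarios such as precise alerting, anomaly detection, and root-cause localization on a digital operations middle platform. It brings artificial intelligence to operations management, unlocks the intelligence in operations data, and helps customers with their digital transformation.
The product has been deployed at industry-leading enterprises such as Bank of Communications, Shanghai Pudong Development Bank, Orient Securities, BMW Brilliance, and Sinopec Shanghai Petrochemical, covering finance, manufacturing, energy, transportation, and other industries.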
