vlambda博客
学习文章列表

擎创技术流 | Flink源码分析-JobDispatcher

背景介绍

最近一直在阅读Flink基于Yarn的资源管理相关的代码,牵扯的流程比较长,主要包含以下几个环节:

  • 客户端环节:命令参数解析,定位到作业入口,生成JobGraph,翻译成启动对应的Yarn集群描述符,开始提交到Yarn。

  • AppMaster环节:根据不同的作业模式(Session/Job),选择不同的启动入口,开始启动集群,这个阶段需要构造的重要组件包含:Dispatcher,Resource Manager,Job Master。具体每个组件的作用 以及 每个组件的启动细节我们后面会进行分析。

  • Executor环节:作业启动后,根据需要的资源,通过Resource Manager调用Yarn客户端启动Task Executor,其中Task Executor的注册细节前文中已经详细介绍过。


其中第二个环节因为需要适配不同的资源管理框架,不同的作业模式以及不同的高可用实现方式,所以涉及到了非常多的组件,代码的结构也比较复杂,因此接下来我们针对第二个环节进行仔细的分析,本文主要来解构Dispatcher,为了做到完全的资源隔离,我日常中用的几乎全都是Job Cluster模式,因此我们就以该模式下的Dispatcher做为分析目标。

这一次,我不打算采取"先跳到代码去进行分析,然后总结的方法"。主要是因为代码的调用关系比较复杂,直接去做调用链分析的话,容易被绕晕,而且最终也就只能只见树木不见森林。我省略了一些不太重要的枝节,把Yarn AppMaster启动到Dispatcher启动这个过程总结了一张图:

擎创技术流 | Flink源码分析-JobDispatcher

基础服务启动

构造ActorSystem,监听JobManager所设定的ip以及端口;构造HighAvailability服务;构造Heartbeat服务;启动JMX;启动Blob服务。

ResourceManager构造和启动

通过YarnResourceManagerDriver实现了针对Yarn资源的申请和释放等资源操作。

JobDispatcherRunner构造、Leader选举 以及 构造启动Dispatcher

只有当获得Leader角色的节点,才会去触发构造启动Dispatcher。JobCluster与Session模式所实现的功能不同,Session模式下需要去监听JobGraph的变化,进而去创建或者停止Job,JobCluster模式下则不需要,因此在DispatcherRunner以及Dispatcher中间加了DispatcherLeaderProcess来针对Job与Session模式做了不同实现。

基础服务以及 Resource Manager这两个组件是比较容易理解的,其功能以及实现方法也都比较直接,大家可以直接阅读YarnJobClusterEntrypoint,YarnResourceManagerDriver,ActiveResoureManager这几个文件就可以了解其调用关系,本文不再额外做介绍。Dispatcher这个部份牵扯到了DispatcherRunner,DispatcherLeaderProcess以及Dispatcher以及每个组件分别对应的Factory(甚至FactoryFactory),实现也比较绕,不太容易去揣测这样设计的目的,也是我们接下来需要去重点分析的部份。

首先我们先来看看Dispatcher这个组件的功能,网上没有找到比较合适的针对Dispatcher的设计文档,最接近的是FLIP-6 ,但是对照代码来看,这个文档也已经过期了,与代码并不一致,因此本文中的推断都是来自于代码,Dispatcher对外提供的服务接口定义在DispatcherGateway中。

DispatcherGateway

/** Gateway for the Dispatcher component. */public interface DispatcherGateway extends FencedRpcGateway<DispatcherId>, RestfulGateway {
   /**     * Submit a job to the dispatcher.     *     * @param jobGraph JobGraph to submit     * @param timeout RPC timeout     * @return A future acknowledge if the submission succeeded     */    CompletableFuture<Acknowledge> submitJob(JobGraph jobGraph, @RpcTimeout Time timeout);
   /**     * List the current set of submitted jobs.     *     * @param timeout RPC timeout     * @return A future collection of currently submitted jobs     */    CompletableFuture<Collection<JobID>> listJobs(@RpcTimeout Time timeout);
   /**     * Returns the port of the blob server.     *     * @param timeout of the operation     * @return A future integer of the blob server port     */    CompletableFuture<Integer> getBlobServerPort(@RpcTimeout Time timeout);
   /**     * Requests the {@link ArchivedExecutionGraph} for the given jobId. If there is no such graph,     * then the future is completed with a {@link FlinkJobNotFoundException}.     *     * <p>Note: We enforce that the returned future contains a {@link ArchivedExecutionGraph} unlike     * the super interface.     *     * @param jobId identifying the job whose AccessExecutionGraph is requested     * @param timeout for the asynchronous operation     * @return Future containing the AccessExecutionGraph for the given jobId, otherwise {@link     *     FlinkJobNotFoundException}     */    @Override    CompletableFuture<ArchivedExecutionGraph> requestJob(JobID jobId, @RpcTimeout Time timeout);
   default CompletableFuture<Acknowledge> shutDownCluster(ApplicationStatus applicationStatus) {        return shutDownCluster();    }}

从接口定义来看,Dispatcher主要负责提交Job,查询Job列表以及停止集群,也就是说如果建立了到Dispatcher的连接,就可以利用Dispatcher去提交Job,可以基于Dispatcher构建一个对外的HTTP服务,然后对外提供作业的提交服务。

DispatcherRunner

DispatcherRunner顾名思义,应该就是要run dispatcher,不过run dispatcher的前提是先获得leadership,因此在构造DispatcherRunner后,要先进行leadership选举,只有当获得leadership的节点,才可以构造并启动dispatcher。

这里面主要用到的组件为DispatcherRunnerLeaderElectionLifecyleManager,实际用到的选举的服务为LeaderElectionService,根据HighAvailabilityService的不同,可以为StandaloneLeaderElectionService或者ZookeeperLeaderElectionService。

//在DispatcherRunnerLeaderElectionLifecycleManager的构造函数中,//关联leaderElectionService和dispatcherRunnerprivate DispatcherRunnerLeaderElectionLifecycleManager(            T dispatcherRunner, LeaderElectionService leaderElectionService) throws Exception {        this.dispatcherRunner = dispatcherRunner;        this.leaderElectionService = leaderElectionService;
       leaderElectionService.start(dispatcherRunner);    }

public class StandaloneLeaderElectionService implements LeaderElectionService {    private LeaderContender contender = null;    @Override    public void start(LeaderContender newContender) throws Exception {        if (contender != null) {            // Service was already started            throw new IllegalArgumentException(                    "Leader election service cannot be started multiple times.");        }
       contender = Preconditions.checkNotNull(newContender);
       // directly grant leadership to the given contender        //此处的contender就是dispatcherRunner        contender.grantLeadership(HighAvailabilityServices.DEFAULT_LEADER_ID);    }...


public final class DefaultDispatcherRunner implements DispatcherRunner, LeaderContender {        //问题1??既然已经把leader的选举委托给了DispatcherRunnerLeaderElectionLifecycleManager    //为什么此处还需要握有leaderElectionService?    private final LeaderElectionService leaderElectionService;
   // ---------------------------------------------------------------    // Leader election    // ---------------------------------------------------------------
   //当dispatcherRunner收到grantLeadership回调后,开始启动DispatcherLeaderProcess    //问题2??为什么不直接启动Dispatcher    @Override    public void grantLeadership(UUID leaderSessionID) {        runActionIfRunning(() -> startNewDispatcherLeaderProcess(leaderSessionID));    }

▽问题1:为什么dispatcherRunner还需要leaderElectionService?

private void forwardConfirmLeaderSessionFuture(            UUID leaderSessionID, DispatcherLeaderProcess newDispatcherLeaderProcess) {        FutureUtils.assertNoException(                newDispatcherLeaderProcess                        //当获得了dispatcher的leaderAddress后                        .getLeaderAddressFuture()                        .thenAccept(                                leaderAddress -> {                                    if (leaderElectionService.hasLeadership(leaderSessionID)) {                                                //调用leaderElectionService把当前的leaderAddress                                                //写入到zookeeper                                                leaderElectionService.confirmLeadership(                                                leaderSessionID, leaderAddress);                                    }                                }));    }

▽问题2:为什么不直接构造并启动dispatcher,而是需要引入DispatcherLeaderProcess?

擎创技术流 | Flink源码分析-JobDispatcher

从上面的类图可以看到大部分的逻辑其实是在SessionDispatcherLeaderProcess里面,JobDispatcherLeaderProcess里面只实现了onStart方法。

再看基类AbstractDispatcherLeaderProcess,主要是实现了一些简单的状态流转以及回调子类的方法实现,因此可以抽象DispatcherLeaderProcess这一层主要是为了SessionDispatcherLeaderProcess。

接下来,我们来看一下JobDispatcherLeaderProcess实现的onStart方法,直接利用dispatcherGatewayServiceFactory构造了DispatcherGatewayService。

 protected void onStart() {        final DispatcherGatewayService dispatcherService =                dispatcherGatewayServiceFactory.create(                        DispatcherId.fromUuid(getLeaderSessionId()),                        Collections.singleton(jobGraph),                        ThrowingJobGraphWriter.INSTANCE);
       completeDispatcherSetup(dispatcherService);    }
 public AbstractDispatcherLeaderProcess.DispatcherGatewayService create(            DispatcherId fencingToken,            Collection<JobGraph> recoveredJobs,            JobGraphWriter jobGraphWriter) {
       final Dispatcher dispatcher;        try {            dispatcher =                    dispatcherFactory.createDispatcher(                            rpcService,                            fencingToken,                            recoveredJobs,                            (dispatcherGateway, scheduledExecutor, errorHandler) ->                                    new NoOpDispatcherBootstrap(),                            PartialDispatcherServicesWithJobGraphStore.from(                                    partialDispatcherServices, jobGraphWriter));        } catch (Exception e) {            throw new FlinkRuntimeException("Could not create the Dispatcher rpc endpoint.", e);        }        //JobCluster模式下,会构造MiniDispatcher,具体的实现见下面。        dispatcher.start();
       return DefaultDispatcherGatewayService.from(dispatcher);    }

public enum JobDispatcherFactory implements DispatcherFactory {    INSTANCE;
   @Override    public MiniDispatcher createDispatcher(            RpcService rpcService,            DispatcherId fencingToken,            Collection<JobGraph> recoveredJobs,            DispatcherBootstrapFactory dispatcherBootstrapFactory,            PartialDispatcherServicesWithJobGraphStore partialDispatcherServicesWithJobGraphStore)            throws Exception {        final JobGraph jobGraph = Iterables.getOnlyElement(recoveredJobs);
       final Configuration configuration =                partialDispatcherServicesWithJobGraphStore.getConfiguration();        final String executionModeValue = configuration.getString(EXECUTION_MODE);        final ClusterEntrypoint.ExecutionMode executionMode =                ClusterEntrypoint.ExecutionMode.valueOf(executionModeValue);
       return new MiniDispatcher(                rpcService,                fencingToken,                DispatcherServices.from(                        partialDispatcherServicesWithJobGraphStore,                        DefaultJobManagerRunnerFactory.INSTANCE),                jobGraph,                dispatcherBootstrapFactory,                executionMode);    }}

到这里Dispatcher就已经被构造并启动了,总结上面的过程,我把组件之间的调用明细整理了一张关系图如下,供大家在看代码时参考。

擎创技术流 | Flink源码分析-JobDispatcher

本文到这里就结束了,主要介绍了从AppMaster启动到Dispatcher启动的过程,接下来我们会介绍JobMaster从启动一直到作业运行的过程。

【本文作者Jacky即为知乎作者 a coder 

【擎创科技整理发布,转载请注明来源及原创作者】



擎创科技,成立于2016年,国内首家智能运维产品提供商,四度被Gartner提名为AIOps领域重点服务商。
擎创夏洛克
AIOps智慧运营平台,在数字运维中台上实现精准告警、异常检测、根因定位等智能场景,将人工智能赋能
运维管理 ,激活运维数据智慧,助力客户数字化转型。
产品已在交通银行、浦发银行、东方证券、华晨宝马、中石化上海石化等行业标杆企业落地,覆盖金融/制造/能源交通等多个行业。