擎创 Tech Stream | Flink Source Code Analysis: JobDispatcher
Background
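I have recently been reading the code for Flink's Yarn-based resource management. The overall flow is fairly long and consists mainly of the following stages:
Client stage: parse the command-line arguments, locate the job entry point, generate the JobGraph, translate it into the Yarn cluster descriptor used for startup, and submit it to Yarn.
AppMaster stage: depending on the job mode (Session/Job), select the corresponding entry point and start the cluster. The important components constructed in this stage are the Dispatcher, the Resource Manager, and the Job Master. What each component does and how each one is started will be analyzed later.
Executor stage: once the job has started, the Resource Manager calls the Yarn client to launch Task Executors according to the resources required. The Task Executor registration details were covered in a previous article.
The second stage has to accommodate different resource management frameworks, different job modes, and different high-availability implementations, so it involves a large number of components and its code structure is fairly complex. The analysis that follows therefore focuses on the second stage, and this article takes apart the Dispatcher. To get complete resource isolation, I use Job Cluster mode almost exclusively in my daily work, so the Dispatcher in that mode is the target of the analysis.
This time I am not going to jump straight into the code and summarize afterwards. The call relationships are fairly complex; following the call chain directly makes it easy to get lost, and in the end you see only the trees and not the forest. I have left out some less important details and summarized the path from Yarn AppMaster startup to Dispatcher startup in a diagram: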
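Base service startup
Construct the ActorSystem, listening on the IP and port configured for the JobManager; construct the HighAvailability services; construct the Heartbeat services; start JMX; start the Blob service.
ResourceManager construction and startup
YarnResourceManagerDriver implements the resource operations against Yarn, such as requesting and releasing resources.
JobDispatcherRunner construction, leader election, and Dispatcher construction and startup
Only the node that obtains the leader role triggers construction and startup of the Dispatcher. JobCluster and Session modes provide different functionality: in Session mode, changes to the JobGraphs must be watched so that jobs can be created or stopped, whereas JobCluster mode needs none of this. A DispatcherLeaderProcess was therefore added between the DispatcherRunner and the Dispatcher, with separate implementations for Job and Session modes.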
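The base services and the Resource Manager are fairly easy to understand, and their functionality and implementation are straightforward. Reading YarnJobClusterEntrypoint, YarnResourceManagerDriver, and ActiveResourceManager is enough to understand their call relationships, so this article does not cover them further. The Dispatcher part involves DispatcherRunner, DispatcherLeaderProcess, and Dispatcher, plus a Factory for each of them (and even a FactoryFactory). The implementation is rather convoluted and the purpose of the design is hard to guess, which makes it the part we need to analyze most closely.
Let us first look at what the Dispatcher does. I could not find a suitable design document for the Dispatcher online. The closest is FLIP-6, but compared against the code it is already out of date and no longer matches, so every inference in this article comes from the code. The service interface that the Dispatcher exposes is defined in DispatcherGateway.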
DispatcherGateway
/** Gateway for the Dispatcher component. */
public interface DispatcherGateway extends FencedRpcGateway<DispatcherId>, RestfulGateway {

    /**
     * Submit a job to the dispatcher.
     *
     * @param jobGraph JobGraph to submit
     * @param timeout RPC timeout
     * @return A future acknowledge if the submission succeeded
     */
    CompletableFuture<Acknowledge> submitJob(JobGraph jobGraph, @RpcTimeout Time timeout);

    /**
     * List the current set of submitted jobs.
     *
     * @param timeout RPC timeout
     * @return A future collection of currently submitted jobs
     */
    CompletableFuture<Collection<JobID>> listJobs(@RpcTimeout Time timeout);

    /**
     * Returns the port of the blob server.
     *
     * @param timeout of the operation
     * @return A future integer of the blob server port
     */
    CompletableFuture<Integer> getBlobServerPort(@RpcTimeout Time timeout);

    /**
     * Requests the {@link ArchivedExecutionGraph} for the given jobId. If there is no such graph,
     * then the future is completed with a {@link FlinkJobNotFoundException}.
     *
     * <p>Note: We enforce that the returned future contains a {@link ArchivedExecutionGraph} unlike
     * the super interface.
     *
     * @param jobId identifying the job whose AccessExecutionGraph is requested
     * @param timeout for the asynchronous operation
     * @return Future containing the AccessExecutionGraph for the given jobId, otherwise {@link
     *     FlinkJobNotFoundException}
     */
    @Override
    CompletableFuture<ArchivedExecutionGraph> requestJob(JobID jobId, @RpcTimeout Time timeout);

    default CompletableFuture<Acknowledge> shutDownCluster(ApplicationStatus applicationStatus) {
        return shutDownCluster();
    }
}
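Judging from the interface definition, the Dispatcher is mainly responsible for submitting jobs, listing jobs, and shutting down the cluster. In other words, once a connection to the Dispatcher is established, it can be used to submit jobs, so an external HTTP service could be built on top of the Dispatcher to offer job submission to the outside.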
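To make the idea of a front end built on top of the gateway concrete, here is a minimal sketch. It does not use Flink's API: SimpleDispatcherGateway, InMemoryDispatcher, GatewayFrontendSketch, and the request strings are all hypothetical names invented for illustration, and jobs are plain strings rather than JobGraphs. The only point is that a front end can forward requests to asynchronous gateway methods and hand the futures back to the caller.

```java
import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
import java.util.concurrent.CompletableFuture;

/** Hypothetical, heavily simplified stand-in for DispatcherGateway (not Flink's API). */
interface SimpleDispatcherGateway {
    CompletableFuture<String> submitJob(String jobName);

    CompletableFuture<Collection<String>> listJobs();
}

/** In-memory implementation used only for illustration. */
final class InMemoryDispatcher implements SimpleDispatcherGateway {
    private final List<String> jobs = new ArrayList<>();

    @Override
    public synchronized CompletableFuture<String> submitJob(String jobName) {
        jobs.add(jobName);
        return CompletableFuture.completedFuture("ACK");
    }

    @Override
    public synchronized CompletableFuture<Collection<String>> listJobs() {
        // Return a copy so callers cannot mutate the internal list.
        return CompletableFuture.completedFuture(new ArrayList<>(jobs));
    }
}

/** Hypothetical front end that maps request strings onto gateway calls. */
final class GatewayFrontendSketch {
    private final SimpleDispatcherGateway gateway;

    GatewayFrontendSketch(SimpleDispatcherGateway gateway) {
        this.gateway = gateway;
    }

    /** Routes "POST /jobs/<name>" and "GET /jobs"; anything else yields "404". */
    CompletableFuture<String> handle(String request) {
        if (request.startsWith("POST /jobs/")) {
            String jobName = request.substring("POST /jobs/".length());
            return gateway.submitJob(jobName);
        }
        if (request.equals("GET /jobs")) {
            return gateway.listJobs().thenApply(jobs -> String.join(",", jobs));
        }
        return CompletableFuture.completedFuture("404");
    }

    public static void main(String[] args) {
        GatewayFrontendSketch frontend = new GatewayFrontendSketch(new InMemoryDispatcher());
        System.out.println(frontend.handle("POST /jobs/wordcount").join());
        System.out.println(frontend.handle("GET /jobs").join());
    }
}
```

A real service would sit behind an actual HTTP server and talk to the Dispatcher over RPC; the sketch leaves both out on purpose.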
DispatcherRunner
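As the name suggests, DispatcherRunner is there to run the dispatcher. Running the dispatcher, however, requires holding leadership first, so after the DispatcherRunner is constructed a leader election takes place, and only the node that obtains leadership may construct and start the dispatcher.
The main component used here is DispatcherRunnerLeaderElectionLifecycleManager, and the election service actually used is a LeaderElectionService. Depending on the HighAvailabilityServices implementation, this can be StandaloneLeaderElectionService or ZooKeeperLeaderElectionService.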
// In the constructor of DispatcherRunnerLeaderElectionLifecycleManager,
// leaderElectionService and dispatcherRunner are tied together.
private DispatcherRunnerLeaderElectionLifecycleManager(
        T dispatcherRunner, LeaderElectionService leaderElectionService) throws Exception {
    this.dispatcherRunner = dispatcherRunner;
    this.leaderElectionService = leaderElectionService;
    leaderElectionService.start(dispatcherRunner);
}
public class StandaloneLeaderElectionService implements LeaderElectionService {
    private LeaderContender contender = null;

    @Override
    public void start(LeaderContender newContender) throws Exception {
        if (contender != null) {
            // Service was already started
            throw new IllegalArgumentException(
                    "Leader election service cannot be started multiple times.");
        }
        contender = Preconditions.checkNotNull(newContender);
        // directly grant leadership to the given contender
        // The contender here is the dispatcherRunner.
        contender.grantLeadership(HighAvailabilityServices.DEFAULT_LEADER_ID);
    }
    ...
public final class DefaultDispatcherRunner implements DispatcherRunner, LeaderContender {
    // Question 1: leader election has already been delegated to
    // DispatcherRunnerLeaderElectionLifecycleManager, so why does this class
    // still need to hold the leaderElectionService?
    private final LeaderElectionService leaderElectionService;

    // ---------------------------------------------------------------
    // Leader election
    // ---------------------------------------------------------------

    // Once the dispatcherRunner receives the grantLeadership callback,
    // it starts a DispatcherLeaderProcess.
    // Question 2: why not start the Dispatcher directly?
    @Override
    public void grantLeadership(UUID leaderSessionID) {
        runActionIfRunning(() -> startNewDispatcherLeaderProcess(leaderSessionID));
    }
▽ Question 1: Why does the dispatcherRunner still need the leaderElectionService?
private void forwardConfirmLeaderSessionFuture(
        UUID leaderSessionID, DispatcherLeaderProcess newDispatcherLeaderProcess) {
    FutureUtils.assertNoException(
            newDispatcherLeaderProcess
                    // Once the dispatcher's leaderAddress is available
                    .getLeaderAddressFuture()
                    .thenAccept(
                            leaderAddress -> {
                                if (leaderElectionService.hasLeadership(leaderSessionID)) {
                                    // Call leaderElectionService to write the current
                                    // leaderAddress to ZooKeeper.
                                    leaderElectionService.confirmLeadership(
                                            leaderSessionID, leaderAddress);
                                }
                            }));
}
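The grant-then-confirm handshake can be hard to picture from excerpts alone, so here is a self-contained sketch of it. It is a simplified model, not Flink's classes: Contender, StandaloneElectionSketch, RunnerSketch, and the address string "dispatcher-address" are hypothetical, and the election service simply remembers the confirmed address in a field where a ZooKeeper-backed service would publish it. It shows why the runner needs a reference to the election service: the address becomes known only after leadership has been granted, so the runner has to call back to confirm it.

```java
import java.util.UUID;
import java.util.concurrent.CompletableFuture;

/** Hypothetical counterpart of LeaderContender. */
interface Contender {
    void grantLeadership(UUID leaderSessionId);
}

/** Hypothetical standalone election service: grants leadership immediately on start. */
final class StandaloneElectionSketch {
    private Contender contender;
    private UUID currentLeaderSession;
    private String confirmedAddress;

    void start(Contender newContender) {
        if (contender != null) {
            throw new IllegalStateException("Election service cannot be started twice.");
        }
        contender = newContender;
        currentLeaderSession = UUID.randomUUID();
        contender.grantLeadership(currentLeaderSession);
    }

    boolean hasLeadership(UUID leaderSessionId) {
        return leaderSessionId.equals(currentLeaderSession);
    }

    /** A ZooKeeper-backed service would publish the address; here it is only stored. */
    void confirmLeadership(UUID leaderSessionId, String leaderAddress) {
        confirmedAddress = leaderAddress;
    }

    String getConfirmedAddress() {
        return confirmedAddress;
    }
}

/** Hypothetical runner: confirms leadership once the dispatcher address is known. */
final class RunnerSketch implements Contender {
    private final StandaloneElectionSketch election;
    // Completed later, when the (imaginary) dispatcher has started.
    final CompletableFuture<String> leaderAddressFuture = new CompletableFuture<>();

    RunnerSketch(StandaloneElectionSketch election) {
        this.election = election;
    }

    @Override
    public void grantLeadership(UUID leaderSessionId) {
        // The real runner would start a leader process here; the sketch only
        // wires up the confirmation that runs when the address arrives.
        leaderAddressFuture.thenAccept(
                address -> {
                    if (election.hasLeadership(leaderSessionId)) {
                        election.confirmLeadership(leaderSessionId, address);
                    }
                });
    }

    public static void main(String[] args) {
        StandaloneElectionSketch election = new StandaloneElectionSketch();
        RunnerSketch runner = new RunnerSketch(election);
        election.start(runner);
        runner.leaderAddressFuture.complete("dispatcher-address");
        System.out.println(election.getConfirmedAddress());
    }
}
```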
▽ Question 2: Why introduce DispatcherLeaderProcess instead of constructing and starting the dispatcher directly?
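The class diagram above shows that most of the logic actually lives in SessionDispatcherLeaderProcess; JobDispatcherLeaderProcess implements only the onStart method.
The base class AbstractDispatcherLeaderProcess mainly implements some simple state transitions and calls back into the methods implemented by its subclasses. It is therefore reasonable to conclude that the DispatcherLeaderProcess layer was abstracted mainly for the sake of SessionDispatcherLeaderProcess.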
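The division of labor described here is the template method pattern: the base class owns the state machine and subclasses fill in the hooks. The sketch below models that split under simplifying assumptions. LeaderProcessSketch, JobProcessSketch, and SessionProcessSketch are hypothetical names, the logged strings only hint at what the real subclasses do, and none of this is Flink's actual code.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical base class: owns the state transitions and calls the subclass hooks. */
abstract class LeaderProcessSketch {
    enum State { CREATED, RUNNING, STOPPED }

    private State state = State.CREATED;
    // Records what the hooks did, so the behavior is observable.
    protected final List<String> log = new ArrayList<>();

    /** Starts the process exactly once; repeated calls are ignored. */
    final void start() {
        if (state == State.CREATED) {
            state = State.RUNNING;
            onStart();
        }
    }

    /** Stops the process exactly once; repeated calls are ignored. */
    final void close() {
        if (state != State.STOPPED) {
            state = State.STOPPED;
            onClose();
        }
    }

    State getState() {
        return state;
    }

    List<String> getLog() {
        return log;
    }

    protected abstract void onStart();

    protected void onClose() {}
}

/** Job mode: the single job is already known, so only onStart is needed. */
final class JobProcessSketch extends LeaderProcessSketch {
    private final String jobGraph;

    JobProcessSketch(String jobGraph) {
        this.jobGraph = jobGraph;
    }

    @Override
    protected void onStart() {
        log.add("create dispatcher for " + jobGraph);
    }
}

/** Session mode: more steps around a job graph store (illustrative only). */
final class SessionProcessSketch extends LeaderProcessSketch {
    @Override
    protected void onStart() {
        log.add("start job graph store");
        log.add("recover jobs");
        log.add("create dispatcher");
    }

    @Override
    protected void onClose() {
        log.add("stop job graph store");
    }

    public static void main(String[] args) {
        LeaderProcessSketch job = new JobProcessSketch("wordcount");
        job.start();
        job.start(); // ignored: the process is already RUNNING
        System.out.println(job.getLog());

        LeaderProcessSketch session = new SessionProcessSketch();
        session.start();
        session.close();
        System.out.println(session.getLog());
    }
}
```

Seen this way, the Job-mode subclass is nearly empty because the base class already does everything it needs, which is consistent with the conclusion that the extra layer exists mainly for Session mode.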
Next, let us look at the onStart method implemented by JobDispatcherLeaderProcess, which uses dispatcherGatewayServiceFactory directly to construct a DispatcherGatewayService.
protected void onStart() {
    final DispatcherGatewayService dispatcherService =
            dispatcherGatewayServiceFactory.create(
                    DispatcherId.fromUuid(getLeaderSessionId()),
                    Collections.singleton(jobGraph),
                    ThrowingJobGraphWriter.INSTANCE);
    completeDispatcherSetup(dispatcherService);
}
public AbstractDispatcherLeaderProcess.DispatcherGatewayService create(
        DispatcherId fencingToken,
        Collection<JobGraph> recoveredJobs,
        JobGraphWriter jobGraphWriter) {
    final Dispatcher dispatcher;
    try {
        dispatcher =
                dispatcherFactory.createDispatcher(
                        rpcService,
                        fencingToken,
                        recoveredJobs,
                        (dispatcherGateway, scheduledExecutor, errorHandler) ->
                                new NoOpDispatcherBootstrap(),
                        PartialDispatcherServicesWithJobGraphStore.from(
                                partialDispatcherServices, jobGraphWriter));
    } catch (Exception e) {
        throw new FlinkRuntimeException("Could not create the Dispatcher rpc endpoint.", e);
    }
    // In JobCluster mode a MiniDispatcher is constructed; see the implementation below.
    dispatcher.start();
    return DefaultDispatcherGatewayService.from(dispatcher);
}
public enum JobDispatcherFactory implements DispatcherFactory {
    INSTANCE;

    @Override
    public MiniDispatcher createDispatcher(
            RpcService rpcService,
            DispatcherId fencingToken,
            Collection<JobGraph> recoveredJobs,
            DispatcherBootstrapFactory dispatcherBootstrapFactory,
            PartialDispatcherServicesWithJobGraphStore partialDispatcherServicesWithJobGraphStore)
            throws Exception {
        final JobGraph jobGraph = Iterables.getOnlyElement(recoveredJobs);
        final Configuration configuration =
                partialDispatcherServicesWithJobGraphStore.getConfiguration();
        final String executionModeValue = configuration.getString(EXECUTION_MODE);
        final ClusterEntrypoint.ExecutionMode executionMode =
                ClusterEntrypoint.ExecutionMode.valueOf(executionModeValue);
        return new MiniDispatcher(
                rpcService,
                fencingToken,
                DispatcherServices.from(
                        partialDispatcherServicesWithJobGraphStore,
                        DefaultJobManagerRunnerFactory.INSTANCE),
                jobGraph,
                dispatcherBootstrapFactory,
                executionMode);
    }
}
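Two details of this factory are easy to miss: it is an enum with a single INSTANCE, which makes it a singleton, and it insists that the collection of jobs contains exactly one element. The sketch below reproduces just those two ideas with hypothetical names (SingleJobFactorySketch, Mode, and a hand-written getOnlyElement that stands in for Guava's Iterables.getOnlyElement). Jobs are plain strings and the "dispatcher" is only a descriptive string, so this is an illustration of the pattern rather than Flink's implementation.

```java
import java.util.Collection;
import java.util.Collections;
import java.util.Iterator;
import java.util.NoSuchElementException;

/** Hypothetical enum-singleton factory that accepts exactly one job. */
enum SingleJobFactorySketch {
    INSTANCE;

    /** Stand-in for the execution mode parsed from the configuration. */
    enum Mode { NORMAL, DETACHED }

    /** Returns a description of the dispatcher that would be built for the single job. */
    String createDispatcher(Collection<String> recoveredJobs, String executionModeValue) {
        String job = getOnlyElement(recoveredJobs);
        // valueOf throws IllegalArgumentException for an unknown mode name.
        Mode mode = Mode.valueOf(executionModeValue);
        return "MiniDispatcher[" + job + ", " + mode + "]";
    }

    /** Returns the single element, or throws if there are zero or several. */
    static <T> T getOnlyElement(Collection<T> elements) {
        Iterator<T> iterator = elements.iterator();
        if (!iterator.hasNext()) {
            throw new NoSuchElementException("expected one element but found none");
        }
        T only = iterator.next();
        if (iterator.hasNext()) {
            throw new IllegalArgumentException("expected one element but found several");
        }
        return only;
    }

    public static void main(String[] args) {
        System.out.println(
                INSTANCE.createDispatcher(Collections.singleton("wordcount"), "DETACHED"));
    }
}
```

Failing fast on anything other than one job matches the Job Cluster premise that a cluster exists for a single job.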
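At this point the Dispatcher has been constructed and started. To summarize the process above, I have compiled the calls between the components into the relationship diagram below, as a reference for reading the code.
That concludes this article, which covered the path from AppMaster startup to Dispatcher startup. Next, we will cover the JobMaster, from its startup all the way to the running job.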
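[The author of this article, Jacky, is the Zhihu author "a coder"]
[Compiled and published by 擎创科技; when reposting, please credit the source and the original author]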