Investigating and Solving the Problem of Overly Concentrated YARN Container Allocation
Posing the Problem
Analyzing the Problem
Allocation on NodeManager Heartbeats
In FairScheduler, every NodeManager heartbeat reaches the scheduler as a NODE_UPDATE event:

@Override
public void handle(SchedulerEvent event) {
  switch (event.getType()) {
  // ...
  case NODE_UPDATE:
    if (!(event instanceof NodeUpdateSchedulerEvent)) {
      throw new RuntimeException("Unexpected event type: " + event);
    }
    NodeUpdateSchedulerEvent nodeUpdatedEvent = (NodeUpdateSchedulerEvent) event;
    nodeUpdate(nodeUpdatedEvent.getRMNode());
    break;
  // ...
  }
}

The event is handled by nodeUpdate(), which records the heartbeat, processes the newly launched and completed containers it reports, and then tries to schedule on that node:

private synchronized void nodeUpdate(RMNode nm) {
  long start = getClock().getTime();
  if (LOG.isDebugEnabled()) {
    LOG.debug("nodeUpdate: " + nm + " cluster capacity: " + clusterResource);
  }
  eventLog.log("HEARTBEAT", nm.getHostName());
  FSSchedulerNode node = getFSSchedulerNode(nm.getNodeID());

  List<UpdatedContainerInfo> containerInfoList = nm.pullContainerUpdates();
  List<ContainerStatus> newlyLaunchedContainers = new ArrayList<ContainerStatus>();
  List<ContainerStatus> completedContainers = new ArrayList<ContainerStatus>();
  for (UpdatedContainerInfo containerInfo : containerInfoList) {
    newlyLaunchedContainers.addAll(containerInfo.getNewlyLaunchedContainers());
    completedContainers.addAll(containerInfo.getCompletedContainers());
  }
  // Processing the newly launched containers
  for (ContainerStatus launchedContainer : newlyLaunchedContainers) {
    containerLaunchedOnNode(launchedContainer.getContainerId(), node);
  }

  // Process completed containers
  for (ContainerStatus completedContainer : completedContainers) {
    ContainerId containerId = completedContainer.getContainerId();
    LOG.debug("Container FINISHED: " + containerId);
    completedContainer(getRMContainer(containerId),
        completedContainer, RMContainerEventType.FINISHED);
  }

  if (continuousSchedulingEnabled) {
    if (!completedContainers.isEmpty()) {
      attemptScheduling(node);
    }
  } else {
    attemptScheduling(node);
  }

  long duration = getClock().getTime() - start;
  fsOpDurations.addNodeUpdateDuration(duration);
}

Note the branch at the end: when continuous scheduling is disabled (it is off by default), every heartbeat ends with a call to attemptScheduling() on the node that sent it. In other words, Containers are only ever offered to whichever node happens to be heartbeating at that moment.
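How often this path fires is governed by the NodeManager heartbeat interval. As a point of reference, a minimal yarn-site.xml sketch (the value shown is the default and is purely illustrative):

<property>
  <!-- How often each NodeManager heartbeats to the ResourceManager;
       the default is 1000 ms. -->
  <name>yarn.resourcemanager.nodemanagers.heartbeat-interval-ms</name>
  <value>1000</value>
</property>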
Allocation by the Background Thread
When continuous scheduling is enabled, a dedicated background thread drives scheduling on its own, independently of heartbeats:

private class ContinuousSchedulingThread extends Thread {
  public void run() {
    while (!Thread.currentThread().isInterrupted()) {
      try {
        continuousSchedulingAttempt();
        Thread.sleep(getContinuousSchedulingSleepMs());
      } catch (InterruptedException e) {
        LOG.warn("Continuous scheduling thread interrupted. Exiting.", e);
        return;
      }
    }
  }
}

void continuousSchedulingAttempt() throws InterruptedException {
  long start = getClock().getTime();
  List<NodeId> nodeIdList = new ArrayList<NodeId>(nodes.keySet());
  // Sort the nodes by space available on them, so that we offer
  // containers on emptier nodes first, facilitating an even spread. This
  // requires holding the scheduler lock, so that the space available on a
  // node doesn't change during the sort.
  synchronized (this) {
    Collections.sort(nodeIdList, nodeAvailableResourceComparator);
  }

  // iterate all nodes
  for (NodeId nodeId : nodeIdList) {
    FSSchedulerNode node = getFSSchedulerNode(nodeId);
    try {
      if (node != null && Resources.fitsIn(minimumAllocation,
          node.getAvailableResource())) {
        attemptScheduling(node);
      }
    } catch (Throwable ex) {
      LOG.error("Error while attempting scheduling for node " + node +
          ": " + ex.toString(), ex);
      if ((ex instanceof YarnRuntimeException) &&
          (ex.getCause() instanceof InterruptedException)) {
        // AsyncDispatcher translates InterruptedException to
        // YarnRuntimeException with cause InterruptedException.
        // Need to throw InterruptedException to stop schedulingThread.
        throw (InterruptedException) ex.getCause();
      }
    }
  }

  long duration = getClock().getTime() - start;
  fsOpDurations.addContinuousSchedulingRunDuration(duration);
}

As the in-code comment explains, this path differs from the heartbeat path in one crucial way: before each pass it sorts all nodes by the space available on them, so emptier nodes are offered Containers first, facilitating an even spread.
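Continuous scheduling is disabled by default. A minimal yarn-site.xml sketch to turn it on (the sleep interval shown is the default; treat both values as illustrative rather than a recommendation):

<property>
  <name>yarn.scheduler.fair.continuous-scheduling-enabled</name>
  <value>true</value>
</property>
<property>
  <!-- How long the background thread sleeps between two scheduling
       passes; the default is 5 ms. -->
  <name>yarn.scheduler.fair.continuous-scheduling-sleep-ms</name>
  <value>5</value>
</property>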
Scheduling and Assigning Containers
Both paths converge on attemptScheduling():

synchronized void attemptScheduling(FSSchedulerNode node) {
  if (rmContext.isWorkPreservingRecoveryEnabled()
      && !rmContext.isSchedulerReadyForAllocatingContainers()) {
    return;
  }

  // Assign new containers...
  // 1. Check for reserved applications
  // 2. Schedule if there are no reservations
  FSAppAttempt reservedAppSchedulable = node.getReservedAppSchedulable();
  if (reservedAppSchedulable != null) {
    Priority reservedPriority =
        node.getReservedContainer().getReservedPriority();
    if (!reservedAppSchedulable.hasContainerForNode(reservedPriority, node)) {
      // Don't hold the reservation if app can no longer use it
      LOG.info("Releasing reservation that cannot be satisfied for application "
          + reservedAppSchedulable.getApplicationAttemptId()
          + " on node " + node);
      reservedAppSchedulable.unreserve(reservedPriority, node);
      reservedAppSchedulable = null;
    } else {
      // Reservation exists; try to fulfill the reservation
      if (LOG.isDebugEnabled()) {
        LOG.debug("Trying to fulfill reservation for application "
            + reservedAppSchedulable.getApplicationAttemptId()
            + " on node: " + node);
      }
      node.getReservedAppSchedulable().assignReservedContainer(node);
    }
  }
  if (reservedAppSchedulable == null) {
    // No reservation, schedule at queue which is farthest below fair share
    int assignedContainers = 0;
    while (node.getReservedContainer() == null) {
      boolean assignedContainer = false;
      if (!queueMgr.getRootQueue().assignContainer(node).equals(
          Resources.none())) {
        assignedContainers++;
        assignedContainer = true;
      }
      if (!assignedContainer) { break; }
      if (!assignMultiple) { break; }
      if ((assignedContainers >= maxAssign) && (maxAssign > 0)) { break; }
    }
  }
  updateRootQueueMetrics();
}
This method first checks whether the node holds a Container reservation for some application; if it does and the reservation can still be satisfied on this node, it is fulfilled directly. If there is no reservation, the scheduler tries to assign Containers through the queue hierarchy, starting from the root queue. The while loop at the end is worth a close look; it tells us that:

If assignMultiple (the yarn.scheduler.fair.assignmultiple parameter) is true, the scheduler does not stop after successfully assigning one Container, but keeps trying to assign more on the same node;
Under that condition, it stops after assigning at most maxAssign (the yarn.scheduler.fair.max.assign parameter) Containers in a row. Since the check is guarded by maxAssign > 0, a non-positive value imposes no limit at all (see the configuration sketch below).
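For illustration, a minimal yarn-site.xml sketch that keeps batch assignment on but caps it, so that a single scheduling pass cannot pile an unbounded number of Containers onto one node (the cap of 10 is an arbitrary example, not a recommendation):

<property>
  <name>yarn.scheduler.fair.assignmultiple</name>
  <value>true</value>
</property>
<property>
  <!-- Maximum number of Containers assigned in one attemptScheduling()
       pass on a node; the default, -1, means unlimited. -->
  <name>yarn.scheduler.fair.max.assign</name>
  <value>10</value>
</property>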
